Running with Code Like with scissors, only more dangerous


The Microsoft Reactive Extensions

Posted by Rob Paveza

The honest truth is that I’m having difficulty establishing exactly what they could be used for, but they’re still really cool.  The Microsoft Reactive Extensions for the .NET Framework are the dual of LINQ: whereas LINQ operates over objects, or you might say pulls objects out of collections, the Reactive Extensions (Rx) handles push notifications.  It is the ultimate generalization of events and event handling within .NET.

Getting There

First, let’s consider the normal interfaces for IEnumerable:

interface IEnumerable<T>
    IEnumerator<T> GetEnumerator();

interface IEnumerator<T> : IDisposable
    T Current { get; }  // throws exception at end of enumeration
    bool MoveNext();

These interfaces (okay, really, the non-generic IEnumerable interface, but let’s not split hairs) are the foundation of the foreach C# keyword (and the For Each… In in Visual Basic).  A foreach can also be written, roughly, as:

foreach (string str in myListOfStrings)
// rewritten:
using (IEnumerator<string> enumStr = myListOfStrings.GetEnumerator())
    while (enumStr.MoveNext())

Keep this example in mind for later, because we’ll revisit how this can be used in Rx programming.


Dualism is something of a mathematical concept, and I don’t want to get into it because I don’t completely understand it myself, but most nerdy people reading my blog will probably appreciate an example from particle physics.  Consider a proton: its physical dual is the antiproton (because when they meet they annhilate each other.  It’s not an electron, because while they have opposite charge, they have substantially different mass).

The core of Rx is the dual of IEnumerable.  That is, IObservable<T> and IObserver<T>.  But let’s deconstruct these piece by piece.  Let’s start at IEnumerator<T>:

interface IObserver<T>
    // T Current { get; }
    // That method looks like: T get_Current();
    void OnNext(T next);
    // Current throws an exception if MoveNext() previously returned false, so:
    void OnError(Exception error);

    // bool MoveNext() 
    // returns true while Current is populated, false when we reach the end, so:
    void OnDone();

You can see that, whereas everything in IEnumerator<T> pulled data, now we’ve transitioned into pushing data.  But the observer isn’t really the cool part; rather, it’s the subject that’s cool:

interface IObservable<T>
    // GetEnumerator() returned an object; here we pass one in
    // We still needed to capture the disposable functionality, so we return IDisposable
    IDisposable Subscribe(IObserver<T> observer);

Now, if you want to see the specifics about how these were constructed, you can check out the Expert-to-Expert video on Channel 9.  I’ve included some high-level notes, but they’re not really as deep as you can get with these guys.

Creating a Subject

Creating a subject is a bit of a challenge; subjects are event-driven, and those are generally kind of difficult to think about because the fit usually only into one of two buckets: user interaction and system I/O.  For sake of example, I’ve created a simple Windows Forms project to start with, that has a couple observable Buttons (the class is called ObservableButton, go figure), and an observer, which is the containing form.  You can download the starter project, which requires Visual Studio 2010 and the Rx Framework.

Subjects can be anything, though, and the power you can glean from these is amazing.  For the Red Bull NASCAR team, I created a server for a Twitter feed aggregator using Rx.  It started as reading a socket into HTTP data, then into chunked HTTP data, then into JSON packets, then into POCO objects that were then re-serialized and sent over the wire to N Flash clients.  As you can imagine, network programming, social programming, or any other kind of programming where an event is coming in unpredictably is a great candidate for this.  Why?

Let’s look at the use case I just listed.  As Twitter’s live stream service sends data over the wire, I need to parse it and send it to a lot of listening sockets.  But I don’t want to just say “Oh I just got the data, let me send it out again” – that would possibly slow down processing on other threads, because I might have to wait – my socket might already be in the process of sending data and so it’s in an invalid state to send further data.  If I had tied a server socket directly to the “I’m ready to send” signal directly, I would have been in trouble.  Rather, I had a utility (an Observer) that aggregated incoming messages until all server sockets were ready to send, at which point it would push those updated messages to the server sockets.

Let’s look at the sample program:


This isn’t really anything spectacular.  I could have done that with regular event handlers.

Aggregating Subjects

The magic of Rx, from my perspective, lies with what you can do with subjects.  I’m no longer initializing my constructor to require two lines – I’m merging the two buttons into one observable sequence:

        public Form1()



The result is identical – the events get handled and all is good.

Modifying Sequences

Now I’m going to change the class definition slightly:

    public partial class Form1 : Form, IObserver<Timestamped<string>>
        public Form1()


        public void OnNext(Timestamped<string> value)
            this.textBox1.Text += value.Timestamp.ToString("hh:mm tt   ") + value.Value + Environment.NewLine;

        public void OnError(Exception error)
            this.textBox1.Text += "Exception caught: " + Environment.NewLine + error.ToString() + Environment.NewLine;

        public void OnCompleted()
            this.textBox1.Text += "Sequence completed." + Environment.NewLine;

Note that by adding in the .Timestamp() call, I’ve transformed the observable to sequence of strings to be an observable sequence of timestamped strings.  That’s pretty cool, right?

This is even cooler: the Delay() method:

                .Delay(new TimeSpan(0, 0, 1)).ObserveOn(this).Subscribe(this);

The ObserveOn method accepts a Windows Forms control, a Dispatcher (for WPF), or other scheduler implementation that can be used to synchronize the delay.  If I didn’t include it, the delayed merge would be called on a different thread, and we’d get an InvalidOperationException (because you can’t update a window on a thread other than the thread that created it). 

Do you want to avoid repetition?

                .DistinctUntilChanged(ts => ts.Value).Subscribe(this);

This produced output that only emitted one message, no matter how many times I clicked the same button, until I clicked the other button.

So, What Can We Do?

Well, right now it doesn’t seem like there’s a lot of tooling for Rx.  There’s a community wiki around the framework, though, and I think that we can eventually see a lot of good use.

Some ideas:

  • Develop a way to completely repeat ASP.NET requests.  Treat IIS as an IObservable<AspNetRequest>, where AspNetRequest contains all the state data that would otherwise populate these tools, which would immensely help with debugging.  Imagine when your tester only needs to record a series of test cases once, and otherwise is just testing for UI errors.
  • Wrap event-oriented APIs for simplified logging and replaying.  (In JinxBot, an event-oriented chat API named for my cat, I always wanted to capture all the events of the core API and be able to replay them via a subclass, which would have allowed pixel-perfect replay of a chat session).
  • Handle periodic data services like Twitter, SMS, email, or others in a clean and efficient way.

I’d like to see this take off, but it’s a very different way of looking at programming than what most .NET developers are used to.  Enjoy it, take a look, and let’s build it up!


Speedy C#, Part 2: Optimizing Memory Allocations – Pooling and Reusing Objects

Posted by Rob

In C#, Visual Basic .NET, C++/CLI, J# - the list goes on - we're freed from having to worry about our memory management.  Objects take care of themselves, and when the CLR garbage collector detects that an object is no longer in use, it frees the associated memory.  That doesn't mean that we should run around allocating and deallocating objects all willy-nilly; in fact, since we have less control over memory, we arguably have the opportunity to be more careful with the way we use high-frequency objects.

Memory Regions in .NET

In .NET, and generally in most programming, we can think of two places in which we can access memory: the stack and the heap.  We can think of Stack memory as temporary workspace, or scratch space; when we leave a function, all of our stack goes away.  Way, way down in the machine architecture, the stack also stores the return addresses of functions.  The stack also stores function parameters.  It's generally very orderly, inexpensive, but its volatile nature makes it a poor candidate for long-term storage.  In .NET, all types that derive from the ValueType class are stored on the stack unless they are boxed into an object reference; this includes types defined with the struct and enum keywords, as well as all of the primitive types except string (including int, double, and bool).

Heap memory is another matter.  The heap is a region of memory reserved for the use of the program and is intended to store objects that aren't quite so transient.  This might be something like a database connection, a file or buffer, or a window.

The Enemy: Fragmentation

Over time, objects are allocated and eventually released, and because there's not really any rhyme or reason, the heap becomes chaotic.  Allocations grab the first free block that's large enough (sometimes larger than necessary) and hold onto it until they go into the abyss.  This leads to fragmentation - all the free memory must be tracked somehow, and here's the real killer: contiguous blocks of free memory may not always be recognized as such.  Check this out: let's say we have a heap allocated at memory location 0x4000 that is 32 bytes wide:

An un-used heap

(Yes, my awesome artwork was done with none other than Excel!)

Suppose we allocate an 8-byte object and another 8-byte object, then a 16-byte object.  The first is in red, the second in orange, and the third in gray:

The heap after it was filled

Now I'll free the first and the third objects; we'll have 24 bytes of total free memory:

A fragmented heap

Either we need to keep track of every little piece of memory, which might be the fastest algorithm for releasing but slow for allocating (not to mention potentially VERY wasteful), or try to come up with another solution.  This type of memory fragmentation is referred to as external fragmentation.

The Garbage Collector and Compaction

The garbage collector has two components: a reference counter and a compaction engine.  The reference counter is responsible for determining when objects no longer have references to them; this frees programmers from having to explicitly destroy objects (as is the practice in C++ with the delete operator, or in C with the free function).  A lazy thread is then able to release and compact memory as needed, avoiding much of the overhead of external fragmentation and also allowing unused memory to be reclaimed.  The garbage collector in .NET is generational; it checks the newest objects first (what are called "gen-0"), and if the newest objects are still in use, they get moved to gen-1.  If the memory pressure requires, gen-1 objects are evaluated, and if they are still in use, they get moved to gen-2.  Gen-2 objects are considered long-lasting, and are only checked when memory pressure is severe.

Let's go back to our heap example; supposing I had an 8-byte, another 8-byte, and a 12-byte allocation, here's my heap graph:

A new heap

Object 1 (in red) has gone out of scope, but objects 2 and three are sticking around.  Using normal memory freeing rules, the largest object that could be allocated would still only be 8 bytes, because that would be the largest contiguous free space.  However, using .NET's compacting garbage collector, we could expect something along these lines:

A compacted heap graph

Here we can see we've dealt with the problem of external fragmentation by compacting the heap.  This convenience doesn't come without a cost, though; while the garbage collector performed the compaction, all of your application threads were suspended.  The GC can't guarantee object integrity if memory is getting abused during a garbage collection!

Preventing Compaction: Stop Killing off Objects!

Object pooling is a pattern to use that allows objects to be reused rather than allocated and deallocated, which helps to prevent heap fragmentation as well as costly GC compactions.  A pool can be thought of as an object factory; in essence, the most rudimentary pool could look like this:

   1: public class Pool<T> where T : new()
   2: {
   3:     private Stack<T> _items = new Stack<T>();
   4:     private object _sync = new object(); 
   6:     public T Get()
   7:     {
   8:         lock (_sync)
   9:         {
  10:             if (_items.Count == 0)
  11:             {
  12:                 return new T();
  13:             }
  14:             else
  15:             {
  16:                 return _items.Pop();
  17:             }
  18:         }
  19:     }
  21:     public void Free(T item)
  22:     {
  23:         lock (_sync)
  24:         {
  25:             _items.Push(item);
  26:         }
  27:     }
  28: }

Here, objects are created entirely on-demand and, when freed, are stored in a stack.  The reason we want to use a Stack is the performance characteristic of adding and removing objects; operations are always performed at the end of the list, which makes it highly efficient to add or remove items.  If possible, it may be prudent to pre-create a number of objects for use throughout the lifetime of your application. 

Here's an example: the project I've been discussing lately uses a pool of byte arrays to handle incoming network messages received and sent via a Socket.  When pooling is enabled, over the course of the application's lifetime, there were 17 Gen-0 collections, 5 Gen-1 collections, and 3 Gen-2 collections; a total of 270 byte[] instances were allocated, of which 44 were eligible for pooling and were pooled.  When pooling is disabled, there were 22 Gen-0 collections, 5 Gen-1 collections, and 3 Gen-2 collections; a total of 11,660 byte[] instances were allocated, of which approximately 10,900 were eligible for pooling.  That's a lot of memory!

Summary - When and Why

Object pooling is a powerful optimization technique, and if you're already using factory patterns it shouldn't be terribly foreign to you.  The .NET Framework includes the ThreadPool class as part of System.Threading.  Other objects you might consider pooling are database connections, any expensive links to unmanaged code, or anything that needs to be allocated frequently and can then be thrown away.  In my example, byte arrays are exceptionally good for this because they can be overwritten easily.

Further Reading

The "Speedy C#" Series: