Running with Code Like with scissors, only more dangerous


The Microsoft Reactive Extensions

Posted by Rob Paveza

The honest truth is that I’m having difficulty establishing exactly what they could be used for, but they’re still really cool.  The Microsoft Reactive Extensions for the .NET Framework are the dual of LINQ: whereas LINQ operates over objects, or you might say pulls objects out of collections, the Reactive Extensions (Rx) handles push notifications.  It is the ultimate generalization of events and event handling within .NET.

Getting There

First, let’s consider the normal interfaces for IEnumerable:

interface IEnumerable<T>
    IEnumerator<T> GetEnumerator();

interface IEnumerator<T> : IDisposable
    T Current { get; }  // throws exception at end of enumeration
    bool MoveNext();

These interfaces (okay, really, the non-generic IEnumerable interface, but let’s not split hairs) are the foundation of the foreach C# keyword (and the For Each… In in Visual Basic).  A foreach can also be written, roughly, as:

foreach (string str in myListOfStrings)
// rewritten:
using (IEnumerator<string> enumStr = myListOfStrings.GetEnumerator())
    while (enumStr.MoveNext())

Keep this example in mind for later, because we’ll revisit how this can be used in Rx programming.


Dualism is something of a mathematical concept, and I don’t want to get into it because I don’t completely understand it myself, but most nerdy people reading my blog will probably appreciate an example from particle physics.  Consider a proton: its physical dual is the antiproton (because when they meet they annhilate each other.  It’s not an electron, because while they have opposite charge, they have substantially different mass).

The core of Rx is the dual of IEnumerable.  That is, IObservable<T> and IObserver<T>.  But let’s deconstruct these piece by piece.  Let’s start at IEnumerator<T>:

interface IObserver<T>
    // T Current { get; }
    // That method looks like: T get_Current();
    void OnNext(T next);
    // Current throws an exception if MoveNext() previously returned false, so:
    void OnError(Exception error);

    // bool MoveNext() 
    // returns true while Current is populated, false when we reach the end, so:
    void OnDone();

You can see that, whereas everything in IEnumerator<T> pulled data, now we’ve transitioned into pushing data.  But the observer isn’t really the cool part; rather, it’s the subject that’s cool:

interface IObservable<T>
    // GetEnumerator() returned an object; here we pass one in
    // We still needed to capture the disposable functionality, so we return IDisposable
    IDisposable Subscribe(IObserver<T> observer);

Now, if you want to see the specifics about how these were constructed, you can check out the Expert-to-Expert video on Channel 9.  I’ve included some high-level notes, but they’re not really as deep as you can get with these guys.

Creating a Subject

Creating a subject is a bit of a challenge; subjects are event-driven, and those are generally kind of difficult to think about because the fit usually only into one of two buckets: user interaction and system I/O.  For sake of example, I’ve created a simple Windows Forms project to start with, that has a couple observable Buttons (the class is called ObservableButton, go figure), and an observer, which is the containing form.  You can download the starter project, which requires Visual Studio 2010 and the Rx Framework.

Subjects can be anything, though, and the power you can glean from these is amazing.  For the Red Bull NASCAR team, I created a server for a Twitter feed aggregator using Rx.  It started as reading a socket into HTTP data, then into chunked HTTP data, then into JSON packets, then into POCO objects that were then re-serialized and sent over the wire to N Flash clients.  As you can imagine, network programming, social programming, or any other kind of programming where an event is coming in unpredictably is a great candidate for this.  Why?

Let’s look at the use case I just listed.  As Twitter’s live stream service sends data over the wire, I need to parse it and send it to a lot of listening sockets.  But I don’t want to just say “Oh I just got the data, let me send it out again” – that would possibly slow down processing on other threads, because I might have to wait – my socket might already be in the process of sending data and so it’s in an invalid state to send further data.  If I had tied a server socket directly to the “I’m ready to send” signal directly, I would have been in trouble.  Rather, I had a utility (an Observer) that aggregated incoming messages until all server sockets were ready to send, at which point it would push those updated messages to the server sockets.

Let’s look at the sample program:


This isn’t really anything spectacular.  I could have done that with regular event handlers.

Aggregating Subjects

The magic of Rx, from my perspective, lies with what you can do with subjects.  I’m no longer initializing my constructor to require two lines – I’m merging the two buttons into one observable sequence:

        public Form1()



The result is identical – the events get handled and all is good.

Modifying Sequences

Now I’m going to change the class definition slightly:

    public partial class Form1 : Form, IObserver<Timestamped<string>>
        public Form1()


        public void OnNext(Timestamped<string> value)
            this.textBox1.Text += value.Timestamp.ToString("hh:mm tt   ") + value.Value + Environment.NewLine;

        public void OnError(Exception error)
            this.textBox1.Text += "Exception caught: " + Environment.NewLine + error.ToString() + Environment.NewLine;

        public void OnCompleted()
            this.textBox1.Text += "Sequence completed." + Environment.NewLine;

Note that by adding in the .Timestamp() call, I’ve transformed the observable to sequence of strings to be an observable sequence of timestamped strings.  That’s pretty cool, right?

This is even cooler: the Delay() method:

                .Delay(new TimeSpan(0, 0, 1)).ObserveOn(this).Subscribe(this);

The ObserveOn method accepts a Windows Forms control, a Dispatcher (for WPF), or other scheduler implementation that can be used to synchronize the delay.  If I didn’t include it, the delayed merge would be called on a different thread, and we’d get an InvalidOperationException (because you can’t update a window on a thread other than the thread that created it). 

Do you want to avoid repetition?

                .DistinctUntilChanged(ts => ts.Value).Subscribe(this);

This produced output that only emitted one message, no matter how many times I clicked the same button, until I clicked the other button.

So, What Can We Do?

Well, right now it doesn’t seem like there’s a lot of tooling for Rx.  There’s a community wiki around the framework, though, and I think that we can eventually see a lot of good use.

Some ideas:

  • Develop a way to completely repeat ASP.NET requests.  Treat IIS as an IObservable<AspNetRequest>, where AspNetRequest contains all the state data that would otherwise populate these tools, which would immensely help with debugging.  Imagine when your tester only needs to record a series of test cases once, and otherwise is just testing for UI errors.
  • Wrap event-oriented APIs for simplified logging and replaying.  (In JinxBot, an event-oriented chat API named for my cat, I always wanted to capture all the events of the core API and be able to replay them via a subclass, which would have allowed pixel-perfect replay of a chat session).
  • Handle periodic data services like Twitter, SMS, email, or others in a clean and efficient way.

I’d like to see this take off, but it’s a very different way of looking at programming than what most .NET developers are used to.  Enjoy it, take a look, and let’s build it up!