Running with Code Like with scissors, only more dangerous


The Microsoft Reactive Extensions

Posted by Rob Paveza

The honest truth is that I’m having difficulty establishing exactly what they could be used for, but they’re still really cool.  The Microsoft Reactive Extensions for the .NET Framework are the dual of LINQ: whereas LINQ operates over objects, or you might say pulls objects out of collections, the Reactive Extensions (Rx) handles push notifications.  It is the ultimate generalization of events and event handling within .NET.

Getting There

First, let’s consider the normal interfaces for IEnumerable:

interface IEnumerable<T>
{
    IEnumerator<T> GetEnumerator();
}

interface IEnumerator<T> : IDisposable
{
    T Current { get; }  // throws exception at end of enumeration
    bool MoveNext();
}

These interfaces (okay, really, the non-generic IEnumerable interface, but let’s not split hairs) are the foundation of the foreach C# keyword (and the For Each… In in Visual Basic).  A foreach can also be written, roughly, as:

foreach (string str in myListOfStrings) { /* use str */ }
// rewritten:
using (IEnumerator<string> enumStr = myListOfStrings.GetEnumerator())
    while (enumStr.MoveNext())
    {
        string str = enumStr.Current;  // use str
    }

Keep this example in mind for later, because we’ll revisit how this can be used in Rx programming.


Dualism is something of a mathematical concept, and I don’t want to get into it because I don’t completely understand it myself, but most nerdy people reading my blog will probably appreciate an example from particle physics.  Consider a proton: its physical dual is the antiproton, because when they meet they annihilate each other.  (It’s not an electron: while they have opposite charge, they have substantially different mass.)

The core of Rx is the dual of IEnumerable.  That is, IObservable<T> and IObserver<T>.  But let’s deconstruct these piece by piece.  Let’s start at IEnumerator<T>:

interface IObserver<T>
{
    // T Current { get; }
    // That method looks like: T get_Current();
    void OnNext(T next);
    // Current throws an exception if MoveNext() previously returned false, so:
    void OnError(Exception error);

    // bool MoveNext()
    // returns true while Current is populated, false when we reach the end, so:
    void OnCompleted();
}

You can see that, whereas everything in IEnumerator<T> pulled data, now we’ve transitioned into pushing data.  But the observer isn’t really the cool part; rather, it’s the subject that’s cool:

interface IObservable<T>
{
    // GetEnumerator() returned an object; here we pass one in
    // We still needed to capture the disposable functionality, so we return IDisposable
    IDisposable Subscribe(IObserver<T> observer);
}

Now, if you want to see the specifics about how these were constructed, you can check out the Expert-to-Expert video on Channel 9.  I’ve included some high-level notes, but they’re not really as deep as you can get with these guys.
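
To make the push model concrete, here is a minimal sketch that implements both sides by hand against the System.IObservable<T> and System.IObserver<T> interfaces as they ship in .NET 4 (where the completion method is named OnCompleted).  SimpleSubject, RecordingObserver and PushDemo are hypothetical names for illustration only; this is not how Rx implements its own subjects.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push source: hands each value to every subscribed observer.
public sealed class SimpleSubject<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    public void Publish(T value)
    {
        // Iterate over a copy so an observer may unsubscribe from inside OnNext.
        foreach (IObserver<T> o in observers.ToArray())
            o.OnNext(value);
    }

    // Disposing the token returned by Subscribe removes the observer again.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> list;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> list, IObserver<T> observer)
        {
            this.list = list;
            this.observer = observer;
        }

        public void Dispose() { list.Remove(observer); }
    }
}

// Hypothetical observer that records everything pushed at it.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(T value) { Log.Add("next: " + value); }
    public void OnError(Exception error) { Log.Add("error: " + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}

public static class PushDemo
{
    public static List<string> Run()
    {
        var subject = new SimpleSubject<string>();
        var observer = new RecordingObserver<string>();

        using (subject.Subscribe(observer))
        {
            subject.Publish("button1 clicked");
            subject.Publish("button2 clicked");
        }

        subject.Publish("nobody hears this");  // the subscription was disposed
        return observer.Log;
    }
}
```

Calling PushDemo.Run() records the two values published while the subscription was alive; the third is dropped because the IDisposable returned by Subscribe has already been disposed.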

Creating a Subject

Creating a subject is a bit of a challenge; subjects are event-driven, and those are generally kind of difficult to think about because they usually fit into only one of two buckets: user interaction and system I/O.  For the sake of example, I’ve created a simple Windows Forms project to start with, which has a couple of observable Buttons (the class is called ObservableButton, go figure), and an observer, which is the containing form.  You can download the starter project, which requires Visual Studio 2010 and the Rx Framework.

Subjects can be anything, though, and the power you can glean from these is amazing.  For the Red Bull NASCAR team, I created a server for a Twitter feed aggregator using Rx.  It started as reading a socket into HTTP data, then into chunked HTTP data, then into JSON packets, then into POCO objects that were then re-serialized and sent over the wire to N Flash clients.  As you can imagine, network programming, social programming, or any other kind of programming where an event is coming in unpredictably is a great candidate for this.  Why?

Let’s look at the use case I just listed.  As Twitter’s live stream service sends data over the wire, I need to parse it and send it to a lot of listening sockets.  But I don’t want to just say “Oh I just got the data, let me send it out again” – that would possibly slow down processing on other threads, because I might have to wait – my socket might already be in the process of sending data and so it’s in an invalid state to send further data.  If I had tied a server socket directly to the “I’m ready to send” signal, I would have been in trouble.  Rather, I had a utility (an Observer) that aggregated incoming messages until all server sockets were ready to send, at which point it would push those updated messages to the server sockets.

Let’s look at the sample program:


This isn’t really anything spectacular.  I could have done that with regular event handlers.

Aggregating Subjects

The magic of Rx, from my perspective, lies with what you can do with subjects.  I’m no longer initializing my constructor to require two lines – I’m merging the two buttons into one observable sequence:

        public Form1()



The result is identical – the events get handled and all is good.
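
As a rough console-sized sketch of the same idea, the snippet below merges two sources into one sequence and subscribes once.  It assumes the current System.Reactive NuGet package (whose namespaces differ from the 2010 preview), and it uses Subject<string> as a hypothetical stand-in for the two ObservableButton instances; MergeDemo is likewise just an illustrative name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MergeDemo
{
    public static List<string> Run()
    {
        // Hypothetical stand-ins for the two observable buttons.
        var button1 = new Subject<string>();
        var button2 = new Subject<string>();
        var received = new List<string>();

        // One subscription covers both sources.
        using (button1.Merge(button2).Subscribe(text => received.Add(text)))
        {
            button1.OnNext("Button 1 clicked");
            button2.OnNext("Button 2 clicked");
            button1.OnNext("Button 1 clicked");
        }

        return received;
    }
}
```

Because Subject<T> delivers synchronously, the list ends up holding the three messages in the order they were pushed, regardless of which source they came from.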

Modifying Sequences

Now I’m going to change the class definition slightly:

    public partial class Form1 : Form, IObserver<Timestamped<string>>
    {
        public Form1()
        {
            // ...
        }

        public void OnNext(Timestamped<string> value)
        {
            this.textBox1.Text += value.Timestamp.ToString("hh:mm tt   ") + value.Value + Environment.NewLine;
        }

        public void OnError(Exception error)
        {
            this.textBox1.Text += "Exception caught: " + Environment.NewLine + error.ToString() + Environment.NewLine;
        }

        public void OnCompleted()
        {
            this.textBox1.Text += "Sequence completed." + Environment.NewLine;
        }
    }

Note that by adding in the .Timestamp() call, I’ve transformed the observable sequence of strings into an observable sequence of timestamped strings.  That’s pretty cool, right?

This is even cooler: the Delay() method:

                .Delay(new TimeSpan(0, 0, 1)).ObserveOn(this).Subscribe(this);

The ObserveOn method accepts a Windows Forms control, a Dispatcher (for WPF), or other scheduler implementation that can be used to synchronize the delay.  If I didn’t include it, the delayed merge would be called on a different thread, and we’d get an InvalidOperationException (because you can’t update a window on a thread other than the thread that created it). 

Do you want to avoid repetition?

                .DistinctUntilChanged(ts => ts.Value).Subscribe(this);

This produced output that only emitted one message, no matter how many times I clicked the same button, until I clicked the other button.
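
The same behavior can be reproduced without a form.  This is a minimal sketch assuming the current System.Reactive NuGet package; the single Subject<string> is a hypothetical stand-in for the merged button sequence, and DistinctDemo is an illustrative name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DistinctDemo
{
    public static List<string> Run()
    {
        // Hypothetical stand-in for the merged button clicks.
        var clicks = new Subject<string>();
        var received = new List<string>();

        // Timestamp() wraps each string in a Timestamped<string>;
        // DistinctUntilChanged compares only the wrapped value, not the timestamp.
        using (clicks.Timestamp()
                     .DistinctUntilChanged(ts => ts.Value)
                     .Subscribe(ts => received.Add(ts.Value)))
        {
            clicks.OnNext("Button 1");
            clicks.OnNext("Button 1");
            clicks.OnNext("Button 1");
            clicks.OnNext("Button 2");
            clicks.OnNext("Button 1");
        }

        return received;
    }
}
```

Of the five clicks, only three get through: the repeated "Button 1" clicks collapse into one, then "Button 2", then "Button 1" again once the value has changed.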

So, What Can We Do?

Well, right now it doesn’t seem like there’s a lot of tooling for Rx.  There’s a community wiki around the framework, though, and I think that we can eventually see a lot of good use.

Some ideas:

  • Develop a way to completely repeat ASP.NET requests.  Treat IIS as an IObservable<AspNetRequest>, where AspNetRequest contains all the state data that would otherwise populate these tools, which would immensely help with debugging.  Imagine when your tester only needs to record a series of test cases once, and otherwise is just testing for UI errors.
  • Wrap event-oriented APIs for simplified logging and replaying.  (In JinxBot, an event-oriented chat API named for my cat, I always wanted to capture all the events of the core API and be able to replay them via a subclass, which would have allowed pixel-perfect replay of a chat session).
  • Handle periodic data services like Twitter, SMS, email, or others in a clean and efficient way.

I’d like to see this take off, but it’s a very different way of looking at programming than what most .NET developers are used to.  Enjoy it, take a look, and let’s build it up!


The Difficulties of Using a Multicast Event-based API

Posted by Rob

I've made allusions to my current project several times, and while I can't discuss it in specifics, I'm working with hardware; part of the device is an armature that extends to receive a piece of equipment from the user, and then it will once again extend to re-dispense the object at the conclusion of its task.  The armature controller hardware is strictly asynchronous, and as such, I decided initially to write it with asynchronous callbacks.  It's fairly straightforward to have a method called "Extend" and an event called "Extended."  Once I was able to establish this API, I thought it would be fairly straightforward for the rest of the development team to move forward.

I thought incorrectly.

As .NET developers, I think we're conditioned from the time that we open the IDE to make sure that event handlers are wired up throughout the lifetime of the application.  In fact, both C# and Visual Basic make it so easy for multiple methods to handle events that it's sometimes silly not to.  C# and Visual Basic event handling syntax is syntactic sugar for the Observer pattern.  And why shouldn't it be?  For probably 99.9% of the applications out there, it's great.

Still, the current project has me a bit miffed.

Consider that there are two instances in which the arm needs to be extended: one to accept the user's item, and the other to re-dispense it.  Dispensing the item happens automatically when the arm is flipped upside-down and then extended.  So, I can track state:

private void Arm_Extended(object sender, EventArgs e)
{
    if (receivingItem) { /* wait for button press */ }
    else { /* dispensing item */ }
}

Doesn't this seem kludgy to you?  If not here, then consider that within a single class I potentially have six objects reporting asynchronous task results that may need to be handled differently depending on the state of the machine.

Unfortunately, the alternative method seems just as kludgy:

private void ArmExtendedForUserPrompt(object sender, EventArgs e)
{
    Arm.Extended -= handlerForArmExtendingForPrompt;
}

private void ArmExtendedForDispensing(object sender, EventArgs e)
{
    Arm.Retracted += handlerForArmRetractAfterDispenseCompleted;
    Arm.Extended -= handlerForArmExtendedForDispensing;
}

The primary reason I've chosen the latter approach is that stack traces have a bit more meaning to me.  That's it.  I guess it's arguable that I don't need to manage umpteen state flags.  (As a side note, I've just learned that the spell checker in Windows Live Writer doesn't consider "umpteen" to be a spelling error.)  But the truth is that, state flags or function pointers, I'm still managing state.

It might have been more appropriate to handle asynchronous callbacks as parameters to functions:

public void Extend(object state, AsyncCallback callback) { ...

This is tricky, too; it means needing to manage an additional two variables per asynchronous call.  For an object that might support multiple concurrent asynchronous operations, that can become a nightmare of complexity management.
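
For what it's worth, here is a minimal sketch of the callback-per-call shape, using a plain Action instead of the state/AsyncCallback pair to keep it short.  The Arm class, SignalExtended (standing in for whatever the hardware layer calls when the motion finishes) and ArmDemo are all hypothetical, and the sketch only supports one outstanding Extend at a time, which is exactly where the bookkeeping starts to grow.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the armature controller.
public sealed class Arm
{
    private Action pendingExtend;

    // The continuation travels with the request instead of living in an event.
    public void Extend(Action onExtended)
    {
        if (pendingExtend != null)
            throw new InvalidOperationException("Extend is already in progress.");
        pendingExtend = onExtended;
        // Real code would start the hardware operation here.
    }

    // Called by the (hypothetical) hardware layer when the motion finishes.
    public void SignalExtended()
    {
        Action callback = pendingExtend;
        pendingExtend = null;  // clear first so the callback may call Extend again
        if (callback != null) callback();
    }
}

public static class ArmDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var arm = new Arm();

        // First extension: accept the user's item.
        arm.Extend(() => log.Add("extended: prompt user for item"));
        arm.SignalExtended();

        // Second extension: same operation, different continuation, no state flag.
        arm.Extend(() => log.Add("extended: item dispensed"));
        arm.SignalExtended();

        return log;
    }
}
```

Each call site says what should happen next, so there is no shared handler to branch in and nothing to unsubscribe; the cost is that the object now has to store one pending callback per operation it supports.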

I don't have the right answer for this.  But it's definitely something to watch out for in the future.


Why doesn’t Dispatcher implement ISynchronizeInvoke?

Posted by Rob

This is a rant.  I don't have the answer to the question.

So, the latest project I'm working on is a kiosk app in WPF that has to interact with hardware.  The hardware has various needs; some of it I need to poll, and others I check on status every given interval.  Every class I've created for interacting with the hardware has a SynchronizingObject property, just like the System.Timers.Timer class, and I was pretty happy with myself when I figured out that my event raising implementation was the same as that class's.

The SynchronizingObject property looks like this:

public ISynchronizeInvoke SynchronizingObject
{
    get { return m_syncObj; }
    set { m_syncObj = value; }
}

Pretty straightforward.  To call an event it might be something like this:

protected virtual void OnStatusChanged(EventArgs e)
{
    if (StatusChanged != null)
    {
        if (m_syncObj == null || !m_syncObj.InvokeRequired)
            StatusChanged(this, e);
        else
            m_syncObj.BeginInvoke(StatusChanged, new object[] { this, e });
    }
}

Easy?  Good.

Well, like apparently everything straightforward about Windows Forms programming, it's been changed for Windows Presentation Foundation.  Each Visual element has an associated Dispatcher; Dispatchers are created per-thread, and like in Windows Forms, you can't update a Visual or its descendent tree from another thread.  And the Dispatcher class almost looks like it would be interface-compatible with ISynchronizeInvoke with a couple exceptions.  I thought, "great!  I'll be able to just create a simple Adapter class!"  Nope.  Well, it wasn't simple, that's for sure.

Let's take a look at the overload list for Invoke and you might see why.  Invoked delegates either take one argument or one argument plus a params list of arguments.  Then you think.... what?

There are some issues with params lists.  For example, let's say that you're passing an argument that is an object[].  Should that get expanded?  Consider this code:

void DoStuff(params object[] args) { ... }

// now within a function
object[] values = new object[] { 1, "stuff", 25.0 };
DoStuff(values, "something else?");

The kind of difficulty we run into is -- should "values" be expanded out or should it stay as an array?  In other words, should args be { 1, "stuff", 25.0, "something else?" }, or should it be { { 1, "stuff", 25.0 }, "something else?" }?  What about when it's the only argument passed - what if DoStuff(values) is the call?  Then should args be {1, "stuff", 25.0} or { {1, "stuff", 25.0} }?  For the purposes of this application, I assumed that, in the first case, args would be like the latter; and in the second case, args would be like the former.
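
For plain C# method calls, the compiler happens to resolve both cases the way I assumed.  The sketch below demonstrates that; ParamsDemo, CountArgs and FirstIsArray are hypothetical helper names, and this says nothing about how Dispatcher binds its own arguments internally.

```csharp
using System;

public static class ParamsDemo
{
    // Reports how many elements the callee actually sees in args.
    public static int CountArgs(params object[] args)
    {
        return args.Length;
    }

    // Reports whether the first element is itself an object[].
    public static bool FirstIsArray(params object[] args)
    {
        return args.Length > 0 && args[0] is object[];
    }

    public static string Run()
    {
        object[] values = new object[] { 1, "stuff", 25.0 };

        // Sole argument of type object[]: passed through as args itself.
        int alone = CountArgs(values);

        // Mixed with another argument: values stays nested as args[0].
        int mixed = CountArgs(values, "something else?");
        bool nested = FirstIsArray(values, "something else?");

        // Casting to object forces the array to be wrapped even when it is alone.
        int wrapped = CountArgs((object)values);

        return alone + "," + mixed + "," + nested + "," + wrapped;
    }
}
```

So DoStuff(values) sees three elements, DoStuff(values, "something else?") sees two with the array nested in the first slot, and a cast to object is the way to force wrapping when the array is the only argument.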

So here's my test app -- it's a basic WPF application.  Here's Window1.xaml:

<Window x:Class="DispatcherTest.Window1"
    xmlns=""
    xmlns:x=""
    Title="Window1" Height="300" Width="300">
    <Grid>
        <Button Height="23" Margin="102,87,100,0" Name="button1" VerticalAlignment="Top" Click="button1_Click">Button</Button>
    </Grid>
</Window>

and Window1.xaml.cs:

public partial class Window1 : Window
{
    private ISynchronizeInvoke m_invoker;

    public Window1()
    {
        InitializeComponent();
        m_invoker = new DispatcherWinFormsCompatAdapter(this.Dispatcher);
    }

    private void button1_Click(object sender, RoutedEventArgs e)
    {
        ThreadPool.QueueUserWorkItem(new WaitCallback(DoSomeLongRunningWork), null);
    }

    private void DoSomeLongRunningWork(object state)
    {
        Thread.Sleep(2000);
        EventHandler updateButton = delegate(object sender, EventArgs e)
        {
            button1.Content = "Clicked!";
        };

        if (m_invoker.InvokeRequired)
        {
            m_invoker.BeginInvoke(updateButton, new object[] { this, EventArgs.Empty });
        }
        else
        {
            updateButton(this, EventArgs.Empty);
        }
    }
}

Finally, here's my skeleton adapter class.  Note that the test app never calls EndInvoke or Invoke, so those implementations are untested:

internal class DispatcherWinFormsCompatAdapter : ISynchronizeInvoke
{
    #region IAsyncResult implementation
    private class DispatcherAsyncResultAdapter : IAsyncResult
    {
        private DispatcherOperation m_op;
        private object m_state;

        public DispatcherAsyncResultAdapter(DispatcherOperation operation)
        {
            m_op = operation;
        }

        public DispatcherAsyncResultAdapter(DispatcherOperation operation, object state)
            : this(operation)
        {
            m_state = state;
        }

        public DispatcherOperation Operation
        {
            get { return m_op; }
        }

        #region IAsyncResult Members

        public object AsyncState
        {
            get { return m_state; }
        }

        public WaitHandle AsyncWaitHandle
        {
            get { return null; }
        }

        public bool CompletedSynchronously
        {
            get { return false; }
        }

        public bool IsCompleted
        {
            get { return m_op.Status == DispatcherOperationStatus.Completed; }
        }

        #endregion
    }
    #endregion

    private Dispatcher m_disp;

    public DispatcherWinFormsCompatAdapter(Dispatcher dispatcher)
    {
        m_disp = dispatcher;
    }

    #region ISynchronizeInvoke Members

    public IAsyncResult BeginInvoke(Delegate method, object[] args)
    {
        if (args != null && args.Length > 1)
        {
            object[] argsSansFirst = GetArgsAfterFirst(args);
            DispatcherOperation op = m_disp.BeginInvoke(DispatcherPriority.Normal, method, args[0], argsSansFirst);
            return new DispatcherAsyncResultAdapter(op);
        }
        else
        {
            // A null or empty args array means the delegate takes no arguments.
            if (args != null && args.Length == 1)
            {
                return new DispatcherAsyncResultAdapter(m_disp.BeginInvoke(DispatcherPriority.Normal, method, args[0]));
            }
            else
            {
                return new DispatcherAsyncResultAdapter(m_disp.BeginInvoke(DispatcherPriority.Normal, method));
            }
        }
    }

    private static object[] GetArgsAfterFirst(object[] args)
    {
        object[] result = new object[args.Length - 1];
        Array.Copy(args, 1, result, 0, args.Length - 1);
        return result;
    }

    public object EndInvoke(IAsyncResult result)
    {
        DispatcherAsyncResultAdapter res = result as DispatcherAsyncResultAdapter;
        if (res == null)
            throw new InvalidCastException();

        // Poll until the operation finishes, whether it completed or was aborted.
        while (res.Operation.Status != DispatcherOperationStatus.Completed && res.Operation.Status != DispatcherOperationStatus.Aborted)
        {
            Thread.Sleep(50);
        }

        return res.Operation.Result;
    }

    public object Invoke(Delegate method, object[] args)
    {
        if (args != null && args.Length > 1)
        {
            object[] argsSansFirst = GetArgsAfterFirst(args);
            return m_disp.Invoke(DispatcherPriority.Normal, method, args[0], argsSansFirst);
        }
        else
        {
            // A null or empty args array means the delegate takes no arguments.
            if (args != null && args.Length == 1)
            {
                return m_disp.Invoke(DispatcherPriority.Normal, method, args[0]);
            }
            else
            {
                return m_disp.Invoke(DispatcherPriority.Normal, method);
            }
        }
    }

    public bool InvokeRequired
    {
        get { return m_disp.Thread != Thread.CurrentThread; }
    }

    #endregion
}

All told, I'm still not sure that this will work with delegates with more than two parameters.  I'm still concerned about params argument binding -- but what can I do?  Reflection.Emit custom function calls for n parameters?  No thanks.

Microsoft guy who made System.Windows.Threading - for .NET 4.0, how about making Dispatcher implement ISynchronizeInvoke, hm?  Thanks!


My C# 4.0 Wishlist, Part 5 : The raise Keyword

Posted by Rob

One of the more obscure features of C# is the ability to specify custom overloads for adding and removing event registration similarly to properties, via the add and remove keywords.  Known as "event accessors," they implement the parts of event registration that the C# compiler normally handles.  You didn't think that that += operator was implemented on the type, did you?

class Test
{
    public event EventHandler Event1;

    private EventHandler ev2;
    public event EventHandler Event2
    {
        add
        {
            if (ev2 != null)
                ev2 = (EventHandler)Delegate.Combine(ev2, value);
            else
                ev2 = value;
        }
        remove
        {
            if (ev2 != null)
                ev2 = (EventHandler)Delegate.Remove(ev2, value);
        }
    }
    protected virtual void OnEvent2(EventArgs e)
    {
        if (ev2 != null)
            ev2(this, e);
    }
}

This pattern is actually used extensively throughout the Windows Forms library, where controls add event handlers to base event handler collections implemented within a hashtable.  I can only surmise that this is done to prevent having dozens of event fields cluttering up the classes.
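
A minimal sketch of that pattern outside Windows Forms might look like the following.  It uses System.ComponentModel.EventHandlerList, a keyed handler collection; the Widget class, its two events and WidgetDemo are hypothetical names for illustration.

```csharp
using System;
using System.ComponentModel;

// Hypothetical control-like class: every event shares one handler collection.
public class Widget
{
    // One static key object per event, instead of one delegate field per instance.
    private static readonly object ClickKey = new object();
    private static readonly object ResizeKey = new object();

    private readonly EventHandlerList events = new EventHandlerList();

    public event EventHandler Click
    {
        add { events.AddHandler(ClickKey, value); }
        remove { events.RemoveHandler(ClickKey, value); }
    }

    public event EventHandler Resize
    {
        add { events.AddHandler(ResizeKey, value); }
        remove { events.RemoveHandler(ResizeKey, value); }
    }

    public void RaiseClick()
    {
        // The indexer returns the combined delegate for the key, or null if none.
        EventHandler handler = (EventHandler)events[ClickKey];
        if (handler != null)
            handler(this, EventArgs.Empty);
    }
}

public static class WidgetDemo
{
    public static int Run()
    {
        var widget = new Widget();
        int clicks = 0;
        EventHandler onClick = (sender, e) => clicks++;

        widget.Click += onClick;
        widget.RaiseClick();   // handler runs once

        widget.Click -= onClick;
        widget.RaiseClick();   // no handler registered any more

        return clicks;
    }
}
```

An instance with no subscribers pays for one collection rather than one field per event, which fits the guess above about why the library does it this way.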

Now, if we were to compile this app and disassemble it in Reflector, we'd get a very similar picture to what we've got.  Reflector would show the compiler-generated add/remove blocks for Event1, though not when the event declaration is selected, and it also indicates that there are compiler directives that show the event accessors are synchronized.

Visual Basic .NET also supports this pattern, but adds an additional keyword: the RaiseEvent keyword:

Public Class Test
    Public Event Event1 As EventHandler

    Private ev2 As EventHandler
    Public Custom Event Event2 As EventHandler
        AddHandler(ByVal value As EventHandler)
            If Not ev2 Is Nothing Then
                ev2 = CType(System.Delegate.Combine(ev2, value), EventHandler)
            Else
                ev2 = value
            End If
        End AddHandler

        RemoveHandler(ByVal value As EventHandler)
            If Not ev2 Is Nothing Then
                ev2 = CType(System.Delegate.Remove(ev2, value), EventHandler)
            End If
        End RemoveHandler

        RaiseEvent(ByVal sender As Object, ByVal e As System.EventArgs)
            ev2(sender, e)
        End RaiseEvent
    End Event

    Protected Overridable Sub OnEvent2(ByVal e As EventArgs)
        If Not ev2 Is Nothing Then
            RaiseEvent Event2(Me, e)
        End If
    End Sub
End Class

In this example, Visual Basic allows you to implement exactly how Event2 is raised.  When I look at this in Reflector to see how C# uses this, here's what I see:

[Screenshot: Reflector view of custom VB event]

Reflector gives C# the raise keyword.  Why haven't the C# language experts done so?

How would this be worthwhile?  Well, suppose that we're building an application that can have plugins.  We don't know that plugins are always going to work correctly, so when they handle an event, they may raise an exception.  The problem is, if an event is invoked and the first event handler causes an exception, none of the successive handlers will be invoked.
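
The failure mode is easy to reproduce, and there is a manual workaround: the class that owns the event can walk the invocation list itself.  The sketch below shows both; PluginHost, Tick, RaiseNaively, RaiseIsolated and RaiseDemo are hypothetical names, and dropping the faulty handler is just one possible policy.  The point of a raise accessor would be to let the event carry this logic itself, instead of every raising site having to remember it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical host whose Tick event is handled by plugins.
public class PluginHost
{
    public event EventHandler Tick;
    public readonly List<string> Log = new List<string>();

    // Plain invocation: the first handler that throws stops all the others.
    public void RaiseNaively()
    {
        EventHandler handler = Tick;
        if (handler != null)
            handler(this, EventArgs.Empty);
    }

    // Manual workaround: call each handler separately and drop the ones that throw.
    public void RaiseIsolated()
    {
        EventHandler handler = Tick;
        if (handler == null)
            return;

        foreach (EventHandler single in handler.GetInvocationList())
        {
            try
            {
                single(this, EventArgs.Empty);
            }
            catch (Exception ex)
            {
                Log.Add("removed faulty handler: " + ex.Message);
                Tick -= single;
            }
        }
    }
}

public static class RaiseDemo
{
    public static List<string> Run()
    {
        var host = new PluginHost();
        host.Tick += (sender, e) => { throw new InvalidOperationException("bad plugin"); };
        host.Tick += (sender, e) => host.Log.Add("good handler ran");

        try
        {
            host.RaiseNaively();
        }
        catch (InvalidOperationException)
        {
            // The second handler never ran.
            host.Log.Add("naive raise aborted");
        }

        host.RaiseIsolated();
        return host.Log;
    }
}
```

With the naive raise the good handler never runs; with the isolated raise the faulty handler is logged and removed, and the good handler still gets its turn.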

Arguably, the "state of the application is undefined after an exception is raised, so we should gracefully exit."  But that's not always the case!  What if the way to gracefully do this is to analyze the stack trace within the application, determine which plugin caused the exception, and unload the plugin?  We can't build any of this into the event itself in C#.

Give us the raise keyword!

This is the end of my "C# 4.0 Wishlist" series.  For reference, here are the other articles: