Running with Code Like with scissors, only more dangerous


The Microsoft Reactive Extensions

Posted by Rob Paveza

The honest truth is that I’m having difficulty establishing exactly what they could be used for, but they’re still really cool.  The Microsoft Reactive Extensions for the .NET Framework are the dual of LINQ: whereas LINQ operates over objects, or you might say pulls objects out of collections, the Reactive Extensions (Rx) handle push notifications.  They are the ultimate generalization of events and event handling within .NET.

Getting There

First, let’s consider the normal interfaces for IEnumerable:

interface IEnumerable<T>
{
    IEnumerator<T> GetEnumerator();
}

interface IEnumerator<T> : IDisposable
{
    T Current { get; }  // throws exception at end of enumeration
    bool MoveNext();
}

These interfaces (okay, really, the non-generic IEnumerable interface, but let’s not split hairs) are the foundation of the foreach C# keyword (and the For Each… In in Visual Basic).  A foreach can also be written, roughly, as:

foreach (string str in myListOfStrings) { /* loop body uses str */ }
// rewritten:
using (IEnumerator<string> enumStr = myListOfStrings.GetEnumerator())
    while (enumStr.MoveNext())
    {
        string str = enumStr.Current;
        // loop body uses str
    }

Keep this example in mind for later, because we’ll revisit how this can be used in Rx programming.


Dualism is something of a mathematical concept, and I don’t want to get into it because I don’t completely understand it myself, but most nerdy people reading my blog will probably appreciate an example from particle physics.  Consider a proton: its physical dual is the antiproton, because when they meet they annihilate each other.  It’s not the electron, because while the two have opposite charge, they have substantially different mass.

The core of Rx is the dual of IEnumerable.  That is, IObservable<T> and IObserver<T>.  But let’s deconstruct these piece by piece.  Let’s start at IEnumerator<T>:

interface IObserver<T>
{
    // T Current { get; }
    // That method looks like: T get_Current();
    void OnNext(T next);
    // Current throws an exception if MoveNext() previously returned false, so:
    void OnError(Exception error);

    // bool MoveNext()
    // returns true while Current is populated, false when we reach the end, so:
    void OnCompleted();
}

You can see that, whereas everything in IEnumerator<T> pulled data, now we’ve transitioned into pushing data.  But the observer isn’t really the cool part; rather, it’s the subject that’s cool:

interface IObservable<T>
{
    // GetEnumerator() returned an object; here we pass one in
    // We still needed to capture the disposable functionality, so we return IDisposable
    IDisposable Subscribe(IObserver<T> observer);
}

Now, if you want to see the specifics about how these were constructed, you can check out the Expert-to-Expert video on Channel 9.  I’ve included some high-level notes, but they’re not really as deep as you can get with these guys.
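To make the two interfaces a little more concrete, here is a minimal sketch of a hand-rolled subject built only on the System.IObservable<T> and System.IObserver<T> interfaces that ship with .NET 4 (where the completion method is named OnCompleted).  SimpleSubject, RecordingObserver, Publish and Complete are names I made up for illustration; Rx ships its own, more careful subject types, and this sketch makes no attempt at thread safety.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-rolled subject; not the Rx implementation.
public sealed class SimpleSubject<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    // The dual of GetEnumerator(): the caller hands us an observer and
    // gets back an IDisposable that ends the subscription.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    // Push a value to every current observer (the dual of reading Current).
    public void Publish(T value)
    {
        foreach (IObserver<T> observer in observers.ToArray())
            observer.OnNext(value);
    }

    // Signal the end of the sequence (the dual of MoveNext() returning false).
    public void Complete()
    {
        foreach (IObserver<T> observer in observers.ToArray())
            observer.OnCompleted();
        observers.Clear();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action onDispose;
        public Unsubscriber(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() { onDispose(); }
    }
}

// Observer that simply records what it was told.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(T value) { Log.Add("next:" + value); }
    public void OnError(Exception error) { Log.Add("error:" + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}

public static class SubjectDemo
{
    public static List<string> Run()
    {
        var subject = new SimpleSubject<string>();
        var observer = new RecordingObserver<string>();

        IDisposable subscription = subject.Subscribe(observer);
        subject.Publish("button1 clicked");
        subscription.Dispose();               // stop listening
        subject.Publish("button2 clicked");   // nobody is subscribed any more
        return observer.Log;
    }

    public static void Main()
    {
        foreach (string line in Run())
            Console.WriteLine(line);          // prints "next:button1 clicked"
    }
}
```

Disposing the subscription is the push-side counterpart of disposing an enumerator: after it, the observer hears nothing further.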

Creating a Subject

Creating a subject is a bit of a challenge; subjects are event-driven, and those are generally kind of difficult to think about because they usually fit into only one of two buckets: user interaction and system I/O.  For the sake of example, I’ve created a simple Windows Forms project to start with, which has a couple of observable Buttons (the class is called ObservableButton, go figure) and an observer, which is the containing form.  You can download the starter project, which requires Visual Studio 2010 and the Rx Framework.

Subjects can be anything, though, and the power you can glean from these is amazing.  For the Red Bull NASCAR team, I created a server for a Twitter feed aggregator using Rx.  It started as reading a socket into HTTP data, then into chunked HTTP data, then into JSON packets, then into POCO objects that were then re-serialized and sent over the wire to N Flash clients.  As you can imagine, network programming, social programming, or any other kind of programming where an event is coming in unpredictably is a great candidate for this.  Why?

Let’s look at the use case I just listed.  As Twitter’s live stream service sends data over the wire, I need to parse it and send it to a lot of listening sockets.  But I don’t want to just say “Oh I just got the data, let me send it out again”; that would possibly slow down processing on other threads, because I might have to wait: my socket might already be in the process of sending data, and so it’s in an invalid state to send further data.  If I had tied a server socket directly to the “I’m ready to send” signal, I would have been in trouble.  Rather, I had a utility (an Observer) that aggregated incoming messages until all server sockets were ready to send, at which point it would push those updated messages to the server sockets.
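The aggregating observer described above can be sketched roughly like this.  It is not the code from the NASCAR project; BufferingObserver, Flush and the sendBatch callback are hypothetical names, and I am assuming that "ready to send" arrives as a plain method call.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of an aggregating observer: it queues messages as they
// arrive and forwards them in one batch when the sender says it is ready.
public sealed class BufferingObserver<T> : IObserver<T>
{
    private readonly object gate = new object();
    private readonly List<T> pending = new List<T>();
    private readonly Action<IList<T>> sendBatch;

    public BufferingObserver(Action<IList<T>> sendBatch)
    {
        this.sendBatch = sendBatch;
    }

    // Called as data arrives; it only queues, so it never waits on a busy socket.
    public void OnNext(T value)
    {
        lock (gate) { pending.Add(value); }
    }

    public void OnError(Exception error) { /* a real server would log and recover */ }

    public void OnCompleted() { Flush(); }

    // Called when every downstream socket is ready to send again.
    public void Flush()
    {
        List<T> batch;
        lock (gate)
        {
            if (pending.Count == 0) return;
            batch = new List<T>(pending);
            pending.Clear();
        }
        sendBatch(batch);   // send outside the lock so producers are not held up
    }
}

public static class BufferingDemo
{
    // Returns the size of each batch that was sent.
    public static List<int> Run()
    {
        var batchSizes = new List<int>();
        var buffer = new BufferingObserver<string>(batch => batchSizes.Add(batch.Count));

        buffer.OnNext("tweet 1");
        buffer.OnNext("tweet 2");
        buffer.OnNext("tweet 3");
        buffer.Flush();            // sockets became ready: one batch of three
        buffer.Flush();            // nothing pending: no batch is sent
        buffer.OnNext("tweet 4");
        buffer.OnCompleted();      // completion flushes what is left
        return batchSizes;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // prints "3, 1"
    }
}
```

The point of the design is that OnNext never touches a socket, so a slow client cannot stall the thread that is reading the incoming stream.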

Let’s look at the sample program:


This isn’t really anything spectacular.  I could have done that with regular event handlers.

Aggregating Subjects

The magic of Rx, from my perspective, lies in what you can do with subjects.  My constructor no longer needs two lines, one per button; instead, I’m merging the two buttons into one observable sequence:

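        public Form1()
        {
            InitializeComponent();
            // Merge the two buttons into a single sequence, then Subscribe(this) once.
        }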


The result is identical – the events get handled and all is good.
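If you want to see what a merge has to do under the hood, here is a hand-rolled sketch that uses only the BCL interfaces.  It is not the Rx implementation: Merged, ClickSource (a stand-in for ObservableButton), ActionDisposable and ListObserver are all names invented for this example, and there is no thread safety.

```csharp
using System;
using System.Collections.Generic;

// Small helper: runs an action when disposed.
public sealed class ActionDisposable : IDisposable
{
    private readonly Action action;
    public ActionDisposable(Action action) { this.action = action; }
    public void Dispose() { action(); }
}

// Hypothetical stand-in for ObservableButton: pushes a string per click.
public sealed class ClickSource : IObservable<string>
{
    private readonly List<IObserver<string>> observers = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        observers.Add(observer);
        return new ActionDisposable(() => observers.Remove(observer));
    }

    public void Click(string message)
    {
        foreach (IObserver<string> observer in observers.ToArray())
            observer.OnNext(message);
    }
}

// Hand-rolled merge: one subscription fans out to every source.
public sealed class Merged<T> : IObservable<T>
{
    private readonly IObservable<T>[] sources;
    public Merged(params IObservable<T>[] sources) { this.sources = sources; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var relay = new Relay(observer, sources.Length);
        var subscriptions = new List<IDisposable>();
        foreach (IObservable<T> source in sources)
            subscriptions.Add(source.Subscribe(relay));
        return new ActionDisposable(() => subscriptions.ForEach(s => s.Dispose()));
    }

    // Forwards values immediately; completes only after every source has completed.
    private sealed class Relay : IObserver<T>
    {
        private readonly IObserver<T> target;
        private int remaining;
        public Relay(IObserver<T> target, int count) { this.target = target; remaining = count; }
        public void OnNext(T value) { target.OnNext(value); }
        public void OnError(Exception error) { target.OnError(error); }
        public void OnCompleted() { if (--remaining == 0) target.OnCompleted(); }
    }
}

// Stand-in for the form: records every notification it receives.
public sealed class ListObserver : IObserver<string>
{
    public readonly List<string> Items = new List<string>();
    public void OnNext(string value) { Items.Add(value); }
    public void OnError(Exception error) { Items.Add("error: " + error.Message); }
    public void OnCompleted() { Items.Add("completed"); }
}

public static class MergeDemo
{
    public static List<string> Run()
    {
        var button1 = new ClickSource();
        var button2 = new ClickSource();
        var form = new ListObserver();

        // One Subscribe call instead of one per button.
        new Merged<string>(button1, button2).Subscribe(form);

        button1.Click("button1 clicked");
        button2.Click("button2 clicked");
        button1.Click("button1 clicked");
        return form.Items;
    }

    public static void Main()
    {
        foreach (string item in Run())
            Console.WriteLine(item);
    }
}
```

The observer sees a single interleaved sequence and has no idea that two sources are feeding it.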

Modifying Sequences

Now I’m going to change the class definition slightly:

    public partial class Form1 : Form, IObserver<Timestamped<string>>
    {
        public Form1()
        {
            InitializeComponent();
            // Merge the two buttons, add .Timestamp(), then Subscribe(this).
        }

        public void OnNext(Timestamped<string> value)
        {
            this.textBox1.Text += value.Timestamp.ToString("hh:mm tt   ") + value.Value + Environment.NewLine;
        }

        public void OnError(Exception error)
        {
            this.textBox1.Text += "Exception caught: " + Environment.NewLine + error.ToString() + Environment.NewLine;
        }

        public void OnCompleted()
        {
            this.textBox1.Text += "Sequence completed." + Environment.NewLine;
        }
    }

Note that by adding in the .Timestamp() call, I’ve transformed the observable sequence of strings into an observable sequence of timestamped strings.  That’s pretty cool, right?

This is even cooler: the Delay() method:

                .Delay(new TimeSpan(0, 0, 1)).ObserveOn(this).Subscribe(this);

The ObserveOn method accepts a Windows Forms control, a Dispatcher (for WPF), or other scheduler implementation that can be used to synchronize the delay.  If I didn’t include it, the delayed merge would be called on a different thread, and we’d get an InvalidOperationException (because you can’t update a window on a thread other than the thread that created it). 

Do you want to avoid repetition?

                .DistinctUntilChanged(ts => ts.Value).Subscribe(this);

This produced output that only emitted one message, no matter how many times I clicked the same button, until I clicked the other button.
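For the curious, the filtering behaviour is easy to imitate by hand.  The sketch below is my own illustration, not the Rx source: DistinctUntilChangedObserver and ListObserver are made-up names, and I use KeyValuePair<int, string> as a stand-in for Timestamped<string> so that the key selector has something to pick apart.

```csharp
using System;
using System.Collections.Generic;

// Hand-rolled illustration of DistinctUntilChanged(keySelector): a value is
// forwarded only when its key differs from the key of the previous value.
public sealed class DistinctUntilChangedObserver<T, TKey> : IObserver<T>
{
    private readonly IObserver<T> target;
    private readonly Func<T, TKey> keySelector;
    private bool hasPrevious;
    private TKey previousKey;

    public DistinctUntilChangedObserver(IObserver<T> target, Func<T, TKey> keySelector)
    {
        this.target = target;
        this.keySelector = keySelector;
    }

    public void OnNext(T value)
    {
        TKey key = keySelector(value);
        if (hasPrevious && EqualityComparer<TKey>.Default.Equals(key, previousKey))
            return;                     // same key as last time: swallow it
        hasPrevious = true;
        previousKey = key;
        target.OnNext(value);
    }

    public void OnError(Exception error) { target.OnError(error); }
    public void OnCompleted() { target.OnCompleted(); }
}

// Collects whatever gets through the filter.
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class DistinctDemo
{
    public static List<string> Run()
    {
        var output = new ListObserver<KeyValuePair<int, string>>();
        var filter = new DistinctUntilChangedObserver<KeyValuePair<int, string>, string>(
            output, click => click.Value);   // compare by message, ignore the tick

        string[] clicks = { "button1", "button1", "button1", "button2", "button1" };
        for (int tick = 0; tick < clicks.Length; tick++)
            filter.OnNext(new KeyValuePair<int, string>(tick, clicks[tick]));

        return output.Items.ConvertAll(click => click.Key + ":" + click.Value);
    }

    public static void Main()
    {
        // prints "0:button1, 3:button2, 4:button1"
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Without the key selector every item would count as new, because each one carries a different tick (or, in the real code, a different timestamp).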

So, What Can We Do?

Well, right now it doesn’t seem like there’s a lot of tooling for Rx.  There’s a community wiki around the framework, though, and I think that we can eventually see a lot of good use.

Some ideas:

  • Develop a way to completely repeat ASP.NET requests.  Treat IIS as an IObservable<AspNetRequest>, where AspNetRequest contains all the state data that would otherwise populate these tools, which would immensely help with debugging.  Imagine when your tester only needs to record a series of test cases once, and otherwise is just testing for UI errors.
  • Wrap event-oriented APIs for simplified logging and replaying.  (In JinxBot, an event-oriented chat API named for my cat, I always wanted to capture all the events of the core API and be able to replay them via a subclass, which would have allowed pixel-perfect replay of a chat session).
  • Handle periodic data services like Twitter, SMS, email, or others in a clean and efficient way.

I’d like to see this take off, but it’s a very different way of looking at programming than what most .NET developers are used to.  Enjoy it, take a look, and let’s build it up!


Unsung C# Hero: Closure

Posted by Rob

Today I’m going to talk about a feature of C# that has been around since 2.0 (with the introduction of anonymous delegates) but which gets nearly no lip service; most C# developers have probably used it, but without thinking about it.  This feature is called closure, and it refers to the ability of a nested function to make reference to the surrounding function’s variables.

This article discusses at length how delegates are implemented in C#; a review may be appropriate before diving in.  Also, we’ll be making a lot of use of The Tool Formerly Known as Lutz Roeder’s .NET Reflector, which is now owned by Red Gate Software.

Anonymous Methods without Closure

Supposing that I had a reason to do so, I could assign an event handler as an anonymous method.  I think this is generally bad practice (there is no way to explicitly dissociate the event handler, because it doesn’t have a name), but you can:

    public partial class SampleNoClosure : Form
    {
        public SampleNoClosure()
        {
            InitializeComponent();
            button1.Click += delegate { MessageBox.Show("I was clicked!  See?"); };
        }
    }

This will work as expected; on click, a small alert dialog will appear.  Nothing terribly special about that, right?  We could have written that as a lambda expression as well, not that it buys us anything.  It looks like this in Reflector:

Anonymous method with no closure.

We see that the compiler auto-generates a method that matches the appropriate signature.  Nothing here should be completely surprising.

Simple Example of Closure

Here is a sample class that includes closure.  The enclosed variable is sum.  You’ll note that everything just makes sense internally, right? 

    public partial class SimpleClosureExample : Form
    {
        public SimpleClosureExample()
        {
            InitializeComponent();

            int sum = 1;
            for (int i = 1; i <= 10; i++)
                sum += sum * i;

            button1.Click += delegate
            {
                MessageBox.Show("The sum was " + sum.ToString());
            };
        }
    }

So, it only makes sense that sum can be part of that anonymous function, right?  But we need to bear in mind that all C# code must be statically-compiled; we can’t just calculate sum.  Besides, what happens if the value was a parameter to the function?  Something that couldn’t be precompiled?  Well, in order to handle these scenarios, we need to think about how this will work.

In order to keep that method state alive, we need to create another object.  That’s how the state can be maintained regardless of threads and regardless of calls to the function.  We can see it as a nested class here, and the anonymous method looks just like it does in code:

Closure supporting class
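In source form, the transformation looks roughly like the sketch below.  The compiler-generated names are not legal C# identifiers, so DisplayClass and Handler are made-up stand-ins, and I return a Func<string> instead of showing a message box so the example runs in a console.

```csharp
using System;

public static class ClosureDemo
{
    // What we write: the lambda captures the local variable sum.
    public static Func<string> WithClosure()
    {
        int sum = 1;
        for (int i = 1; i <= 10; i++)
            sum += sum * i;

        return () => "The sum was " + sum.ToString();
    }

    // Roughly what the compiler builds for us: the captured local becomes a
    // field on a nested class, and the anonymous method becomes an instance
    // method on that class.
    private sealed class DisplayClass
    {
        public int sum;

        public string Handler()
        {
            return "The sum was " + sum.ToString();
        }
    }

    public static Func<string> HandWritten()
    {
        var closure = new DisplayClass();
        closure.sum = 1;
        for (int i = 1; i <= 10; i++)
            closure.sum += closure.sum * i;

        // The delegate keeps the closure object alive after this method returns.
        return closure.Handler;
    }

    public static void Main()
    {
        Console.WriteLine(WithClosure()());   // prints "The sum was 39916800"
        Console.WriteLine(HandWritten()());   // prints "The sum was 39916800"
    }
}
```

Note that every use of sum in the outer method is rewritten to go through the object, which is why the variable outlives the call that declared it.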

A More Advanced Example

Whenever you work with a LINQ expression, chances are you’re using closure and anonymous functions (and lambda expressions) and don’t realize it.  Consider this LINQ-to-SQL query:

            int boardID = 811;
            int perPage = 20;
            int pageIndex = 0;

            var topics = (from topic in dc.Topics
                          orderby topic.IsSticky descending, topic.LastMessage.TimePosted descending
                          where topic.BoardID == boardID
                          select new
                          {
                              topic.TopicID,
                              Subject = topic.FirstMessage.Subject,
                              LatestSubject = topic.LastMessage.Subject,
                              LatestChange = topic.LastMessage.ModifiedTime,
                              NameOfUpdater = topic.LastMessage.PosterName,
                              Updater = topic.LastMessage.User,
                              Starter = topic.FirstMessage.User,
                              NameOfStarter = topic.FirstMessage.PosterName,
                              topic.ReplyCount,
                              topic.ViewCount
                          })
                          .Skip(perPage * pageIndex)
                          .Take(perPage);

            foreach (var topic in topics)
            {
                Console.WriteLine("{0} - {1} {2} {3} {4} by {5}", topic.Subject, topic.NameOfStarter, topic.ReplyCount, topic.ViewCount, topic.LatestChange, topic.NameOfUpdater);
            }

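The closure here is happening within the where clause; you may recall that the C# where clause compiles to a call to the Where() extension method, with the condition passed as a lambda.  For an in-memory IEnumerable<T> that’s Where(Func<TSource, bool> predicate); because dc.Topics is an IQueryable<T>, the predicate is passed as an Expression<Func<TSource, bool>> instead, but boardID is captured the same way.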
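Here is a small runnable sketch of the capture.  It uses LINQ to Objects over an in-memory list instead of LINQ-to-SQL so it needs no database; the Topic class and its data are hypothetical and trimmed down to two fields.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, trimmed-down stand-in for the Topic entity.
public sealed class Topic
{
    public int BoardID;
    public string Subject;
}

public static class WhereClosureDemo
{
    public static List<string> Run()
    {
        var topics = new List<Topic>
        {
            new Topic { BoardID = 811, Subject = "Welcome" },
            new Topic { BoardID = 812, Subject = "Rules" },
            new Topic { BoardID = 811, Subject = "Introductions" },
            new Topic { BoardID = 812, Subject = "Off-topic" },
        };

        int boardID = 811;

        // Query syntax; the compiler rewrites this into the method calls below.
        IEnumerable<string> query = from topic in topics
                                    where topic.BoardID == boardID
                                    select topic.Subject;

        // Method syntax: the lambda passed to Where() captures boardID.
        IEnumerable<string> sameQuery = topics
            .Where(topic => topic.BoardID == boardID)
            .Select(topic => topic.Subject);

        // Nothing has run yet. Because the lambda holds the variable itself
        // (not a copy of 811), this assignment changes what both queries return.
        boardID = 812;

        List<string> first = query.ToList();
        List<string> second = sameQuery.ToList();
        if (!first.SequenceEqual(second))
            throw new InvalidOperationException("the two forms should agree");
        return first;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // prints "Rules, Off-topic"
    }
}
```

The late assignment to boardID is only there to prove the point: the predicate reads the captured variable when the query is enumerated, not when it is written.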

Here, it’s very easy to imagine a case where we wanted to write actual parameters.  This query is used to generate and display a topic list for a message board; all “stickied” posts should be at the top and the rest should be sorted by last time posted.  If I’m making that into a web server control, I can’t hard-code the board ID, the number of topics per page to display, or which page I’m looking at.

Now, this is kind of a hard thing to conceptualize; when I was going through this project, I expected all three variables to be incorporated into the compiler-generated closure class.  It turns out that Skip() and Take() don’t evaluate a lambda expression (they take straight values), so we don’t ultimately have to store them for evaluation later.  However, as expected, boardID does have to be stored, and that raises an interesting question: why is that even the case, when LINQ-to-SQL translates this into SQL for us?
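The difference between "straight values" and a captured variable is easy to demonstrate in memory.  This is a LINQ to Objects sketch with made-up numbers, not the forum query itself.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PagingDemo
{
    public static List<int> Run()
    {
        int minimum = 0;
        int perPage = 2;
        int pageIndex = 0;

        IEnumerable<int> page = Enumerable.Range(1, 10)
            .Where(n => n > minimum)      // lambda: minimum is captured and read later
            .Skip(perPage * pageIndex)    // plain int: computed right now (0)
            .Take(perPage);               // plain int: copied right now (2)

        minimum = 5;      // affects the result: the lambda reads it when enumerated
        pageIndex = 3;    // no effect: Skip() already received the value 0
        perPage = 100;    // no effect: Take() already received the value 2

        return page.ToList();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // prints "6, 7"
    }
}
```

Only minimum has to live on a closure object, because it is the only local that a lambda refers to.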

SELECT TOP (20) [t0].[TopicID], [t2].[Subject], [t1].[Subject] AS [LatestSubject], [t1].[ModifiedTime] AS [LatestChange], [t1].[PosterName] AS [NameOfUpdater], [t4].[test], [t4].[UserID], [t4].[Username], [t4].[Email], [t4].[PasswordHash], [t6].[test] AS [test2], [t6].[UserID] AS [UserID2], [t6].[Username] AS [Username2], [t6].[Email] AS [Email2], [t6].[PasswordHash] AS [PasswordHash2], [t2].[PosterName] AS [NameOfStarter], [t0].[ReplyCount], [t0].[ViewCount]
FROM [dbo].[Topics] AS [t0]
LEFT OUTER JOIN [dbo].[Messages] AS [t1] ON [t1].[MessageID] = [t0].[LastMessageID]
LEFT OUTER JOIN [dbo].[Messages] AS [t2] ON [t2].[MessageID] = [t0].[FirstMessageID]
LEFT OUTER JOIN (
    SELECT 1 AS [test], [t3].[UserID], [t3].[Username], [t3].[Email], [t3].[PasswordHash]
    FROM [dbo].[Users] AS [t3]
    ) AS [t4] ON [t4].[UserID] = [t1].[UserID]
LEFT OUTER JOIN (
    SELECT 1 AS [test], [t5].[UserID], [t5].[Username], [t5].[Email], [t5].[PasswordHash]
    FROM [dbo].[Users] AS [t5]
    ) AS [t6] ON [t6].[UserID] = [t2].[UserID]
WHERE [t0].[BoardID] = @p0
ORDER BY [t0].[IsSticky] DESC, [t1].[TimePosted] DESC

So why, if we already have the SQL generated, do we need to bother with the closure class?  Well, you may recall that LINQ-to-SQL doesn’t support all possible operators.  If we break support for the LINQ-to-SQL query and we have to pull back all of the relevant items, we’ll have to use that class.  At this point though, it goes unused.


A closure is when you take the variables of a function and use them within a function declared inside of it – in C#, this is through anonymous delegates and lambda expressions.  C# typically will accomplish the use of closures by creating an implicit child class to contain the required state of the function as it executes, handing off the actual method to the contained class.


Your Own Transactions with LINQ-to-SQL

Posted by Rob

I’m working on porting an existing forum-based community from SMF to a new .NET-based forum platform that I’m authoring.  I’m excited about it; I love SMF, but it doesn’t have what I want and frankly, it’s a scary beast to try to tackle.  I’d considered using some kind of bridge between it and my code, but I knew I wanted deep integration of the forums with the new community site, and I wanted the community site in .NET.  So I made the decision to write an importer to talk between MySQL and my SQL Server-based solution.  I chose LINQ-to-SQL as my O/R mapper because, quite frankly, I find it much easier and more elegant to work with; so far as I know, I’m not the only one who thinks so.

Because of the nature of the data that I’m importing, I needed to run several SubmitChanges() calls to get the data into the database.  But I wanted to make sure that these submissions only worked if they ALL worked.  So I needed a transaction external to the normal LINQ-to-SQL in-memory object mapper.  Unfortunately, when I began a transaction using the underlying Connection property of the DataContext, I was met with an error:

System.InvalidOperationException: SqlConnection does not support parallel transactions.
   at System.Data.SqlClient.SqlInternalConnection.BeginSqlTransaction(IsolationLevel iso, String transactionName)
   at System.Data.SqlClient.SqlInternalConnection.BeginTransaction(IsolationLevel iso)
   at System.Data.SqlClient.SqlConnection.BeginDbTransaction(IsolationLevel isolationLevel)
   at System.Data.Linq.DataContext.SubmitChanges(ConflictMode failureMode)

The solution was simple: DataContext has a Transaction property!  By setting this to the transaction that I was beginning, I was able to run the complete import in a single transaction:

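using (DbTransaction transaction = dc.Connection.BeginTransaction(IsolationLevel.ReadCommitted))
{
    dc.Transaction = transaction;
    try
    {
        // do databasey things, then commit once every SubmitChanges() call has succeeded
        transaction.Commit();
    }
    catch (Exception ex)
    {
        transaction.Rollback();
        Console.WriteLine("Exception caught; transaction rolled back.");
    }
}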

It took about 2 minutes to import 37,000 or so messages, plus all users, categories, forums, private messages, and polls from SMF.  The app ends up taking something in the neighborhood of 120 MB of memory (I need to keep objects around to reference them for their children, since I assign new IDs), but it’s still a small one-off price to pay.


Using C# Enumerations as LINQ-to-SQL Entity Properties

Posted by Rob

Have you ever created a database object and said "OK, this column is going to correspond to this enumeration"?  If you're obsessive like me, you might have even gone so far as to create column restrictions for the valid range of values, and created a special SQL type that doesn't really do anything except give you peace of mind.

Well, I've got about three such fields on a couple entities on a recent project.  Since I wanted those properties to go into C# enumerations, I tried the natural thing: I typed the enumeration's type name into the "Type" property.

Setting a column's Type property.

Unfortunately, doing this didn't work.  In fact, it seemed to break Visual Studio; updates stopped propagating to my LINQ-to-SQL classes, and in fact since I had done this before a single save, I didn't get any entity classes. 

It turns out that Matt Davis found the answer to the problem: qualify the enumeration's type name with the global:: namespace qualifier if it doesn't live in a namespace (for instance, code within an ASP.NET App_Code folder).

The global:: qualifier is important.

Once I added the qualifier to my type names, saving the DBML file correctly updated the LINQ-to-SQL classes, and I was off and running!
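The qualifier means the same thing in the designer as it does in ordinary C#.  The sketch below shows plain C# name resolution only, not the DBML designer; PostStatus, Forum.Data and StatusMapper are hypothetical names, and the second enum exists purely to create a name clash.

```csharp
using System;

// Declared outside any namespace, the way code in an App_Code folder often is.
public enum PostStatus { Draft = 0, Published = 1, Deleted = 2 }

namespace Forum.Data
{
    // An unrelated type with the same simple name. Inside this namespace,
    // the unqualified name PostStatus now refers to this one.
    public enum PostStatus { Unknown = 0 }

    public static class StatusMapper
    {
        // global:: starts the lookup at the root namespace, so this is
        // unambiguously the enum declared at the top of the file.
        public static global::PostStatus FromColumn(int raw)
        {
            if (!Enum.IsDefined(typeof(global::PostStatus), raw))
                throw new ArgumentOutOfRangeException("raw");
            return (global::PostStatus)raw;
        }
    }
}

public static class EnumDemo
{
    public static void Main()
    {
        Console.WriteLine(Forum.Data.StatusMapper.FromColumn(1));   // prints "Published"
    }
}
```

The range check mirrors the column restriction mentioned at the top of the post: a value with no matching enum member is rejected rather than silently cast.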
