Mar 31, 2010

Querying the future with Reactive extensions - a short introduction

Reactive extensions is a Microsoft DevLabs project for .Net and, more recently, for JavaScript. It lets us operate on future data much as Linq lets us operate on IEnumerable.

Reactive extensions (Rx) operates mostly on the IObservable and IObserver interfaces, and it can also combine their results with IEnumerable sequences.

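These interfaces follow the same pattern as IEnumerable and IEnumerator, except that IEnumerable is a pull collection and IObservable is a push collection. With an enumerator, the consumer asks for the next value by calling MoveNext. With an observable, the source calls OnNext on its observers whenever a new value is available.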
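The pull/push difference is easiest to see side by side. The sketch below (C# 7 or later) uses only the IObservable<T> and IObserver<T> interfaces that .NET 4 ships in the System namespace, with no Rx calls. RangeObservable, DelegateObserver<T> and NoopDisposable are hypothetical helper names invented for this illustration, not Rx types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper types for illustration only; none of these are Rx classes.

// A push source: Subscribe drives the loop and calls the observer for each value.
sealed class RangeObservable : IObservable<int>
{
    private readonly int _start, _count;
    public RangeObservable(int start, int count) { _start = start; _count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < _count; i++) observer.OnNext(_start + i);
        observer.OnCompleted();
        return new NoopDisposable(); // everything was pushed synchronously, so nothing is left to cancel
    }
}

sealed class NoopDisposable : IDisposable { public void Dispose() { } }

// Adapts three delegates to IObserver<T>.
sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;
    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    { _onNext = onNext; _onError = onError; _onCompleted = onCompleted; }
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

static class Program
{
    static void Main()
    {
        // Pull: the consumer asks the enumerator for each item.
        IEnumerable<int> pull = new[] { 1, 2, 3 };
        using (IEnumerator<int> e = pull.GetEnumerator())
        {
            while (e.MoveNext()) Console.WriteLine("pulled " + e.Current);
        }

        // Push: the consumer hands over an observer and the source calls it.
        new RangeObservable(1, 3).Subscribe(new DelegateObserver<int>(
            v => Console.WriteLine("pushed " + v),
            e => throw e,
            () => Console.WriteLine("completed")));
    }
}
```

Running it prints the three "pulled" lines, then the three "pushed" lines followed by "completed". Rx's own sources add scheduling and cancellation that this sketch leaves out.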

"Querying" the data from an IObservable works on a subscription basis. When you subscribe to an IObservable, you can filter, combine and transform the results (among other things) before you operate on the resulting object(s).

-==- Requirements -==-

VS2010 RC
Rx for VS2010 RC

OR

you can simply download Rx for .Net 3.5. All the samples should work the same in either environment.

Rx also supports the Silverlight 3 environment, if you're so inclined.

-==- Subscribing to an Observable -==-

A simple observable would look like this:

// create an observable
var observable = Enumerable.Range(1, 100).ToObservable();

// for each object we receive from observable, print it to console
observable.Subscribe(o => System.Console.WriteLine(o));  

Of course, we used an IEnumerable as the data source here. Here's a simple timed observable instead:

// Create an observable that pushes a value to its subscribers every second
var timedObservable = Observable.Interval(TimeSpan.FromSeconds(1));

// Write the pushed values to the console until the user presses a key.
using (timedObservable.Subscribe(o => System.Console.WriteLine(o))) {
  System.Console.ReadKey();
}

Here we use the Observable class to construct a timed observable. Given a TimeSpan, it pushes a new value (0, 1, 2, ...) every interval. The "using" statement unsubscribes us automatically when execution leaves the block, which stops the printing. Running the example, you get a value each second until you press a key. ReadKey blocks the rest of the console application, but the observer we created by subscribing keeps printing values until we unsubscribe, because Interval pushes its values from another thread rather than from the console's main thread.
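To see why leaving the "using" block stops the output, here is a rough hand-rolled approximation of Interval (C# 7 or later) built only on System.Threading.Timer and the .NET 4 interfaces. IntervalObservable and DelegateObserver<T> are hypothetical names. Rx's real Interval runs on its schedulers and differs in detail. The point is that the object Subscribe returns is the handle that stops the timer, and that the values arrive on thread-pool threads while ReadKey blocks the main thread.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for Observable.Interval, for illustration only (not Rx's implementation).
sealed class IntervalObservable : IObservable<long>
{
    private readonly TimeSpan _period;
    public IntervalObservable(TimeSpan period) { _period = period; }

    public IDisposable Subscribe(IObserver<long> observer)
    {
        long tick = 0;
        var gate = new object();
        // The timer fires on thread-pool threads, not on the thread that subscribed.
        // The lock keeps OnNext calls from overlapping if one callback runs long.
        var timer = new Timer(_ => { lock (gate) observer.OnNext(tick++); }, null, _period, _period);
        // The subscription is the timer itself: disposing it stops further ticks
        // (a callback that is already running may still deliver one last value).
        return timer;
    }
}

// Hypothetical helper that adapts delegates to IObserver<T>.
sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;
    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    { _onNext = onNext; _onError = onError; _onCompleted = onCompleted; }
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

static class Program
{
    static void Main()
    {
        var ticks = new IntervalObservable(TimeSpan.FromSeconds(1));
        // Same shape as the Rx example: print until a key is pressed, then unsubscribe.
        using (ticks.Subscribe(new DelegateObserver<long>(v => Console.WriteLine(v), e => throw e, () => { })))
        {
            Console.ReadKey();
        }
    }
}
```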

-==- Queries -==-

Filtering observable results is as simple as writing Linq statements. Most of what you can do with Linq to Objects is possible with observables, and Rx adds extension methods that Linq doesn't normally have. Unfortunately, Rx is still very much undocumented. Hopefully they'll remedy this situation soon.

Using a where clause:

var oneNumberPerSecond = Observable.Interval(TimeSpan.FromSeconds(1));

var lowNums = from n in oneNumberPerSecond
     where n < 5
     select n;

Console.WriteLine("Numbers < 5:");

lowNums.Subscribe(lowNum =>
{
 Console.WriteLine(lowNum);
});

Console.ReadKey();

The result of the filtered observable query will of course be:
0
1
2
3
4
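Query syntax works on observables because the C# compiler translates "from ... where ... select" into calls to methods named Where and Select, on whatever type defines them. The sketch below (C# 7 or later) is a hypothetical, hand-rolled pair of such operators over IObservable<T> using only .NET types, plus a synchronous Numbers source standing in for Interval so the output is deterministic. It selects squares rather than the numbers themselves so that Select is actually called; a bare "select n" after a where clause compiles to just the Where call. Rx's own Where and Select are more thorough than this.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-rolled operators; Rx ships its own Where and Select.
static class MiniOperators
{
    // Where: forward a value only when the predicate holds; pass errors and completion through.
    public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate) =>
        new DelegateObservable<T>(observer => source.Subscribe(new DelegateObserver<T>(
            v => { if (predicate(v)) observer.OnNext(v); }, observer.OnError, observer.OnCompleted)));

    // Select: transform each value before forwarding it.
    public static IObservable<TResult> Select<T, TResult>(this IObservable<T> source, Func<T, TResult> selector) =>
        new DelegateObservable<TResult>(observer => source.Subscribe(new DelegateObserver<T>(
            v => observer.OnNext(selector(v)), observer.OnError, observer.OnCompleted)));
}

// Hypothetical helper: an observable defined by its Subscribe function.
sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe) { _subscribe = subscribe; }
    public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
}

// Hypothetical helper that adapts delegates to IObserver<T>.
sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;
    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    { _onNext = onNext; _onError = onError; _onCompleted = onCompleted; }
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

sealed class NoopDisposable : IDisposable { public void Dispose() { } }

static class Program
{
    // Synchronous source pushing 0..count-1, then completing.
    static IObservable<long> Numbers(long count) => new DelegateObservable<long>(observer =>
    {
        for (long i = 0; i < count; i++) observer.OnNext(i);
        observer.OnCompleted();
        return new NoopDisposable();
    });

    public static List<long> LowSquares()
    {
        var results = new List<long>();
        // Compiles to Numbers(10).Where(n => n < 5).Select(n => n * n) against MiniOperators.
        var query = from n in Numbers(10)
                    where n < 5
                    select n * n;
        query.Subscribe(new DelegateObserver<long>(results.Add, e => throw e, () => { }));
        return results;
    }

    static void Main() => Console.WriteLine(string.Join(", ", LowSquares()));
}
```

This prints "0, 1, 4, 9, 16".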

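As you can see, it works just like Linq. But just because your query has stopped producing values doesn't mean the whole observable is finished.
If you add an "OnCompleted" handler to the subscription, you'll see that "Completed." is never printed. The where clause only filters values, and the underlying Interval keeps ticking forever, so even when the numbers stop coming, the Observable is still running and the sequence never completes.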

lowNums.Subscribe(lowNum => System.Console.WriteLine(lowNum), () => System.Console.WriteLine("Completed."));
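One way to get a sequence that does complete is to bound it; in Rx the usual operator for that is Take(n), which completes after n values. The sketch below (C# 7 or later) is a hypothetical, hand-rolled TakeObservable<T> using only .NET types, shown to illustrate the mechanics rather than Rx's actual code: it forwards the first n values, signals completion itself, and disposes its upstream subscription. CountingObservable is a hypothetical finite, synchronous source used to keep the demo deterministic.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of a Take operator; Rx ships its own Take.
sealed class TakeObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly int _count;

    public TakeObservable(IObservable<T> source, int count)
    {
        if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count)); // sketch handles positive counts only
        _source = source;
        _count = count;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var gate = new object();
        int remaining = _count;
        bool done = false;
        IDisposable upstream = null;

        var subscription = _source.Subscribe(new DelegateObserver<T>(
            v =>
            {
                lock (gate)
                {
                    if (done) return;         // ignore values after we've completed
                    observer.OnNext(v);
                    if (--remaining > 0) return;
                    done = true;
                    observer.OnCompleted();   // Take completes even if its source never does
                    upstream?.Dispose();      // stop the source, if Subscribe has returned already
                }
            },
            e => { lock (gate) { if (!done) { done = true; observer.OnError(e); } } },
            () => { lock (gate) { if (!done) { done = true; observer.OnCompleted(); } } }));

        lock (gate)
        {
            // A synchronous source may have hit the limit before Subscribe returned.
            if (done) subscription.Dispose(); else upstream = subscription;
        }
        return subscription;
    }
}

// Hypothetical helper that adapts delegates to IObserver<T>.
sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;
    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    { _onNext = onNext; _onError = onError; _onCompleted = onCompleted; }
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

sealed class NoopDisposable : IDisposable { public void Dispose() { } }

// Finite synchronous source: pushes 0..count-1, then completes.
sealed class CountingObservable : IObservable<long>
{
    private readonly long _count;
    public CountingObservable(long count) { _count = count; }
    public IDisposable Subscribe(IObserver<long> observer)
    {
        for (long i = 0; i < _count; i++) observer.OnNext(i);
        observer.OnCompleted();
        return new NoopDisposable();
    }
}

static class Program
{
    public static List<string> Run()
    {
        var log = new List<string>();
        new TakeObservable<long>(new CountingObservable(10), 5).Subscribe(new DelegateObserver<long>(
            v => log.Add(v.ToString()),
            e => log.Add("Error: " + e.Message),
            () => log.Add("Completed.")));
        return log;
    }

    static void Main() => Console.WriteLine(string.Join(Environment.NewLine, Run()));
}
```

The output is 0 through 4 followed by a single "Completed."; the source's later values and its own completion are dropped.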


-==- Combining Observables -==-

Combining multiple observables into one is very simple.

var second = Observable.Interval(TimeSpan.FromSeconds(1));
var twoSeconds = Observable.Interval(TimeSpan.FromSeconds(2));

second.Merge(twoSeconds).Subscribe(s => System.Console.WriteLine(s));

System.Console.ReadKey();

Our Action delegate runs as soon as either observable pushes its next value to the observer (our subscription); Merge doesn't wait for the other observable.
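The core of a two-source merge can be sketched by hand (C# 7 or later, .NET types only). MergeObservable<T>, ListObservable<T> and the other helpers are hypothetical names for illustration; Rx's Merge is more general. The merged observer forwards every value from either source as it arrives, serializes the calls with a lock because the sources may push from different threads, and completes only after both sources have completed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of a two-source Merge; Rx's Merge is more general.
sealed class MergeObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _first, _second;
    public MergeObservable(IObservable<T> first, IObservable<T> second) { _first = first; _second = second; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var gate = new object();   // serializes calls that may come from two different threads
        int active = 2;            // sources that haven't completed yet
        bool stopped = false;

        Action<T> onNext = v => { lock (gate) { if (!stopped) observer.OnNext(v); } };
        Action<Exception> onError = e => { lock (gate) { if (stopped) return; stopped = true; observer.OnError(e); } };
        // Complete only after both sources have completed.
        Action onCompleted = () => { lock (gate) { if (stopped || --active > 0) return; stopped = true; observer.OnCompleted(); } };

        var a = _first.Subscribe(new DelegateObserver<T>(onNext, onError, onCompleted));
        var b = _second.Subscribe(new DelegateObserver<T>(onNext, onError, onCompleted));
        return new PairDisposable(a, b);
    }
}

sealed class PairDisposable : IDisposable
{
    private readonly IDisposable _a, _b;
    public PairDisposable(IDisposable a, IDisposable b) { _a = a; _b = b; }
    public void Dispose() { _a.Dispose(); _b.Dispose(); }
}

sealed class NoopDisposable : IDisposable { public void Dispose() { } }

// Hypothetical helper that adapts delegates to IObserver<T>.
sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;
    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    { _onNext = onNext; _onError = onError; _onCompleted = onCompleted; }
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

// Synchronous source that pushes a fixed list, then completes.
sealed class ListObservable<T> : IObservable<T>
{
    private readonly T[] _items;
    public ListObservable(params T[] items) { _items = items; }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoopDisposable();
    }
}

static class Program
{
    public static List<string> Run()
    {
        var log = new List<string>();
        new MergeObservable<int>(new ListObservable<int>(1, 2, 3), new ListObservable<int>(10, 20))
            .Subscribe(new DelegateObserver<int>(v => log.Add(v.ToString()), e => throw e, () => log.Add("done")));
        return log;
    }

    static void Main() => Console.WriteLine(string.Join(" ", Run()));
}
```

With these synchronous sources the first list runs to completion before the second is subscribed, so the output is "1 2 3 10 20 done"; with timer-based sources like Interval the values interleave as they arrive.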

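But if we want to combine them the way an SQL join does, getting matching values in one "row", we can use the Zip extension method. Zip pairs the first value of one observable with the first value of the other, the second with the second, and so on.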

second.Zip(twoSeconds, (left, right) => new { Left = left, Right = right })
 .Subscribe(s => System.Console.WriteLine("{0} : {1}", s.Left, s.Right));

Because we have zipped them together, our Action delegate runs only once both observables have produced their next value. The expected result here is:

0 : 0
1 : 1
2 : 2
...

If you run it, you'll see that it takes two seconds to print each line instead of one, even though the "one second" observable is churning out numbers faster than the other. Zip holds on to the surplus values until their partners arrive.
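The pairing and buffering behaviour can be sketched by hand (C# 7 or later, .NET types only). ZipObservable and the helper types are hypothetical names for illustration, not Rx's implementation. Each side's values wait in a queue until the other side supplies a partner, which is why the queue of the faster source keeps growing. The sketch completes once one side has finished and its queue is empty, since no further pairs can form.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of Zip; Rx ships its own Zip operator.
sealed class ZipObservable<TLeft, TRight, TResult> : IObservable<TResult>
{
    private readonly IObservable<TLeft> _left;
    private readonly IObservable<TRight> _right;
    private readonly Func<TLeft, TRight, TResult> _selector;

    public ZipObservable(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, TRight, TResult> selector)
    { _left = left; _right = right; _selector = selector; }

    public IDisposable Subscribe(IObserver<TResult> observer)
    {
        var gate = new object();
        var lefts = new Queue<TLeft>();   // values waiting for a partner; grows if this side is faster
        var rights = new Queue<TRight>();
        bool leftDone = false, rightDone = false, stopped = false;

        // Emit every complete pair; only called while holding the lock.
        void Drain()
        {
            while (lefts.Count > 0 && rights.Count > 0)
                observer.OnNext(_selector(lefts.Dequeue(), rights.Dequeue()));
            // A finished side with an empty queue means no more pairs can form.
            if ((leftDone && lefts.Count == 0) || (rightDone && rights.Count == 0))
            {
                stopped = true;
                observer.OnCompleted();
            }
        }

        Action<Exception> onError = e => { lock (gate) { if (stopped) return; stopped = true; observer.OnError(e); } };

        var l = _left.Subscribe(new DelegateObserver<TLeft>(
            v => { lock (gate) { if (stopped) return; lefts.Enqueue(v); Drain(); } },
            onError,
            () => { lock (gate) { if (stopped) return; leftDone = true; Drain(); } }));
        var r = _right.Subscribe(new DelegateObserver<TRight>(
            v => { lock (gate) { if (stopped) return; rights.Enqueue(v); Drain(); } },
            onError,
            () => { lock (gate) { if (stopped) return; rightDone = true; Drain(); } }));
        return new PairDisposable(l, r);
    }
}

sealed class PairDisposable : IDisposable
{
    private readonly IDisposable _a, _b;
    public PairDisposable(IDisposable a, IDisposable b) { _a = a; _b = b; }
    public void Dispose() { _a.Dispose(); _b.Dispose(); }
}

sealed class NoopDisposable : IDisposable { public void Dispose() { } }

// Hypothetical helper that adapts delegates to IObserver<T>.
sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;
    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    { _onNext = onNext; _onError = onError; _onCompleted = onCompleted; }
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

// Synchronous source that pushes a fixed list, then completes.
sealed class ListObservable<T> : IObservable<T>
{
    private readonly T[] _items;
    public ListObservable(params T[] items) { _items = items; }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoopDisposable();
    }
}

static class Program
{
    public static List<string> Run()
    {
        var log = new List<string>();
        new ZipObservable<long, long, string>(
                new ListObservable<long>(0, 1, 2, 3), new ListObservable<long>(0, 1, 2),
                (left, right) => $"{left} : {right}")
            .Subscribe(new DelegateObserver<string>(log.Add, e => throw e, () => log.Add("done")));
        return log;
    }

    static void Main() => Console.WriteLine(string.Join(Environment.NewLine, Run()));
}
```

This prints "0 : 0", "1 : 1", "2 : 2" and "done"; the unmatched 3 from the longer source is never emitted.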

-==- What is it good for? -==-

If you haven't already thought of a good use for this, here are a few examples:
  • Using event delegates as Observables (see Visual Studio add-in from Clarius Consulting)
  • Querying the Windows 7 Sensor and Location APIs
  • Analyzing SQL StreamInsight temporal event streams
And of course you can create your own custom datasources. I began studying Rx because I needed a way to stream objects from multiple datasources to multiple transformation components and to combine the transformation results back into multiple outputs; Rx fit the bill perfectly.

And these are just the classic examples. Rx works even better with functional languages like F# and JavaScript.

That's it for now. Hopefully next time I'll have my small ETL framework, which is built mostly with Rx, in a sufficiently "alpha" state that I can show parts of it.
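For a custom datasource feeding several consumers, Rx offers subjects (for example Subject<T>), which are both an observer you push values into and an observable that others subscribe to. The sketch below (C# 7 or later) is a hypothetical, .NET-only SimpleSubject<T>, not Rx's class: it fans each pushed value out to every current subscriber, which is the one-source, many-consumers shape of the pipeline described above. It skips bookkeeping a production subject needs, such as ignoring values pushed after OnCompleted.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical minimal subject: an observer you push into and an observable others subscribe to.
sealed class SimpleSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly object _gate = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate) _observers.Add(observer);
        return new ActionDisposable(() => { lock (_gate) _observers.Remove(observer); });
    }

    // Broadcast from a snapshot so observers may unsubscribe during a callback.
    private IObserver<T>[] Snapshot()
    {
        IObserver<T>[] copy;
        lock (_gate) copy = _observers.ToArray();
        return copy;
    }

    public void OnNext(T value) { foreach (var o in Snapshot()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in Snapshot()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in Snapshot()) o.OnCompleted(); }
}

// Runs its action once, on the first Dispose call.
sealed class ActionDisposable : IDisposable
{
    private Action _action;
    public ActionDisposable(Action action) { _action = action; }
    public void Dispose() => Interlocked.Exchange(ref _action, null)?.Invoke();
}

// Hypothetical helper that adapts delegates to IObserver<T>.
sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;
    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    { _onNext = onNext; _onError = onError; _onCompleted = onCompleted; }
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

static class Program
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new SimpleSubject<int>();
        // Two "transformation components" listening to the same source.
        var a = source.Subscribe(new DelegateObserver<int>(v => log.Add("A:" + v), e => throw e, () => log.Add("A:done")));
        var b = source.Subscribe(new DelegateObserver<int>(v => log.Add("B:" + (v * 2)), e => throw e, () => log.Add("B:done")));
        source.OnNext(1);
        source.OnNext(2);
        b.Dispose();              // B unsubscribes; A keeps receiving
        source.OnNext(3);
        source.OnCompleted();
        a.Dispose();
        return log;
    }

    static void Main() => Console.WriteLine(string.Join(" ", Run()));
}
```

This prints "A:1 B:2 A:2 B:4 A:3 A:done": B stops receiving after it unsubscribes, while A sees every value and the completion.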

-==- Resources -==-

101 Rx samples (well, not yet that many)
Rx on PDC 2009
Rx for JavaScript on Mix 2010
Rx team blog - mostly videos about Rx
Rx videos from Channel9
