by skeet via Jon Skeet: Coding Blog on 1/16/2010 8:13:42 PM
I've been researching Reactive Extensions for the last few days, with an eye to writing a short section in chapter 12 of the second edition of C# in Depth. (This is the most radically changed chapter from the first edition; it will be covering LINQ to SQL, IQueryable, LINQ to XML, Parallel LINQ, Reactive Extensions, and writing your own LINQ to Objects operators.) I've watched various videos from Channel 9, but today was the first time I actually played with it. I'm half excited, and half disappointed.
My excited half sees that there's an awful lot to experiment with, and loads to learn about join patterns, etc. I'm also looking forward to trying genuine events (mouse movements, etc.); so far my tests have all been to do with collections.
My disappointed half thinks it's missing something. You see, Reactive Extensions shares some concepts with my own Push LINQ library… except it's had smarter people (no offense meant to Marc Gravell) working harder on it for longer. I'd expect it to be easier to use, and make it a breeze to do anything you could do in Push LINQ. Unfortunately, that's not quite the case.
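First, the way that subscription is handled for collections seems slightly odd. I've been imagining two kinds of observable sources: genuine event sources (such as mouse movements), which push data whenever it arrives, and sources built from existing data such as collections, which have to be told when to start pushing.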
In the latter case, I could imagine an extension method on IEnumerable<T> called ToObservable which would return a StartableObservable<T> or something like that: you'd subscribe whatever observers you wanted, and then call Start on the StartableObservable<T>. That's not what appears to happen, though. If you call ToObservable(), you get an implementation which iterates over the source sequence as soon as anything subscribes to it, which just doesn't feel right to me. Admittedly it makes life easy when that's really all you want to do, but it's a pain otherwise.
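To make that concrete, here is a minimal sketch of the imagined `StartableObservable<T>`. It is hypothetical and not an Rx type; it depends only on the BCL's `IObservable<T>` and `IObserver<T>` interfaces, and `ToStartableObservable` and `RecordingObserver<T>` are illustrative names rather than real APIs. Subscribing merely registers an observer; the source sequence is iterated once, when `Start` is called.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical type, NOT part of Reactive Extensions: subscribing only registers an
// observer; the underlying sequence is iterated once, when Start() is called.
public sealed class StartableObservable<T> : IObservable<T>
{
    private readonly IEnumerable<T> source;
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public StartableObservable(IEnumerable<T> source)
    {
        this.source = source ?? throw new ArgumentNullException(nameof(source));
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    // Pushes every item to all current subscribers, then signals completion.
    // Routing exceptions to OnError is left out to keep the sketch short.
    public void Start()
    {
        foreach (T item in source)
        {
            foreach (IObserver<T> observer in observers.ToArray())
            {
                observer.OnNext(item);
            }
        }
        foreach (IObserver<T> observer in observers.ToArray())
        {
            observer.OnCompleted();
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action unsubscribe;
        public Unsubscriber(Action unsubscribe) { this.unsubscribe = unsubscribe; }
        public void Dispose() => unsubscribe();
    }
}

public static class StartableObservableExtensions
{
    // Hypothetical alternative to ToObservable() that defers iteration until Start().
    public static StartableObservable<T> ToStartableObservable<T>(this IEnumerable<T> source)
        => new StartableObservable<T>(source);
}

// Illustrative observer that simply records what it is sent.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<T> Items { get; } = new List<T>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(T value) => Items.Add(value);
    public void OnError(Exception error) => Error = error;
    public void OnCompleted() => Completed = true;
}
```

For example, subscribe two `RecordingObserver<int>` instances to `new[] { 1, 2, 3 }.ToStartableObservable()`: neither sees anything until `Start()` is called, after which both have received 1, 2 and 3 from a single pass over the array.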
There's a way of working round this in Reactive Extensions: there's Subject<T> which is both an observer and an observable. You can create a Subject<T>, Subscribe all the observers you want (so as to set up the data pipeline) and then subscribe the subject to the real data source. It's not exactly hard, but it took me a while to work out, and it feels a little unwieldy. The next issue was somewhat more problematic.
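A rough sketch of that workaround follows. To avoid depending on the Rx assemblies, `SimpleSubject<T>` is a hypothetical minimal stand-in for `Subject<T>`, `DelegateObserver<T>` stands in for Rx's lambda-based `Subscribe` overloads, and the hypothetical `PushTo` helper plays the part of subscribing the subject to the real data source. What matters is the ordering: the pipeline is built by subscribing observers to the subject first, and only then is the subject fed, so the source is read exactly once.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal stand-in for Rx's Subject<T>: both an observer and an
// observable, forwarding each notification it receives to all its subscribers.
// Unsubscribing is not supported in this sketch.
public sealed class SimpleSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return NoOpDisposable.Instance;
    }

    public void OnNext(T value) { foreach (var o in observers.ToArray()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in observers.ToArray()) o.OnCompleted(); }

    private sealed class NoOpDisposable : IDisposable
    {
        public static readonly NoOpDisposable Instance = new NoOpDisposable();
        public void Dispose() { }
    }
}

// Hypothetical observer built from delegates (Rx offers lambda-based Subscribe overloads).
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnNext(T value) => onNext(value);
    public void OnError(Exception error) => onError(error);
    public void OnCompleted() => onCompleted();
}

public static class PushExtensions
{
    // Hypothetical helper: pulls the sequence once, pushing each item into the
    // observer, then signals completion.
    public static void PushTo<T>(this IEnumerable<T> source, IObserver<T> observer)
    {
        foreach (T item in source)
        {
            observer.OnNext(item);
        }
        observer.OnCompleted();
    }
}
```

For instance, subscribe one observer that counts strings and another that sums their lengths to a `SimpleSubject<string>`, then call `new[] { "a", "bb", "ccc" }.PushTo(subject)`: the first sees three items and the second a total length of 6, from a single pass over the array.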
When I first started thinking about Push LINQ, it was motivated by a scenario from the C# newsgroup: someone wanted to group a collection in a particular way, and then count how many items were in each group. This is effectively the "favourite colour voting" scenario outlined in the link at the top of this post. The problem is that the normal Count() call is blocking: it fetches items from a collection until there aren't any more, so it's effectively in control of the execution flow. That means that if you call it within a grouping construct, the whole group has to be available before you call Count(). So you can't stream an enormous data set, which is unfortunate.
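For comparison, this is the ordinary pull-based LINQ to Objects version of "group, then count each group", applied here to lines grouped by length (`PullHistogram` is an illustrative name). It produces the right answer, but `Enumerable.GroupBy` has to read and buffer the entire input before it yields the first group, which is exactly why it can't stream an enormous data set.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class PullHistogram
{
    // Pull-based histogram of line lengths. Enumerable.GroupBy consumes the whole
    // input and holds every line in memory before the first group is yielded;
    // Count() then walks each fully buffered group.
    public static Dictionary<int, int> LineLengthHistogram(IEnumerable<string> lines)
    {
        return lines.GroupBy(line => line.Length)
                    .ToDictionary(group => group.Key, group => group.Count());
    }
}
```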
In Push LINQ, I addressed this by making Count() return Future<int> instead of int. The whole query is evaluated, and then you can ask each future for its actual result. Unfortunately, that isn't the approach that the Reactive Framework has taken: it still returns int from Count(). I don't know the reason for this, but fortunately it's somewhat fixable. We can't change Observable, of course, but we can add our own future-based extensions.
These extensions use Task<T> from Parallel Extensions, which gives us an interesting ability, as we'll see in a moment. It's all fairly straightforward: TaskCompletionSource<T> makes it very easy to specify a value when we've finished, or to indicate that an error occurred. The maximum/minimum implementations leave something to be desired, but they're good enough for a blog post :)
Now that we've got our extension methods, how can we use them? First I decided to write a demo which would count the number of lines in a file and find the maximum and minimum line lengths.
The demo uses the Result property of each task to find its eventual result. This call blocks until the result is ready, however, so you need to be careful about how you use it. Each line is read from the file only once and pushed to all three observers, which carry their state around until the sequence is complete, whereupon they publish the result to the task.
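A minimal sketch of such future-returning extensions, together with a one-pass version of the line statistics demo, might look like this. It assumes only the BCL: `DelegateObserver<T>` and `SimpleSubject<T>` are hypothetical stand-ins for Rx's lambda-based `Subscribe` overloads and `Subject<T>`, and `FutureCount`, `FutureAggregate`, `FutureMax`, `FutureMin` and `LineStats` are illustrative names. Each extension subscribes a stateful observer and completes a `TaskCompletionSource<T>` when the source completes; the demo pushes line lengths from an in-memory sequence rather than reading a file.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical observer built from delegates (Rx offers lambda-based Subscribe overloads).
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnNext(T value) => onNext(value);
    public void OnError(Exception error) => onError(error);
    public void OnCompleted() => onCompleted();
}

// Hypothetical minimal stand-in for Rx's Subject<T>; unsubscribing is not supported.
public sealed class SimpleSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();
    public IDisposable Subscribe(IObserver<T> observer) { observers.Add(observer); return NoOp.Instance; }
    public void OnNext(T value) { foreach (var o in observers.ToArray()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in observers.ToArray()) o.OnCompleted(); }
    private sealed class NoOp : IDisposable { public static readonly NoOp Instance = new NoOp(); public void Dispose() { } }
}

public static class FutureObservableExtensions
{
    // Counts items; the task completes only when the source signals OnCompleted.
    public static Task<int> FutureCount<T>(this IObservable<T> source)
    {
        var tcs = new TaskCompletionSource<int>();
        int count = 0;
        source.Subscribe(new DelegateObserver<T>(
            _ => count++,
            error => tcs.TrySetException(error),
            () => tcs.TrySetResult(count)));
        return tcs.Task;
    }

    // Folds the sequence with `combine`; an empty sequence faults the task.
    public static Task<T> FutureAggregate<T>(this IObservable<T> source, Func<T, T, T> combine)
    {
        var tcs = new TaskCompletionSource<T>();
        bool any = false;
        T current = default(T);
        source.Subscribe(new DelegateObserver<T>(
            value => { current = any ? combine(current, value) : value; any = true; },
            error => tcs.TrySetException(error),
            () =>
            {
                if (any) tcs.TrySetResult(current);
                else tcs.TrySetException(new InvalidOperationException("Sequence contains no elements"));
            }));
        return tcs.Task;
    }

    // Simple Comparer<T>.Default-based max/min; ties keep the earlier value.
    public static Task<T> FutureMax<T>(this IObservable<T> source) =>
        source.FutureAggregate((x, y) => Comparer<T>.Default.Compare(y, x) > 0 ? y : x);

    public static Task<T> FutureMin<T>(this IObservable<T> source) =>
        source.FutureAggregate((x, y) => Comparer<T>.Default.Compare(y, x) < 0 ? y : x);
}

public static class LineStatsDemo
{
    // One pass: each line's length is pushed once, to all three observers. Result
    // would block until a task completes; here all three are done after OnCompleted.
    public static (int Count, int Max, int Min) LineStats(IEnumerable<string> lines)
    {
        var lengths = new SimpleSubject<int>();
        Task<int> count = lengths.FutureCount();
        Task<int> max = lengths.FutureMax();
        Task<int> min = lengths.FutureMin();

        foreach (string line in lines)
        {
            lengths.OnNext(line.Length);
        }
        lengths.OnCompleted();

        return (count.Result, max.Result, min.Result);
    }
}
```

To read a real file, the sequence could be `File.ReadLines(path)` from `System.IO`. With an empty input, the max/min tasks are faulted, so reading their `Result` throws an `AggregateException`.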
I got this working fairly quickly - then went back to the "grouping lines by line length" problem I'd originally set myself. I want to group the lines of a file by their length (all lines of length 0, all lines of length 1 etc) and count each group. The result is effectively a histogram of line lengths. Constructing the query itself wasn't a problem - but iterating through the results was. Fundamentally, I don't understand the details of ToEnumerable yet, particularly the timing. I need to look into it more deeply, but I've got two alternative solutions for the moment.
The first is to implement my own ToList extension method. This simply creates a list and subscribes an observer which adds items to the list as it goes. There's no attempt at "safety" here: if you access the list before the source sequence has completed, you'll see whatever has been added so far. I am still just experimenting :)
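A minimal sketch of such a `ToList`, written against the BCL's `IObservable<T>` only; the extension class name is illustrative, and `SimpleSubject<T>` is a hypothetical stand-in for Rx's `Subject<T>` so the sketch can be exercised without Rx. Just as described, there is no safety: the returned list is live and keeps growing while items are pushed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal stand-in for Rx's Subject<T>; unsubscribing is not supported.
public sealed class SimpleSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();
    public IDisposable Subscribe(IObserver<T> observer) { observers.Add(observer); return NoOp.Instance; }
    public void OnNext(T value) { foreach (var o in observers.ToArray()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in observers.ToArray()) o.OnCompleted(); }
    private sealed class NoOp : IDisposable { public static readonly NoOp Instance = new NoOp(); public void Dispose() { } }
}

public static class ObservableListExtensions
{
    // Hypothetical ToList: subscribes immediately and returns the live list. There is
    // no safety: reading it before the source completes shows only what has arrived so far.
    public static List<T> ToList<T>(this IObservable<T> source)
    {
        var list = new List<T>();
        source.Subscribe(new ListObserver<T>(list));
        return list;
    }

    private sealed class ListObserver<T> : IObserver<T>
    {
        private readonly List<T> list;
        public ListObserver(List<T> list) { this.list = list; }
        public void OnNext(T value) => list.Add(value);
        public void OnError(Exception error) { } // errors are ignored in this sketch
        public void OnCompleted() { }
    }
}
```

For example, after `var items = subject.ToList(); subject.OnNext(1);` the list already contains one element, even though the subject hasn't completed yet.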
Now we can construct a query expression, project each group using our future count, make sure we've finished pushing the source before we read the results, and everything is fine.
Note how the call to ToList is required before calling source.ToObservable(...).Subscribe - otherwise everything would have been pushed before we started collecting it.
All well and good... but there's another way of doing it too. We've only got a single task being produced for each group, so instead of waiting until everything's finished before we dump the results to the console, we can use Task.ContinueWith to write out each group's result as soon as that group has been told it's finished. We force this extra action to occur on the same thread as the observer just to make things easier in a console app... but it all works very neatly.
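Putting those ideas together, here is a self-contained sketch of the line-length histogram: each group's count is a future collected into a list up front, and a `ContinueWith` continuation writes each group's result as soon as that group's task completes. It does not use Rx: `SimpleSubject<T>` and `FutureCount` are hypothetical stand-ins, and instead of Rx's `GroupBy` the grouping is done by hand with one subject per line length, each given its future count before its first line is pushed. `TaskContinuationOptions.ExecuteSynchronously` requests that the continuation run on the thread completing the task, which here is the thread pushing the data.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical minimal stand-in for Rx's Subject<T>; unsubscribing is not supported.
public sealed class SimpleSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();
    public IDisposable Subscribe(IObserver<T> observer) { observers.Add(observer); return NoOp.Instance; }
    public void OnNext(T value) { foreach (var o in observers.ToArray()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in observers.ToArray()) o.OnCompleted(); }
    private sealed class NoOp : IDisposable { public static readonly NoOp Instance = new NoOp(); public void Dispose() { } }
}

public static class FutureExtensions
{
    // Hypothetical future-based Count: the task completes when the source completes.
    public static Task<int> FutureCount<T>(this IObservable<T> source)
    {
        var observer = new CountObserver<T>();
        source.Subscribe(observer);
        return observer.Completion;
    }

    private sealed class CountObserver<T> : IObserver<T>
    {
        private readonly TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
        private int count;
        public Task<int> Completion => tcs.Task;
        public void OnNext(T value) => count++;
        public void OnError(Exception error) => tcs.TrySetException(error);
        public void OnCompleted() => tcs.TrySetResult(count);
    }
}

public static class GroupedCountDemo
{
    // Streams the lines once. Each new length gets its own subject (a hand-rolled,
    // push-based group) whose future count is attached before its first line is pushed.
    public static List<(int Length, Task<int> Count)> LineLengthHistogram(
        IEnumerable<string> lines, Action<string> output)
    {
        var groups = new Dictionary<int, SimpleSubject<string>>();
        var results = new List<(int Length, Task<int> Count)>();
        var continuations = new List<Task>();

        foreach (string line in lines)
        {
            if (!groups.TryGetValue(line.Length, out SimpleSubject<string> group))
            {
                int length = line.Length;
                group = new SimpleSubject<string>();
                groups.Add(length, group);
                Task<int> count = group.FutureCount();
                results.Add((length, count));
                // Writes this group's result as soon as its count task completes;
                // ExecuteSynchronously asks for it to run on the completing thread.
                continuations.Add(count.ContinueWith(
                    t => output($"Length {length}: {t.Result} line(s)"),
                    TaskContinuationOptions.ExecuteSynchronously));
            }
            group.OnNext(line);
        }

        // Completing each group completes its count task, which fires its continuation.
        foreach (SimpleSubject<string> g in groups.Values)
        {
            g.OnCompleted();
        }

        Task.WaitAll(continuations.ToArray());
        return results;
    }
}
```

Passing `Console.WriteLine` as the `output` delegate prints one line per group as the groups complete; reading `Count.Result` from the returned list afterwards doesn't block, because every task has finished by then.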
That's the lot, so far. It feels like I'm sort of in the spirit of Reactive Extensions, but that maybe I'm pushing it (no pun intended) in a direction which Erik and Wes either didn't anticipate, or at least don't view as particularly valuable/elegant. I very much doubt that they didn't consider deferred aggregates - it's much more likely that either I've missed some easy way of doing this, or there are good reasons why it's a bad idea. I hope to find out which at some point... but in the meantime, I really ought to work out a more idiomatic example for C# in Depth.
Original Post: First encounters with Reactive Extensions