January 13, 2015

Reactive Extensions

Here is a good introduction to the main interfaces: http://introtorx.com/Content/v1.0.10621.0/02_KeyTypes.html
IObserver<T> - Reader/Consumer/Observer
IObservable<T> - Writer/Publisher/Observable sequence. [I find this interface name confusing.]
ISubject<in TSource, out TResult> - Represents an object that is both an observable sequence and an observer. Handy in simple (non-reactive) samples, but rarely used directly in real code.
In comparison to IEnumerable<T>, where the consumer pulls items out, with IObservable<T> the producer pushes items to the observer.
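To make the pull/push contrast concrete, here is a minimal sketch that uses only the base class library interfaces. The names ThreeNumbers, CollectingObserver and PullVersusPush are hypothetical and exist only for this illustration, and the producer is assumed to push everything synchronously inside Subscribe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical producer that pushes 1, 2, 3 synchronously when subscribed to.
public sealed class ThreeNumbers : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnNext(3);
        observer.OnCompleted();
        // Nothing left to cancel: everything has already been pushed.
        return new NoOpDisposable();
    }

    private sealed class NoOpDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that records whatever is pushed to it.
public sealed class CollectingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("pushed " + value); }
    public void OnError(Exception error) { Log.Add("error"); }
    public void OnCompleted() { Log.Add("completed"); }
}

public static class PullVersusPush
{
    // Pull: the consumer drives; foreach asks the enumerator for each item.
    public static List<string> Pull(IEnumerable<int> source)
    {
        var log = new List<string>();
        foreach (var item in source)
        {
            log.Add("pulled " + item);
        }
        return log;
    }

    // Push: the producer drives; it calls OnNext on the observer for each item.
    public static List<string> Push(IObservable<int> source)
    {
        var observer = new CollectingObserver();
        using (source.Subscribe(observer))
        { }
        return observer.Log;
    }
}
```

Calling PullVersusPush.Pull(new[] { 1, 2, 3 }) and PullVersusPush.Push(new ThreeNumbers()) yields the same three values, but in the first case the loop requests each one, while in the second the producer decides when each one arrives.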
A simple example: consider an image processing system in a factory that scans parts and reports a result for each one. The outcome of a scan can be one of three states:
public enum ImageProcessingSystemResult
{
    Failed, // failed image processing check
    Passed, // passed image processing check
}
Here is the publisher of the image processing system results:
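// Publishes a sequence of ImageProcessingSystemResult to the observer
public class ImageProcessingSystemPublisher : IObservable<ImageProcessingSystemResult>
{
    public IDisposable Subscribe(IObserver<ImageProcessingSystemResult> observer)
    {
        // In this case the results are simply published one after another,
        // in the order of the output shown further down
        observer.OnNext(ImageProcessingSystemResult.Passed);
        observer.OnNext(ImageProcessingSystemResult.Passed);
        observer.OnNext(ImageProcessingSystemResult.Failed);
        observer.OnNext(ImageProcessingSystemResult.Passed);
        observer.OnCompleted();
        return Disposable.Empty;
    }
}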
Here is a generic results observer:
// Reads/Observes a sequence of T and decides what to do with it
// Basically, what you want to do when you receive some data
public class MyObserver<T> : IObserver<T>
{
    public void OnNext(T value)
    {
        string output = value != null ? value.ToString() : "null";
        Console.WriteLine("Received value: {0}", output);
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("Sequence faulted with exception: {0}", error.ToString());
    }

    public void OnCompleted()
    {
        Console.WriteLine("Sequence terminated");
    }
}
Here is how it all goes together:
public void UnreactiveSimpleTest()
{
    var imageProcessingSystemPublisher = new ImageProcessingSystemPublisher();
    var observer = new MyObserver<ImageProcessingSystemResult>();
    // Here is how the observer subscribes to the publisher
    using (imageProcessingSystemPublisher.Subscribe(observer))
    { }
}
Here is the output:
Received value: Passed
Received value: Passed
Received value: Failed
Received value: Passed
Sequence terminated
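
For comparison, the same sequence can be produced without hand-writing either interface. The sketch below is only an illustration, not part of the sample above: it assumes the System.Reactive package, uses a hypothetical two-member ScanResult enum as a stand-in for ImageProcessingSystemResult, and collects the messages in a list instead of writing them to the console.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical stand-in for ImageProcessingSystemResult.
public enum ScanResult
{
    Failed,
    Passed,
}

public static class ReactiveSimpleSketch
{
    public static List<string> Run()
    {
        var output = new List<string>();

        // ToObservable() turns an ordinary collection into an observable sequence.
        IObservable<ScanResult> results = new[]
        {
            ScanResult.Passed,
            ScanResult.Passed,
            ScanResult.Failed,
            ScanResult.Passed,
        }.ToObservable();

        // The three lambdas play the roles of OnNext, OnError and OnCompleted.
        using (results.Subscribe(
            value => output.Add("Received value: " + value),
            error => output.Add("Sequence faulted with exception: " + error),
            () => output.Add("Sequence terminated")))
        { }

        return output;
    }
}
```

The Subscribe overload that takes delegates builds the observer for you, so a class like MyObserver<T> is only needed when the handling logic is worth naming and reusing.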

Hot and Cold Observables (i.e. publishers)

Cold - Sequences that start producing notifications on request (when subscribed to), and each subscriber gets a copy of the same notification stream from the beginning.
Hot - Sequences that are active and produce notifications regardless of subscriptions (just like C# events).

A 'Cold' Observable can be converted to a 'Hot' Observable by using the Publish() extension method, which returns an IConnectableObservable<T>. Nothing is published until Connect() is called on it: Connect() subscribes to the underlying source and starts pushing notifications to the current subscribers. Connect() returns an IDisposable, which should be disposed of when the published sequence is no longer needed.
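
The difference can be seen in a small sketch, assuming the System.Reactive package. The class name HotAndColdSketch and the log messages are hypothetical, and the source is assumed to push its values synchronously inside Subscribe.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HotAndColdSketch
{
    // Counts how many times the underlying source was subscribed to.
    public static int SourceSubscriptions;

    public static List<string> Run()
    {
        var log = new List<string>();
        SourceSubscriptions = 0;

        // Cold: the delegate runs again for every subscriber.
        IObservable<int> cold = Observable.Create<int>(observer =>
        {
            SourceSubscriptions++;
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        cold.Subscribe(x => log.Add("cold A: " + x));
        cold.Subscribe(x => log.Add("cold B: " + x));

        // Hot: Publish() shares a single subscription to the source.
        IConnectableObservable<int> hot = cold.Publish();
        hot.Subscribe(x => log.Add("hot A: " + x));
        hot.Subscribe(x => log.Add("hot B: " + x));

        // Nothing flows until Connect() is called.
        IDisposable connection = hot.Connect();

        // A late subscriber misses the values that were already pushed.
        hot.Subscribe(x => log.Add("late: " + x));

        connection.Dispose();
        return log;
    }
}
```

After Run(), SourceSubscriptions is 3: one for each cold subscriber and one shared by both hot subscribers. The late subscriber adds nothing to the log because the published values went out before it subscribed.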

