January 13, 2015

Reactive Extensions

IObserver<T> - Reader/Consumer/Observer
IObservable<T> - Writer/Publisher/Observable sequence. [I find this interface name confusing.]
ISubject<in TSource, out TResult> - Represents an object that is both an observable sequence and an observer. Mostly used in (non-reactive) samples rather than in real situations.
In contrast to IEnumerable<T>, where the consumer pulls items out, an IObservable<T> pushes items to its observers.
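To make the pull/push difference concrete, here is a minimal sketch. It assumes the System.Reactive package is referenced (for ToObservable() and the delegate-based Subscribe overload); the class name PullVersusPushSketch and its Run method are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class, not part of Rx
public static class PullVersusPushSketch
{
    // Returns the log lines instead of printing them so the order is easy to inspect
    public static List<string> Run()
    {
        var log = new List<string>();
        IEnumerable<int> numbers = new[] { 1, 2, 3 };

        // Pull: the consumer drives the loop and asks for each item
        foreach (int n in numbers)
        {
            log.Add("Pulled " + n);
        }

        // Push: the sequence drives and calls the observer for each item
        IObservable<int> pushed = numbers.ToObservable();
        using (pushed.Subscribe(
            n => log.Add("Pushed " + n),
            () => log.Add("Completed")))
        {
        }

        return log;
    }
}
```

Calling PullVersusPushSketch.Run() and writing each entry with Console.WriteLine should show the three "Pulled" lines, then the three "Pushed" lines, then "Completed". The data is the same in both halves; only who is in control of the iteration changes.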
A simple sample: Consider an image processing system in a factory that scans parts and reports a result for each one. The outcome of a scan is one of three states:
public enum ImageProcessingSystemResult
{
    Failed, // failed the image processing check
    Passed, // passed the image processing check
    Indeterminate
}
Here is the publisher of the image processing system results:
using System;
using System.Reactive.Disposables; // for Disposable.Empty

// Publishes a sequence of ImageProcessingSystemResult to the observer
public class ImageProcessingSystemPublisher : IObservable<ImageProcessingSystemResult>
{
    public IDisposable Subscribe(IObserver<ImageProcessingSystemResult> observer)
    {
        // In this sample the results are simply pushed one after another,
        // synchronously, while Subscribe is still running
        observer.OnNext(ImageProcessingSystemResult.Passed);
        observer.OnNext(ImageProcessingSystemResult.Passed);
        observer.OnNext(ImageProcessingSystemResult.Failed);
        observer.OnNext(ImageProcessingSystemResult.Passed);
        observer.OnCompleted();
        return Disposable.Empty;
    }
}
Here is a generic results observer:
// Reads/Observes a sequence of T and decides what to do with it
// Basically, what you want to do when you receive some data
public class MyObserver<T> : IObserver<T>
{
    public void OnNext(T value)
    {
        string output = value != null ? value.ToString() : "null";
        Console.WriteLine("Received value: {0}", output);
    }
    public void OnError(Exception error)
    {
        Console.WriteLine("Sequence faulted with exception: {0}", error.ToString());
    }
    public void OnCompleted()
    {
        Console.WriteLine("Sequence terminated");
    }
}
Here is how it all goes together:
public void UnreactiveSimpleTest()
{
    var imageProcessingSystemPublisher = new ImageProcessingSystemPublisher();
    var observer = new MyObserver<ImageProcessingSystemResult>();
    // Here is how the observer subscribes to the publisher.
    // The whole sequence is delivered inside Subscribe, so the using body can stay empty.
    using (imageProcessingSystemPublisher.Subscribe(observer))
    { }
}
Here is the output:
Received value: Passed
Received value: Passed
Received value: Failed
Received value: Passed
Sequence terminated

Press any key to continue ...

Hot and Cold Observables (i.e. publishers)

Cold - Sequences that start producing notifications only when subscribed to; each subscriber gets its own run of the notification stream from the beginning.
Hot - Sequences that are active and produce notifications regardless of subscriptions (just like C# events); a subscriber only sees notifications produced after it subscribes.

A 'Cold' Observable can be converted to a 'Hot' Observable by using the Publish() extension method. Publish() returns an IConnectableObservable<T>; nothing is pushed until Connect() is called, which subscribes to the underlying sequence once and shares its notifications among all subscribers. Connect() returns an IDisposable which must be disposed of when the published sequence is no longer needed.
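A small sketch of the difference, assuming the System.Reactive package is referenced. The class HotColdSketch, the subscriber labels A to D and the "Producer started" marker are invented for this illustration; Observable.Create, Publish() and Connect() are the Rx calls being shown.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class, not part of Rx
public static class HotColdSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Cold: the producer body below runs once per subscription
        IObservable<int> cold = Observable.Create<int>(observer =>
        {
            log.Add("Producer started");
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        cold.Subscribe(v => log.Add("A: " + v));
        cold.Subscribe(v => log.Add("B: " + v));

        // Publish() wraps the cold sequence; subscribing to the wrapper
        // does not start the producer yet
        IConnectableObservable<int> published = cold.Publish();
        published.Subscribe(v => log.Add("C: " + v));
        published.Subscribe(v => log.Add("D: " + v));

        // Connect() subscribes to the source once and shares it with C and D
        using (published.Connect())
        {
        }

        return log;
    }
}
```

In the returned log, "Producer started" should appear three times: once each for A and B (two independent cold runs) and once for the shared published run. After Connect() the values are interleaved (C: 1, D: 1, C: 2, D: 2) because both subscribers observe the same single run.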

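Use the Rx extension methods (for example Observable.Create, ToObservable and the Subscribe overloads that take delegates) so that the IObservable<T> and IObserver<T> interfaces are implemented for you.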
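As a rough sketch of what that looks like, here is the image processing sample from earlier without the hand-written publisher and observer classes. It assumes the System.Reactive package is referenced; the enum is repeated only so the block compiles on its own, and ExtensionMethodSketch is a made-up name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Repeated from the earlier sample so this block is self-contained
public enum ImageProcessingSystemResult
{
    Failed,
    Passed,
    Indeterminate
}

// Hypothetical helper class, not part of Rx
public static class ExtensionMethodSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // ToObservable() supplies the IObservable<T> implementation
        IObservable<ImageProcessingSystemResult> results = new[]
        {
            ImageProcessingSystemResult.Passed,
            ImageProcessingSystemResult.Passed,
            ImageProcessingSystemResult.Failed,
            ImageProcessingSystemResult.Passed
        }.ToObservable();

        // The delegate-based Subscribe overload supplies the IObserver<T> implementation
        using (results.Subscribe(
            value => log.Add("Received value: " + value),
            error => log.Add("Sequence faulted with exception: " + error),
            () => log.Add("Sequence terminated")))
        {
        }

        return log;
    }
}
```

The returned lines should match the output of the hand-written version: four "Received value" lines followed by "Sequence terminated".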

Existing C# events can be converted to an IObservable<T> with Observable.FromEventPattern:
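using System;
using System.Reactive;      // for EventPattern<T>
using System.Reactive.Linq; // for Observable.FromEventPattern

internal class SimpleEventSource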
{
    public event EventHandler<EventArgs> SimpleEvent;

    public IObservable<EventPattern<EventArgs>> GetSimpleEventPublisher()
    {
        // To consume SimpleEvent as an IObservable:
        IObservable<EventPattern<EventArgs>> eventAsObservable =
            Observable.FromEventPattern<EventArgs>(
                ev => SimpleEvent += ev,
                ev => SimpleEvent -= ev);
        return eventAsObservable;
    }

    // Normally this would be private (event is triggered internally) 
    // but for test purposes we make it public
    public void FireSimpleEvent()
    {
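        // Copy the delegate first so a concurrent unsubscribe cannot
        // null it between the check and the call
        EventHandler<EventArgs> tmp = SimpleEvent;
        if (tmp != null)
        {
            tmp(this, EventArgs.Empty);
        }
    }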
}
Here is a tester:
public void TestSimpleEventSource()
{
    var ses = new SimpleEventSource();
    ses.FireSimpleEvent(); // Missed: nobody has subscribed yet

    IObservable<EventPattern<EventArgs>> stdEventPublisher = ses.GetSimpleEventPublisher();
    using (var subscription = stdEventPublisher.Subscribe(
        evt => Console.WriteLine("Simple event fired")))
    {
        ses.FireSimpleEvent();
        ses.FireSimpleEvent();
    }

    Console.WriteLine("Subscription cancelled");
}
Here is the output:
Simple event fired
Simple event fired
Subscription cancelled

Press any key to continue ...
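The sample above ignores the event's payload. When the event carries data, the EventPattern<T> wrapper exposes it through its EventArgs property (and the source through Sender). Here is a sketch under assumptions: ScanEventArgs, Scanner and EventPayloadSketch are hypothetical types invented for this example, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical event args carrying a payload
public class ScanEventArgs : EventArgs
{
    public ScanEventArgs(string partId) { PartId = partId; }
    public string PartId { get; private set; }
}

// Hypothetical event source
public class Scanner
{
    public event EventHandler<ScanEventArgs> Scanned;

    public void Scan(string partId)
    {
        EventHandler<ScanEventArgs> handler = Scanned;
        if (handler != null)
        {
            handler(this, new ScanEventArgs(partId));
        }
    }
}

public static class EventPayloadSketch
{
    public static List<string> Run()
    {
        var received = new List<string>();
        var scanner = new Scanner();

        // Project each EventPattern down to just the payload of interest
        IObservable<string> partIds = Observable
            .FromEventPattern<ScanEventArgs>(
                h => scanner.Scanned += h,
                h => scanner.Scanned -= h)
            .Select(ep => ep.EventArgs.PartId);

        using (partIds.Subscribe(id => received.Add(id)))
        {
            scanner.Scan("A-1");
            scanner.Scan("A-2");
        }

        // The subscription is disposed, so the handler has been removed
        scanner.Scan("A-3");
        return received;
    }
}
```

Run() should return only "A-1" and "A-2": disposing the subscription invokes the remove-handler delegate, so the third scan has no listener, just as the first FireSimpleEvent call in the tester above was missed.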

