IObserver<T> - Reader/Consumer/Observer
IObservable<T> - Writer/Publisher/Observable sequence. [I find this interface name confusing.]
ISubject<in TSource, out TResult> - Represents an object that is both an observable sequence as well as an observer. Used in (non-reactive) samples but not in real situations.
In comparison to IEnumerable<T> where items are pulled out, in an IObservable<T> items are pushed in
A simple sample: Consider an image processing system in a factory that scans parts for results. The outcome of the scan could be one of three states
public enum ImageProcessingSystemResult { Failed, // failed image processing check Passed, // succeeded image processing check Indeterminate }Here is the publisher of the image processing system results:
// Publishes a sequence of ImageProcessingSystemResult to the observer public class ImageProcessingSystemPublisher : IObservable<ImageProcessingSystemResult> { public IDisposable Subscribe(IObserver<ImageProcessingSystemResult> observer) { // These results in this case are just published one after another observer.OnNext(ImageProcessingSystemResult.Passed); observer.OnNext(ImageProcessingSystemResult.Passed); observer.OnNext(ImageProcessingSystemResult.Failed); observer.OnNext(ImageProcessingSystemResult.Passed); observer.OnCompleted(); return Disposable.Empty; } }Here is a generic results observer:
// Reads/Observes a sequence of T and decides what to do with it // Basically, what you want to do when you receive some data public class MyObserver<T> : IObserver<T> { public void OnNext(T value) { string output = value != null ? value.ToString() : "null"; Console.WriteLine("Received value: {0}", output); } public void OnError(Exception error) { Console.WriteLine("Sequence faulted with exception: {0}", error.ToString()); } public void OnCompleted() { Console.WriteLine("Sequence terminated"); } }Here is how it all goes together:
public void UnreactiveSimpleTest() { var imageProcessingSystemPublisher = new ImageProcessingSystemPublisher(); var observer = new MyObserver<ImageProcessingSystemResult>(); // Here is how the observer subscribes to the publisher using (imageProcessingSystemPublisher.Subscribe(observer)) { } }Here is the output:
Received value: Passed Received value: Passed Received value: Failed Received value: Passed Sequence terminated Press any key to continue ...
Hot and Cold Observables (i.e. publishers)
Cold - Sequences that start producing notifications on request (when subscribed to), and each subscriber gets a copy of the same notification stream from the beginning.
Hot - Sequences that are active and produce notifications regardless of subscriptions (just like c# events).
A 'Cold' Observable can be converted to a 'Hot' Observable by using the Publish() extension method. With a hot Observable it is necessary to use "Connect()" to connect to the published sequence. This returns an IDisposable which must be disposed of when the Observable sequence is finished with.
No comments:
Post a Comment