August 19, 2021

C# Event Aggregators

Did some research into event aggregators. They are a great way to allow events to be fired betwen classes without the classes being tightly coupled, in other words without either class having to know about the other. Standard C# events make the classes tighly coupled, the listener needs to know about the firer. Sometimes with standard c# events, an event will need to be transmitted through an intermediate sequence of objects just to reach the target recipient, this is very cumbersome and laborious. Event aggregators completely decouple the event publishers from the event subscribers as long as both subscriber and publisher objects can access the event aggregator their is no need for intermediate objects to relay the events

Would make an excellent start for a simulator which has both hardware and software events occuring, an event aggregator could be used to input events to the system. As it is completely decoupled from the system, it would be easy to emulate firing these software events say from a GUI for emulation purposes, and also easy to connect to real hardware to populate those same software events.

I found several links that use a Reactive Extensions to implement an Event Aggregator. Using the Reactive Extensions the code to implement an aggregator is very small and compact.

  1. https://mikebridge.github.io/post/csharp-domain-event-aggregator
It is also a great example in the usage of Reactive Extensions. Currently I have combined them into this form:

using System.Reactive.Linq; // You'll need the Reactive nuget package
using System.Reactive.Subjects;
...

public interface IRxEventAggregator : IEventAggregator, IDisposable
{
    /// <summary>
    ///  Use this method to apply RX (LINQ style) expressions on
    ///  the events before subscribing
    /// </summary>
    /// <typeparam name="TEvent"></typeparam>
    /// <returns></returns>
    IObservable<TEvent> GetEventsObservable<TEvent>();
}

public interface IEventAggregator
{
    IDisposable Subscribe<T>(Action<T> action);
    void Publish<T>(T @event);
}


/// <summary>
/// An EventAggregator that is based on reactive extensions and supports
/// </summary>
public class RxEventAggregator
    : IRxEventAggregator
{
    readonly Subject<object> _subject = new Subject<object>();

    // Use this method to use RX (LINQ style) expressions
    // on the events before subscribing
    public IObservable<TEvent> GetEventsObservable<TEvent>()
    {
        return _subject.OfType<TEvent>().
            AsObservable();
    }

    public IDisposable Subscribe<T>(Action<T> action)
    {
        return GetEventsObservable<T>().
            Subscribe(action);
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        _subject.OnNext(sampleEvent);
    }

    #region IDisposable

... // See below

    #endregion
}

Generally an Event Aggregator lives throughout the life of your software so it really does not need disposing of. You could remove the Dispose code completely. For that reason I list it here just for reference:

    #region IDisposable

    bool _disposed;

    ~RxEventAggregator()
    {
        Dispose(false);
         // Useful for detecting memory leaks
        Debug.Assert(false, nameof(RxEventAggregator) + 
            " was not disposed of");
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this); 
    }

    protected virtual void Dispose(bool disposing)
    {
        if (_disposed) return;

        _subject.Dispose();
        _disposed = true;
    }

    #endregion

Here's another event aggregator:

// When there are a lot of events with lots of subscribers
public class RxEventAggregator
: IRxEventAggregator
{
    private volatile ConcurrentDictionary<Type, object> _subjects
        = new ConcurrentDictionary<Type, object>();

    // Use this method to use RX (LINQ style) expressions
    // on the events before subscribing
    public IObservable<TEvent> GetEventsObservable<TEvent>()
    {
        Trace.Assert(!_disposed, "Trying to access a disposed EventAggregator");
        var subject =
            (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent),
                        _ => new Subject<TEvent>());
        return subject.AsObservable();
    }

    public IDisposable Subscribe<T>(Action<T> action)
    {
        Trace.Assert(!_disposed, "Trying to access a disposed EventAggregator");
        return GetEventsObservable<T>().
            Subscribe(action);
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        Trace.Assert(!_disposed, "Trying to access a disposed EventAggregator");
        if (_subjects.TryGetValue(typeof(TEvent), out var subject))
        {
            ((ISubject<TEvent>)subject)
                .OnNext(sampleEvent);
        }
    }

    // If your Aggregator is a life (of the application) long one you can simply not bother with disposing of it
    #region IDisposable 

    bool _disposed;

    ~RxEventAggregator2()
    {
        Debug.Assert(false, nameof(RxEventAggregator2) + " was not disposed of"); // Useful for detecting memory leaks
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);  // no need to call finalizer now
    }

    protected virtual void Dispose(bool disposing)
    {
        if (_disposed) return;

        _disposed = true;
        // Make them unaccessible
        var hiddenSubjects = Interlocked.Exchange(ref _subjects, new ConcurrentDictionary<Type, object>());
        // Then dispose of them
        foreach (var subject in hiddenSubjects.Values.Cast<IDisposable>())
        {
            subject.Dispose();
        }

    }

    #endregion
}

No comments: