June 14, 2017

A ConcurrentStreamWriter Class

A class for writing to a stream from multiple threads:

/// <summary>
///     Contract for a writer that accepts strings to be written to a stream.
/// </summary>
public interface IConcurrentStreamWriter
{
    /// <summary>
    ///     Submits a string to be written to the stream.
    /// </summary>
    /// <param name="theStringToWrite">The text to write.</param>
    void WriteStream(string theStringToWrite);
}

/// <summary>
///     A concurrent stream writer. Allows concurrent/queued writes to a stream.
/// </summary>
/// <remarks>
///     Callers on any thread enqueue strings with <see cref="WriteStream"/>. A single
///     long-running background task dequeues them and writes each one as a line to the
///     <see cref="StreamWriter"/> obtained from the provider, flushing after every batch.
///     <see cref="Dispose()"/> signals that task to stop and then blocks until it has
///     drained the queue and closed the stream.
/// </remarks>
///
/// <seealso cref="T:ASM.DEK.Printer.Micron.PrintEngine.Timing.Timers.IConcurrentStreamWriter"/>
/// <seealso cref="T:System.IDisposable"/>
public class ConcurrentStreamWriter : IConcurrentStreamWriter, IDisposable
{
    private readonly Func<StreamWriter> _streamWriterProvider;
    private readonly ConcurrentQueue<string> _queuedWrites = new ConcurrentQueue<string>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);

    // Kept so that Dispose can wait for the background writer to finish.
    private readonly Task _writerTask;

    // Set by Dispose on the caller's thread and read by the writer task, hence volatile.
    private volatile bool _exit;

    /// <summary>
    ///     Creates the writer and starts the background task that performs the writes.
    /// </summary>
    /// <param name="streamWriterProvider">
    ///     Factory invoked once, on the background task, to obtain the
    ///     <see cref="StreamWriter"/> to write to. The returned writer is disposed
    ///     when the background task ends.
    /// </param>
    /// <exception cref="ArgumentNullException">
    ///     <paramref name="streamWriterProvider"/> is <c>null</c>.
    /// </exception>
    public ConcurrentStreamWriter(
        Func<StreamWriter> streamWriterProvider)
    {
        // Explicit check instead of Contract.Requires<T>, which only works when the
        // Code Contracts rewriter has processed the assembly.
        if (streamWriterProvider == null)
        {
            // A constructor that throws still leaves the object finalizable; without
            // this the finalizer would report a bogus "undisposed" assertion.
            GC.SuppressFinalize(this);
            throw new ArgumentNullException("streamWriterProvider");
        }

        _streamWriterProvider = streamWriterProvider;

        // Pin the scheduler: StartNew otherwise uses TaskScheduler.Current, which may
        // be a custom scheduler when the constructor runs inside another task.
        _writerTask = Task.Factory.StartNew(
            WriteQueueThreadRoutine,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    ///     Queues a string to be written as a line by the background task.
    ///     A <c>null</c> or empty string is ignored.
    /// </summary>
    /// <param name="theStringToWrite">The text to write.</param>
    /// <exception cref="ObjectDisposedException">
    ///     The writer has been disposed and the string is not empty.
    /// </exception>
    public void WriteStream(
        string theStringToWrite)
    {
        if (string.IsNullOrEmpty(theStringToWrite))
        {
            return;
        }

        // Fail loudly rather than queueing text that nobody will ever write.
        if (_isDisposed)
        {
            throw new ObjectDisposedException(GetType().FullName);
        }

        _queuedWrites.Enqueue(theStringToWrite);
        _autoResetEvent.Set();
    }

    #region private

    /// <summary>
    ///     Body of the background task: obtains the stream, writes queued strings each
    ///     time it is signalled, and closes the stream once asked to exit.
    /// </summary>
    private void WriteQueueThreadRoutine()
    {
        using (var stream = _streamWriterProvider())
        {
            while (!_exit)
            {
                _autoResetEvent.WaitOne();
                WriteQueueImpl(stream);
            }

            // Pick up anything that was enqueued while the last batch was being
            // written and the exit flag was raised.
            WriteQueueImpl(stream);
        }
    }

    /// <summary>
    ///     Writes every currently queued string as a line and flushes if anything was
    ///     written. An <see cref="IOException"/> is traced and swallowed so that the
    ///     background task keeps running.
    /// </summary>
    /// <param name="stream">The writer to output to.</param>
    private void WriteQueueImpl(
        StreamWriter stream)
    {
        try
        {
            string stringToWrite;
            bool wroteAny = false;

            while (_queuedWrites.TryDequeue(out stringToWrite))
            {
                stream.WriteLine(stringToWrite);
                wroteAny = true;
            }

            if (wroteAny)
            {
                stream.Flush();
            }
        }
        catch (IOException ioex)
        {
            Trace.WriteLine("IOException caught: " + ioex);
        }
    }

    #endregion private

    #region Dispose Implementation

    // Use C# destructor syntax for finalization code.
    // Reaching the finalizer means Dispose() was never called; the assertion flags
    // that in builds where Debug.Assert is active.
    ~ConcurrentStreamWriter()
    {
        Dispose(false);
        Debug.Assert(false, "Detected ConcurrentStreamWriter object was undisposed");
    }

    // Read by WriteStream on arbitrary threads, hence volatile.
    private volatile bool _isDisposed;

    /// <summary>
    ///     Stops the background task, waits for it to write out the remaining queued
    ///     strings and close the stream, then releases the wait handle.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    ///     Releases the resources held by this instance.
    /// </summary>
    /// <param name="disposing">
    ///     <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called
    ///     from the finalizer, in which case no managed objects are touched.
    /// </param>
    protected virtual void Dispose(
        bool disposing)
    {
        if (_isDisposed)
        {
            return;
        }

        // Raised first so that WriteStream starts rejecting new text immediately.
        _isDisposed = true;

        if (disposing)
        {
            _exit = true;
            _autoResetEvent.Set();

            try
            {
                // Block until the queue is drained and the stream is closed, so the
                // caller can rely on the output being complete when Dispose returns.
                _writerTask.Wait();
            }
            catch (AggregateException aggregateException)
            {
                // Dispose must not throw; report a failed writer task instead.
                Trace.WriteLine("ConcurrentStreamWriter writer task failed: " + aggregateException);
            }

            // Safe to release now: the only waiter has finished.
            _autoResetEvent.Dispose();
        }
    }

    #endregion Dispose Implementation
}
It uses a ConcurrentQueue to queue up the write requests and an AutoResetEvent to wake a writer running on a separate thread, which then outputs the queued contents to the stream.

No comments: