A class for writing to a stream from multiple threads:
/// <summary>
/// Accepts strings that are to be written to a stream.
/// </summary>
public interface IConcurrentStreamWriter
{
    /// <summary>
    /// Submits a string for writing.
    /// </summary>
    /// <param name="theStringToWrite">The text to write.</param>
    void WriteStream(string theStringToWrite);
}

/// <summary>
/// A concurrent stream writer. Allows concurrent/queued writes to a stream:
/// callers enqueue strings from any thread and a single background task
/// writes them, one per line, to the <see cref="StreamWriter"/> obtained from
/// the provider passed to the constructor.
/// </summary>
/// <remarks>
/// <see cref="Dispose()"/> blocks until the background task has written
/// everything that was queued and has closed the stream. Callers should stop
/// calling <see cref="WriteStream"/> before disposing; a write that races with
/// disposal may be dropped or may throw <see cref="ObjectDisposedException"/>.
/// </remarks>
///
/// <seealso cref="T:ASM.DEK.Printer.Micron.PrintEngine.Timing.Timers.IConcurrentStreamWriter"/>
/// <seealso cref="T:System.IDisposable"/>
public class ConcurrentStreamWriter : IConcurrentStreamWriter, IDisposable
{
    private readonly Func<StreamWriter> _streamWriterProvider;
    private readonly ConcurrentQueue<string> _queuedWrites = new ConcurrentQueue<string>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    private readonly Task _writerTask;

    // Written by the disposing thread and polled by the worker and by
    // WriteStream callers, hence volatile.
    private volatile bool _exit;

    /// <summary>
    /// Creates the writer and starts the background task that performs the writes.
    /// </summary>
    /// <param name="streamWriterProvider">
    /// Factory invoked once, on the background task, to obtain the
    /// <see cref="StreamWriter"/>. The writer is disposed when the task ends.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="streamWriterProvider"/> is null.
    /// </exception>
    public ConcurrentStreamWriter(Func<StreamWriter> streamWriterProvider)
    {
        if (streamWriterProvider == null)
        {
            // Construction failed and the caller never receives an instance to
            // dispose, so keep the finalizer's "undisposed" assertion quiet.
            GC.SuppressFinalize(this);
            throw new ArgumentNullException("streamWriterProvider");
        }

        _streamWriterProvider = streamWriterProvider;

        // Pin the scheduler: without it StartNew uses TaskScheduler.Current,
        // which could place this blocking loop on a UI or custom scheduler.
        _writerTask = Task.Factory.StartNew(
            WriteQueueThreadRoutine,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Queues a string to be written as one line by the background task.
    /// Null or empty strings are ignored.
    /// </summary>
    /// <param name="theStringToWrite">The text to write.</param>
    /// <exception cref="ObjectDisposedException">
    /// The writer has been disposed and the string is not empty.
    /// </exception>
    public void WriteStream(string theStringToWrite)
    {
        if (string.IsNullOrEmpty(theStringToWrite))
        {
            return;
        }

        if (_exit)
        {
            throw new ObjectDisposedException(GetType().Name);
        }

        _queuedWrites.Enqueue(theStringToWrite);
        _autoResetEvent.Set();
    }

    #region private

    /// <summary>
    /// Body of the background task: opens the stream, writes queued strings
    /// each time it is signalled, and closes the stream once told to exit.
    /// </summary>
    private void WriteQueueThreadRoutine()
    {
        using (StreamWriter stream = _streamWriterProvider())
        {
            while (!_exit)
            {
                _autoResetEvent.WaitOne();
                WriteQueueImpl(stream);
            }

            // Pick up anything enqueued between the last drain and the exit
            // request so that it is not silently lost.
            WriteQueueImpl(stream);
        }
    }

    /// <summary>
    /// Writes every currently queued string as a line and flushes the stream.
    /// Does nothing when the queue is empty. An <see cref="IOException"/> is
    /// traced and swallowed so that the background task keeps running.
    /// </summary>
    /// <param name="stream">The writer that receives the queued lines.</param>
    private void WriteQueueImpl(StreamWriter stream)
    {
        if (_queuedWrites.IsEmpty)
        {
            return;
        }

        try
        {
            string stringToWrite;
            while (_queuedWrites.TryDequeue(out stringToWrite))
            {
                stream.WriteLine(stringToWrite);
            }

            stream.Flush();
        }
        catch (IOException ioex)
        {
            Trace.WriteLine("IOException caught: " + ioex);
        }
    }

    #endregion private

    #region Dispose Implementation

    // Use C# destructor syntax for finalization code.
    // Reaching the finalizer means Dispose() was never called; the assertion
    // flags that during debugging.
    ~ConcurrentStreamWriter()
    {
        Dispose(false);
        Debug.Assert(false, "Detected ConcurrentStreamWriter object was undisposed");
    }

    private bool _isDisposed;

    /// <summary>
    /// Stops the background task, waits until it has written all queued text
    /// and closed the stream, then releases the signalling event.
    /// Calling it more than once is harmless.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the resources held by this instance.
    /// </summary>
    /// <param name="disposing">
    /// True when called from <see cref="Dispose()"/>; false when called from
    /// the finalizer, in which case no managed objects are touched.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        if (!disposing)
        {
            return;
        }

        _exit = true;
        _autoResetEvent.Set();

        try
        {
            // Block until the worker has drained the queue and closed the stream.
            _writerTask.Wait();
        }
        catch (AggregateException ex)
        {
            // Dispose must not throw; report a worker failure instead.
            Trace.WriteLine("ConcurrentStreamWriter worker failed: " + ex);
        }

        // The worker has finished, so nothing is waiting on the event any more.
        _autoResetEvent.Dispose();
    }

    #endregion Dispose Implementation
}
// It uses a ConcurrentQueue object to queue the stream requests up and an AutoResetEvent to release a writer on a separate thread to actually output the stream contents.
No comments:
Post a Comment