A class for writing to a stream from multiple threads:
/// <summary>
/// Contract for a component that accepts strings to be written to a stream.
/// </summary>
public interface IConcurrentStreamWriter
{
    /// <summary>
    /// Submits a string for writing.
    /// </summary>
    /// <param name="theStringToWrite">The text to write.</param>
    void WriteStream(string theStringToWrite);
}
/// <summary>
/// A concurrent stream writer. Allows concurrent/queued writes to a stream.
/// </summary>
/// <remarks>
/// Callers on any thread enqueue strings via <see cref="WriteStream"/>; a single
/// background task dequeues them and writes each one as a line to the
/// <see cref="StreamWriter"/> obtained from the provider passed to the constructor.
/// <see cref="Dispose()"/> signals the background task to stop, waits for it to
/// drain the queue and close the stream, and then releases the wait handle.
/// </remarks>
///
/// <seealso cref="T:ASM.DEK.Printer.Micron.PrintEngine.Timing.Timers.IConcurrentStreamWriter"/>
/// <seealso cref="T:System.IDisposable"/>
public class ConcurrentStreamWriter : IConcurrentStreamWriter, IDisposable
{
    private readonly Func<StreamWriter> _streamWriterProvider;
    private readonly ConcurrentQueue<string> queuedWrites = new ConcurrentQueue<string>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);

    // Kept so Dispose can wait for the writer to finish and observe its failures.
    private readonly Task _writerTask;

    // Written by Dispose on the caller's thread, read by the writer task.
    private volatile bool _exit;

    /// <summary>
    /// Creates the writer and starts the background task that performs the writes.
    /// </summary>
    /// <param name="streamWriterProvider">
    /// Factory invoked once, on the background task, to obtain the
    /// <see cref="StreamWriter"/> to write to. The writer is disposed when the
    /// background task ends.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="streamWriterProvider"/> is <c>null</c>.
    /// </exception>
    public ConcurrentStreamWriter(
        Func<StreamWriter> streamWriterProvider)
    {
        // Explicit check instead of Contract.Requires<T>, which only works when
        // the Code Contracts binary rewriter is applied to the assembly.
        if (streamWriterProvider == null)
        {
            // No worker was started, so there is nothing for a caller to dispose;
            // release the handle and keep the "undisposed" finalizer assert quiet.
            _autoResetEvent.Dispose();
            GC.SuppressFinalize(this);
            throw new ArgumentNullException("streamWriterProvider");
        }

        _streamWriterProvider = streamWriterProvider;

        // Pin to the default scheduler so LongRunning is honoured even when the
        // constructor is invoked from inside a task running on a custom scheduler.
        _writerTask = Task.Factory.StartNew(
            WriteQueueThreadRoutine,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Queues a string to be written as a line by the background task.
    /// </summary>
    /// <param name="theStringToWrite">
    /// The text to write. <c>null</c> or empty strings are ignored.
    /// </param>
    /// <exception cref="ObjectDisposedException">
    /// The writer has been disposed and the string is not empty.
    /// </exception>
    public void WriteStream(
        string theStringToWrite)
    {
        if (string.IsNullOrEmpty(theStringToWrite))
        {
            return;
        }

        if (_isDisposed)
        {
            throw new ObjectDisposedException(GetType().Name);
        }

        queuedWrites.Enqueue(theStringToWrite);
        _autoResetEvent.Set();
    }

    #region private

    /// <summary>
    /// Body of the background task: opens the stream, then writes queued strings
    /// each time it is signalled until an exit is requested.
    /// </summary>
    private void WriteQueueThreadRoutine()
    {
        using (var stream = _streamWriterProvider())
        {
            while (!_exit)
            {
                _autoResetEvent.WaitOne();
                WriteQueueImpl(stream);
            }

            // Final drain: pick up anything enqueued while the last batch was
            // being written, so it is not lost when the loop exits.
            WriteQueueImpl(stream);
        }
    }

    /// <summary>
    /// Writes every currently queued string as a line and flushes the stream
    /// if anything was written.
    /// </summary>
    /// <param name="stream">The writer to output to.</param>
    private void WriteQueueImpl(
        StreamWriter stream)
    {
        try
        {
            string stringToWrite;
            bool wroteAny = false;

            while (queuedWrites.TryDequeue(out stringToWrite))
            {
                stream.WriteLine(stringToWrite);
                wroteAny = true;
            }

            if (wroteAny)
            {
                stream.Flush();
            }
        }
        catch (IOException ioex)
        {
            // Best effort: report the failure and keep the writer task alive.
            Trace.WriteLine("IOException caught: " + ioex);
        }
    }

    #endregion private

    #region Dispose Implementation

    // Use C# destructor syntax for finalization code.
    ~ConcurrentStreamWriter()
    {
        Dispose(false);
        Debug.Assert(false, "Detected ConcurrentStreamWriter object was undisposed");
    }

    private volatile bool _isDisposed;

    /// <summary>
    /// Stops the background task, waits for pending writes to be flushed and the
    /// stream to be closed, and releases the wait handle.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases resources held by this instance.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when
    /// called from the finalizer, in which case no managed objects are touched.
    /// </param>
    protected virtual void Dispose(
        bool disposing)
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        if (disposing)
        {
            _exit = true;
            _autoResetEvent.Set();

            try
            {
                // Block until the writer has drained the queue and closed the
                // stream, so the output is complete when Dispose returns.
                _writerTask.Wait();
            }
            catch (AggregateException ex)
            {
                // Dispose must not throw; surface the worker failure via tracing.
                Trace.WriteLine("ConcurrentStreamWriter writer task failed: " + ex);
            }

            // Safe to release now: the writer task no longer waits on the handle.
            _autoResetEvent.Dispose();
        }
    }

    #endregion Dispose Implementation
}
It uses a ConcurrentQueue to queue up the write requests and an AutoResetEvent to wake a writer running on a separate thread, which then writes the queued contents to the stream.
No comments:
Post a Comment