July 14, 2021

Producer Consumer using PLINQ

I took the idea from this web page, an article in which a number of experiments were run to find the optimal way to implement a producer-consumer using the TPL.

// This calculates distances between a target system and a bunch of other systems
// Results are returned as a dictionary of a system id to a double where the double
// represents the distance from the given system to the target system
public IDictionary<int, double> CalcSystemDistances(
    int targSystemId, 
    ISet<int> systemIds)
{
    var idToDistanceMap = new ConcurrentDictionary<int, double>();
    // _systemDb is a repository of systems
    var targSystem = _systemDb.Get(targSystemId); 
    if (targSystem != null)
    {
        int WorkerCount = Math.Max(Environment.ProcessorCount - 2, 4);

        // Setup the queue
        var blockingCollection = new BlockingCollection<SystemInfo>();

        // Declare the worker - Calculates a distance
        Action<SystemInfo> work = eddbSys =>
        {
            double dist = targSystem.Location.DistanceTo(eddbSys.Location);
            idToDistanceMap.TryAdd(eddbSys.Id, dist);
        };

        // Begin producing source data
        Task.Run(() =>
        {
            foreach (var eddbSys in _systemDb.Get(systemIds).Where(s => s != null))
            {
                blockingCollection.Add(eddbSys);
            }
            blockingCollection.CompleteAdding();
        });

        // Start consuming
        blockingCollection.
            GetConsumingEnumerable().
            AsParallel().
            WithDegreeOfParallelism(WorkerCount).
            WithMergeOptions(ParallelMergeOptions.NotBuffered).
            ForAll(work);

    }
    return idToDistanceMap;
}

No comments: