One of my projects is a perfect candidate for concurrency. There is a collection of entities that each need to be processed, and each can be processed independently. For the sake of Separation of Concerns, I developed a WorkManager class dedicated to performing an action on the elements of an enumeration. Before I spill the code, here is how it is used:
Entity[] myCollection = LoadEntities();
WorkManager manager = new WorkManager();
manager.ForEach(myCollection, e => e.Process());
Note the lambda expression above. It means that for each Entity in myCollection, the code will execute its Process method. The program will create a separate thread for each processor, and evenly distribute the workload across the threads.
And without further delay, the WorkManager class:
using System;
using System.Collections.Generic;
using System.Threading;
public sealed class WorkManager
{
private readonly int _numWorkers;
private readonly Worker[] _workers;
private readonly Semaphore _workStart;
private readonly Semaphore _workFinish;
private IWorkLoad _workLoad;
private bool _stop;
public WorkManager() : this(Environment.ProcessorCount) { }
public WorkManager(int numWorkers)
{
if (numWorkers <= 1)
{
_numWorkers = 0;
_workers = new Worker[0];
return;
}
_numWorkers = numWorkers;
_workers = new Worker[numWorkers];
_workStart = new Semaphore(0, numWorkers);
_workFinish = new Semaphore(0, numWorkers);
for (int i = 0; i < numWorkers; i++)
_workers[i] = new Worker(this);
}
public void ForEach<T>(IEnumerable<T> source, Action<T> action)
{
lock (_workers)
{
if (_stop)
throw new Exception("Worker threads are stopped");
// Shortcut for single threaded
if (_numWorkers == 0)
{
foreach (var item in source)
action(item);
return;
}
_workLoad = new WorkLoad<T>(source, action);
_workStart.Release(_numWorkers);
for (int i = 0; i < _numWorkers; i++)
_workFinish.WaitOne();
}
}
public void Stop()
{
lock (_workers)
{
if (!_stop)
{
_stop = true;
_workStart.Release(_numWorkers);
for (int i = 0; i < _numWorkers; i++)
_workFinish.WaitOne();
}
}
}
private interface IWorkLoad
{
bool DoWork();
}
private sealed class WorkLoad<T> : IWorkLoad
{
private readonly IEnumerator<T> _source;
private readonly Action<T> _action;
public WorkLoad(IEnumerable<T> source, Action<T> action)
{
_source = source.GetEnumerator();
_action = action;
}
public bool DoWork()
{
T workItem;
lock (this)
if (_source.MoveNext())
workItem = _source.Current;
else
return false;
_action(workItem);
return true;
}
}
private sealed class Worker
{
private readonly WorkManager _manager;
private readonly Thread _thread;
public Worker(WorkManager manager)
{
_manager = manager;
_thread = new Thread(WorkCycle);
_thread.IsBackground = true;
_thread.Start();
}
private void WorkCycle()
{
while (true)
{
// Wait for the semaphone
_manager._workStart.WaitOne();
try
{
if (_manager._stop)
break;
while (_manager._workLoad.DoWork()) ;
}
finally
{
_manager._workFinish.Release();
}
}
}
}
}