One of my projects is a perfect candidate for concurrency. There is a collection of entities that each need to be processed, and each can be processed independently. For the sake of Separation of Concerns, I developed a WorkManager class dedicated to performing an action on the elements of an enumeration. Before I spill the code, here is how it is used:
Entity[] myCollection = LoadEntities();
WorkManager manager = new WorkManager();manager.ForEach(myCollection, e => e.Process());
Note the lambda expression above. It means that for each Entity in myCollection, the code will execute its Process method. The program will create a separate thread for each processor, and evenly distribute the workload across the threads.
And without further delay, the WorkManager class:
using System;using System.Collections.Generic;using System.Threading;public sealed class WorkManager
{private readonly int _numWorkers;
private readonly Worker[] _workers;
private readonly Semaphore _workStart;
private readonly Semaphore _workFinish;
private IWorkLoad _workLoad;private bool _stop;
public WorkManager() : this(Environment.ProcessorCount) { }
public WorkManager(int numWorkers)
{ if (numWorkers <= 1) {_numWorkers = 0;
_workers = new Worker[0]; return;}
_numWorkers = numWorkers;
_workers = new Worker[numWorkers]; _workStart = new Semaphore(0, numWorkers); _workFinish = new Semaphore(0, numWorkers);for (int i = 0; i < numWorkers; i++)
_workers[i] = new Worker(this);
}
public void ForEach<T>(IEnumerable<T> source, Action<T> action)
{ lock (_workers) { if (_stop)throw new Exception("Worker threads are stopped");
// Shortcut for single threaded if (_numWorkers == 0) {foreach (var item in source)
action(item);
return;}
_workLoad = new WorkLoad<T>(source, action);_workStart.Release(_numWorkers);
for (int i = 0; i < _numWorkers; i++)
_workFinish.WaitOne();
}
}
public void Stop()
{ lock (_workers) { if (!_stop) { _stop = true;_workStart.Release(_numWorkers);
for (int i = 0; i < _numWorkers; i++)
_workFinish.WaitOne();
}
}
}
private interface IWorkLoad
{ bool DoWork();}
private sealed class WorkLoad<T> : IWorkLoad
{private readonly IEnumerator<T> _source;
private readonly Action<T> _action;
public WorkLoad(IEnumerable<T> source, Action<T> action) {_source = source.GetEnumerator();
_action = action;
}
public bool DoWork()
{T workItem;
lock (this)
if (_source.MoveNext())workItem = _source.Current;
elsereturn false;
_action(workItem);
return true;
}
}
private sealed class Worker
{private readonly WorkManager _manager;
private readonly Thread _thread;
public Worker(WorkManager manager) {_manager = manager;
_thread = new Thread(WorkCycle); _thread.IsBackground = true;_thread.Start();
}
private void WorkCycle()
{while (true)
{ // Wait for the semaphone_manager._workStart.WaitOne();
try { if (_manager._stop) break; while (_manager._workLoad.DoWork()) ;}
finally {_manager._workFinish.Release();
}
}
}
}
}