Concurrency Encapsulation

One of my projects is a perfect candidate for concurrency. There is a collection of entities that each need to be processed, and each can be processed independently. For the sake of Separation of Concerns, I developed a WorkManager class dedicated to performing an action on the elements of an enumeration. Before I spill the code, here is how it is used:

Entity[] myCollection = LoadEntities();
WorkManager manager = new WorkManager();
manager.ForEach(myCollection, e => e.Process());

Note the lambda expression above. It means that for each Entity in myCollection, the code will execute its Process method. The program will create a separate thread for each processor and distribute the workload dynamically across those threads: each thread pulls the next unprocessed entity, so the load stays balanced even when individual items take different amounts of time.

And without further delay, the WorkManager class:

using System;
using System.Collections.Generic;
using System.Threading;

/// <summary>
/// Runs an action over the elements of a sequence in parallel, using a fixed
/// pool of worker threads that is created once and reused across calls.
/// Call <see cref="Stop"/> (or <see cref="Dispose"/>) when finished with the instance.
/// </summary>
public sealed class WorkManager : IDisposable
{
    private readonly int _numWorkers;
    private readonly Worker[] _workers;
    private readonly Semaphore _workStart;   // released once per worker when a work load is published; null in single-threaded mode
    private readonly Semaphore _workFinish;  // released by each worker when it has drained the work load; null in single-threaded mode
    private readonly object _gate = new object(); // serializes ForEach/Stop against each other

    private IWorkLoad _workLoad;  // the work load currently being drained; only non-null while ForEach is running
    private volatile bool _stop;  // read by worker threads outside the lock, hence volatile

    /// <summary>Creates a manager with one worker thread per logical processor.</summary>
    public WorkManager() : this(Environment.ProcessorCount) { }

    /// <summary>
    /// Creates a manager with the given number of worker threads.
    /// A value of 1 or less creates no threads at all; <see cref="ForEach{T}"/>
    /// then runs inline on the calling thread (the single-threaded shortcut).
    /// </summary>
    public WorkManager(int numWorkers)
    {
        if (numWorkers <= 1)
        {
            // Single-threaded mode: no threads and no semaphores are created.
            _numWorkers = 0;
            _workers = new Worker[0];
            return;
        }

        _numWorkers = numWorkers;
        _workers = new Worker[numWorkers];
        _workStart = new Semaphore(0, numWorkers);
        _workFinish = new Semaphore(0, numWorkers);
        for (int i = 0; i < numWorkers; i++)
            _workers[i] = new Worker(this);
    }

    /// <summary>
    /// Invokes <paramref name="action"/> once for every element of <paramref name="source"/>,
    /// distributing elements dynamically across the worker threads, and blocks until
    /// every element has been processed.
    /// </summary>
    /// <exception cref="ArgumentNullException">source or action is null.</exception>
    /// <exception cref="InvalidOperationException"><see cref="Stop"/> was already called.</exception>
    /// <exception cref="AggregateException">One or more invocations of the action threw.</exception>
    public void ForEach<T>(IEnumerable<T> source, Action<T> action)
    {
        if (source == null)
            throw new ArgumentNullException("source");
        if (action == null)
            throw new ArgumentNullException("action");

        lock (_gate)
        {
            if (_stop)
                throw new InvalidOperationException("Worker threads are stopped");

            WorkLoad<T> workLoad = new WorkLoad<T>(source, action);
            try
            {
                if (_numWorkers == 0)
                {
                    // Single-threaded shortcut: drain the work load on the calling thread.
                    while (workLoad.DoWork()) { }
                }
                else
                {
                    _workLoad = workLoad;
                    try
                    {
                        // Wake every worker, then wait until each one reports completion.
                        _workStart.Release(_numWorkers);
                        for (int i = 0; i < _numWorkers; i++)
                            _workFinish.WaitOne();
                    }
                    finally
                    {
                        _workLoad = null;
                    }
                }
            }
            finally
            {
                workLoad.Dispose();  // disposes the underlying enumerator
            }

            // Surface failures to the caller. In the original code an exception thrown
            // by the action escaped the worker thread, which terminates the process.
            IList<Exception> errors = workLoad.Errors;
            if (errors.Count > 0)
                throw new AggregateException(errors);
        }
    }

    /// <summary>
    /// Permanently shuts down the worker threads. Safe to call more than once;
    /// after Stop, <see cref="ForEach{T}"/> throws <see cref="InvalidOperationException"/>.
    /// </summary>
    public void Stop()
    {
        lock (_gate)
        {
            if (_stop)
                return;
            _stop = true;

            // In single-threaded mode there are no threads or semaphores to signal.
            // (The original code dereferenced the null semaphore here.)
            if (_numWorkers == 0)
                return;

            _workStart.Release(_numWorkers);
            for (int i = 0; i < _numWorkers; i++)
                _workFinish.WaitOne();
        }
    }

    /// <summary>Stops the workers and releases the semaphore handles.</summary>
    public void Dispose()
    {
        Stop();
        if (_workStart != null)
            _workStart.Close();
        if (_workFinish != null)
            _workFinish.Close();
    }


    // A unit of parallel work: hands out items one at a time until the source
    // is exhausted (or an action has faulted).
    private interface IWorkLoad
    {
        bool DoWork();
    }

    private sealed class WorkLoad<T> : IWorkLoad, IDisposable
    {
        private readonly object _sync = new object(); // guards the enumerator and the error list
        private readonly IEnumerator<T> _source;
        private readonly Action<T> _action;
        private readonly List<Exception> _errors = new List<Exception>();

        public WorkLoad(IEnumerable<T> source, Action<T> action)
        {
            _source = source.GetEnumerator();
            _action = action;
        }

        // Exceptions thrown by the action. Only read after all workers have
        // finished (the finish semaphore provides the necessary happens-before).
        public IList<Exception> Errors
        {
            get { return _errors; }
        }

        // Takes the next item under the lock and runs the action on it outside
        // the lock. Returns false when the source is exhausted or a previous
        // action faulted, telling the calling worker to stop.
        public bool DoWork()
        {
            T workItem;
            lock (_sync)  // the original locked 'this'; a private gate is the safer idiom
            {
                if (_errors.Count > 0 || !_source.MoveNext())
                    return false;
                workItem = _source.Current;
            }

            try
            {
                _action(workItem);
            }
            catch (Exception ex)
            {
                // Record the failure instead of letting it escape the worker thread
                // (an unhandled exception on a worker thread takes down the process).
                lock (_sync)
                    _errors.Add(ex);
                return false;
            }
            return true;
        }

        public void Dispose()
        {
            _source.Dispose();
        }
    }

    private sealed class Worker
    {
        private readonly WorkManager _manager;
        private readonly Thread _thread;

        public Worker(WorkManager manager)
        {
            _manager = manager;
            _thread = new Thread(WorkCycle);
            _thread.IsBackground = true;  // never keep the process alive
            _thread.Start();
        }

        private void WorkCycle()
        {
            while (true)
            {
                // Wait for the semaphore: the manager releases it once per worker
                // when a new work load is published, or when stopping.
                _manager._workStart.WaitOne();
                try
                {
                    if (_manager._stop)
                        break;
                    // Drain the shared work load; DoWork never throws.
                    while (_manager._workLoad.DoWork()) { }
                }
                finally
                {
                    // Always report completion so ForEach/Stop cannot deadlock.
                    _manager._workFinish.Release();
                }
            }
        }
    }
}