Everyone is welcome to see my code for a Parallel alternative, an XML serialization alternative, and a MEF alternative at
http://plugins.codeplex.com
/// <summary>
/// Base class for a work item that keeps one stack of <typeparamref name="T"/> items per
/// worker and drains those stacks on thread-pool threads, coordinated by a
/// <see cref="BackgroundWorker"/>.
/// Derived classes customise it through <see cref="ThreadCount"/>, <see cref="DataCount"/>,
/// <see cref="InitializeData"/>, <see cref="ProcessData"/>, <see cref="Initialize"/> and
/// <see cref="Terminate"/>.
/// </summary>
/// <typeparam name="T">Type of the items handed to <see cref="ProcessData"/>.</typeparam>
public class PluginWorkItem<T> : IPluginWorkItem
{
    // Private gate used instead of lock(this), so that code outside this class that happens
    // to lock on the instance cannot contend or deadlock with these critical sections.
    private readonly object _syncRoot = new object();

    private volatile bool _isRunning;
    private List<PluginThreadSafeStack<T>> _data;
    private volatile int _dataCount = 0;
    private BackgroundWorker _mainThread;
    private WaitHandle[] _waitHandles;

    /// <summary>
    /// Pushes <paramref name="item"/> onto the stack at index <paramref name="threadID"/>,
    /// decrementing the remaining-data counter first when it is still positive.
    /// </summary>
    /// <param name="item">Item to store.</param>
    /// <param name="threadID">Index of the target stack; not range-checked here.</param>
    protected void PushData(T item, int threadID)
    {
        lock (_syncRoot)
        {
            if (_dataCount > 0)
            {
                _dataCount--;
            }
            _data[threadID].Push(item);
        }
    }

    /// <summary>Pops one item from the stack at index <paramref name="threadID"/>.</summary>
    /// <param name="threadID">Index of the source stack; not range-checked here.</param>
    /// <returns>Whatever the underlying stack's <c>Pop</c> returns.</returns>
    protected T PopData(int threadID)
    {
        lock (_syncRoot)
        {
            return _data[threadID].Pop();
        }
    }

    /// <summary>
    /// Pushes <paramref name="item"/> onto the stack that currently holds the fewest items,
    /// decrementing the remaining-data counter first when it is still positive.
    /// </summary>
    /// <param name="item">Item to store.</param>
    protected void PushData(T item)
    {
        lock (_syncRoot)
        {
            if (_dataCount > 0)
            {
                _dataCount--;
            }
            _data[GetMinIndex(GetThreadCount())].Push(item);
        }
    }

    /// <summary>Returns the current value of the remaining-data counter.</summary>
    protected int GetDataCount()
    {
        lock (_syncRoot)
        {
            return _dataCount;
        }
    }

    /// <summary>Pops one item from the stack that currently holds the most items.</summary>
    /// <returns>Whatever the underlying stack's <c>Pop</c> returns.</returns>
    protected T PopData()
    {
        lock (_syncRoot)
        {
            return _data[GetMaxIndex(GetThreadCount())].Pop();
        }
    }

    /// <summary>
    /// Computes the number of workers actually used: <see cref="ThreadCount"/> capped at
    /// <see cref="Environment.ProcessorCount"/> when it lies in the range 1..63, otherwise 1.
    /// </summary>
    private int GetThreadCount()
    {
        // NOTE(review): the upper bound of 64 presumably mirrors the 64-handle limit of
        // WaitHandle.WaitAll, which is used to join the workers - confirm.
        if (ThreadCount > 0 && ThreadCount < 64)
        {
            int threadCount = ThreadCount > Environment.ProcessorCount ? Environment.ProcessorCount : ThreadCount;
            return threadCount;
        }
        return 1;
    }

    /// <summary>Requested number of workers; defaults to the processor count.</summary>
    protected virtual int ThreadCount { get { return Environment.ProcessorCount; } }

    /// <summary>Initial value of the remaining-data counter; defaults to 0.</summary>
    protected virtual int DataCount { get { return 0; } }

    /// <summary>
    /// Worker loop: repeatedly pops an item and passes it to <see cref="ProcessData"/> until
    /// a popped item equals <c>default(T)</c> or <see cref="IsRunning"/> becomes false.
    /// </summary>
    /// <param name="threadID">Index of the calling worker (not used by this implementation).</param>
    protected virtual void ProcessItem(int threadID)
    {
        T item;
        do
        {
            if (!IsRunning) return;
            item = PopData();
            // For a non-nullable value type this check is always true, so a default(T)
            // item is still handed to ProcessData once before the loop ends.
            if (item != null)
            {
                // Re-check so a termination requested while popping is honoured promptly.
                if (!IsRunning) return;
                ProcessData(item);
            }
        } while (!object.Equals(item, default(T)));
    }

    /// <summary>Processes a single item; the default implementation does nothing.</summary>
    /// <param name="item">Item popped from one of the stacks.</param>
    protected virtual void ProcessData(T item) { }

    /// <summary>
    /// Hook called by <see cref="InitializeThreadPoolData"/> after the stacks are created;
    /// the default implementation does nothing.
    /// </summary>
    protected virtual void InitializeData() { }

    /// <summary>
    /// Creates one wait handle and one stack per worker, resets the remaining-data counter
    /// from <see cref="DataCount"/>, calls <see cref="InitializeData"/>, and then, when some
    /// stack is still empty, processes items on the calling thread before the workers start.
    /// </summary>
    /// <exception cref="InvalidProgramException">
    /// A non-empty stack follows an empty one after the seeding phase.
    /// </exception>
    protected void InitializeThreadPoolData()
    {
        int threadCount = GetThreadCount();
        this._waitHandles = new WaitHandle[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            _waitHandles[i] = new AutoResetEvent(false);
        }
        this._data = new List<PluginThreadSafeStack<T>>(threadCount);
        for (int i = 0; i < threadCount; i++)
        {
            _data.Add(new PluginThreadSafeStack<T>());
        }
        _dataCount = DataCount;
        InitializeData();
        T item = default(T);
        if (threadCount > 1)
        {
            // Scan the stacks from the front: as soon as an empty one is met, count
            // collapses to 1, so "count < threadCount" below means an empty stack was found.
            int count = threadCount;
            int loop = 0;
            for (; loop < count; )
            {
                while (_data[loop].Count == 0 && count > 1)
                {
                    count--;
                }
                loop++;
            }
            if (count < threadCount)
            {
                // NOTE(review): this phase appears to rely on ProcessData pushing follow-up
                // items, so that processing a few items here spreads work over the stacks
                // before the thread-pool workers start - confirm against derived classes.
                item = PopData(0);
                for (int i = 0; (i + 1) < threadCount && !object.Equals(item, default(T)); )
                {
                    int offset = 0;
                    do
                    {
                        ProcessData(item);
                        // Take the next item from the fullest stack in a window that grows
                        // by one stack per iteration.
                        item = PopData(GetMaxIndex(i + offset + 2));
                        offset++;
                    }
                    while ((i + offset + 1) < threadCount && !object.Equals(item, default(T)));
                    count = i + offset + 1;
                    loop = 0;
                    for (; loop < count; )
                    {
                        // Move every empty stack inside the window to the end of the list,
                        // keeping the list length constant and shrinking the window by one.
                        while (_data[loop].Count == 0 && count > 1)
                        {
                            _data.RemoveAt(loop);
                            _data.Add(new PluginThreadSafeStack<T>());
                            count--;
                        }
                        loop++;
                    }
                    if (count == threadCount)
                    {
                        break;
                    }
                    item = PopData(GetMaxIndex(loop));
                    i = count - 1;
                }
            }
        }
        // Sanity check: non-empty stacks must form a contiguous prefix of the list.
        for (int i = 0; i < threadCount - 1; i++)
        {
            if (_data[i + 1].Count > 0 && _data[i].Count == 0)
            {
                throw new InvalidProgramException();
            }
        }
    }

    /// <summary>Queues one <see cref="ProcessHandle"/> call per worker on the thread pool.</summary>
    protected void Run()
    {
        int threadCount = GetThreadCount();
        for (int i = 0; i < threadCount; i++)
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessHandle), i);
        }
    }

    /// <summary>
    /// Returns the index, among the first <paramref name="index"/> stacks, of the stack with
    /// the most items. Ties go to the lowest index; 0 is returned when all of them are empty.
    /// </summary>
    private int GetMaxIndex(int index)
    {
        int max = 0;
        int maxIndex = 0;
        for (int i = 0; i < index; i++)
        {
            if (_data[i].Count > max)
            {
                max = _data[i].Count;
                maxIndex = i;
            }
        }
        return maxIndex;
    }

    /// <summary>
    /// Returns the index, among the first <paramref name="index"/> stacks, of the stack with
    /// the fewest items. Ties go to the lowest index; 0 is returned when no stack is examined.
    /// </summary>
    private int GetMinIndex(int index)
    {
        int min = int.MaxValue;
        int minIndex = 0;
        for (int i = 0; i < index; i++)
        {
            if (_data[i].Count < min)
            {
                min = _data[i].Count;
                minIndex = i;
            }
        }
        return minIndex;
    }

    /// <summary>
    /// Thread-pool entry point: runs <see cref="ProcessItem"/> for one worker and then
    /// signals that worker's wait handle.
    /// </summary>
    /// <param name="sync">Boxed <see cref="int"/> worker index.</param>
    private void ProcessHandle(object sync)
    {
        int threadID = (int)sync;
        AutoResetEvent waitHandle = (AutoResetEvent)_waitHandles[threadID];
        try
        {
            if (IsRunning)
            {
                ProcessItem(threadID);
            }
        }
        finally
        {
            // Always signal, even when ProcessItem throws; otherwise the coordinating
            // thread would block in WaitHandle.WaitAll forever.
            waitHandle.Set();
        }
    }

    /// <summary>True while the work item is marked as running.</summary>
    protected bool IsRunning
    {
        get
        {
            return _isRunning;
        }
    }

    #region IWorkItem Members

    /// <summary>
    /// Hook called on the background thread before the stacks are created; the default
    /// implementation does nothing.
    /// </summary>
    protected virtual void Initialize()
    {
    }

    /// <summary>
    /// Hook called when the run ends and when termination is requested; the default
    /// implementation does nothing.
    /// </summary>
    protected virtual void Terminate()
    {
    }

    /// <summary>
    /// Starts the work item on a <see cref="BackgroundWorker"/> unless one is already
    /// registered. The background thread initialises the data, queues the workers, waits
    /// for all of them to finish and finally resets the running state.
    /// </summary>
    void IPluginWorkItem.Run()
    {
        lock (_syncRoot)
        {
            if (_mainThread == null)
            {
                _isRunning = true;
                _mainThread = new BackgroundWorker();
                _mainThread.DoWork += (object sender, DoWorkEventArgs e) =>
                {
                    try
                    {
                        Initialize();
                        InitializeThreadPoolData();
                        Run();
                        WaitHandle.WaitAll(_waitHandles);
                    }
                    finally
                    {
                        // Reset the state even when initialisation fails; otherwise
                        // IsRunning would stay true and the item could never be restarted.
                        lock (_syncRoot)
                        {
                            try
                            {
                                Terminate();
                            }
                            finally
                            {
                                if (_mainThread != null)
                                {
                                    _mainThread = null;
                                    _isRunning = false;
                                }
                            }
                        }
                    }
                };
                _mainThread.WorkerSupportsCancellation = true;
                _mainThread.RunWorkerAsync();
            }
        }
    }

    /// <summary>True while the work item is marked as running.</summary>
    bool IPluginWorkItem.IsRunning
    {
        get
        {
            return _isRunning;
        }
    }

    /// <summary>
    /// Calls the <see cref="Terminate"/> hook and, when a background worker is registered,
    /// requests its cancellation and clears the running state.
    /// </summary>
    void IPluginWorkItem.Terminate()
    {
        lock (_syncRoot)
        {
            Terminate();
            if (_mainThread != null)
            {
                // The DoWork handler never inspects CancellationPending; the workers stop
                // because ProcessItem observes IsRunning turning false.
                _mainThread.CancelAsync();
                _mainThread = null;
                _isRunning = false;
            }
        }
    }

    #endregion
}