Posted By: hack2roothotmai | Jun 23rd @ 12:45 PM
page 1 of 2
Comments: 31 | Views: 1175

Everyone welcome to see my code for Parallel alternative, XML serializationalternative and MEF alternative at

http://plugins.codeplex.com

    public class PluginWorkItem<T> : IPluginWorkItem
    {
        private volatile bool _isRunning;
        private List<PluginThreadSafeStack<T>> _data;
        private volatile int _dataCount = 0;
        private BackgroundWorker _mainThread;
        protected void PushData(T item, int threadID)
        {
            lock (this)
            {
                if (_dataCount > 0)
                {
                    _dataCount--;
                }
                _data[threadID].Push(item);
            }
        }
        protected T PopData(int threadID)
        {
            lock (this)
            {
                return _data[threadID].Pop();
            }
        }
        protected void PushData(T item)
        {
            lock (this)
            {
                if (_dataCount > 0)
                {
                    _dataCount--;
                }
                _data[GetMinIndex(GetThreadCount())].Push(item);
            }
        }
        protected int GetDataCount()
        {
            lock (this)
            {
                return _dataCount;
            }
        }
        protected T PopData()
        {
            lock (this)
            {
                return _data[GetMaxIndex(GetThreadCount())].Pop();
            }
        }
        private int GetThreadCount()
        {
            if (ThreadCount > 0 && ThreadCount < 64)
            {
                int threadCount = ThreadCount > Environment.ProcessorCount ? Environment.ProcessorCount : ThreadCount;
                return threadCount;
            }
            return 1;
        }
        protected virtual int ThreadCount { get { return Environment.ProcessorCount; } }
        protected virtual int DataCount { get { return 0; } }
        protected virtual void ProcessItem(int threadID)
        {
            int threadCount = GetThreadCount();
            bool useDataCount = DataCount > 0;
            T item;
            do
            {
                if (!IsRunning) return;
                item = PopData();
                if (item != null)
                {
                    if (!IsRunning) return;
                    ProcessData(item/*, threadID*/);
                }
            } while (!object.Equals(item, default(T)));
        }
        protected virtual void ProcessData(T item/*, int threadID*/) { }
        protected virtual void InitializeData() { }
        protected void InitializeThreadPoolData()
        {
            int threadCount = GetThreadCount();
 
            this._waitHandles = new WaitHandle[threadCount];
            for (int i = 0; i < threadCount; i++)
            {
                _waitHandles[i] = new AutoResetEvent(false);
            }
 
            this._data = new List<PluginThreadSafeStack<T>>(threadCount);
            for (int i = 0; i < threadCount; i++)
            {
                _data.Add(new PluginThreadSafeStack<T>());
            }
 
            _dataCount = DataCount;
            InitializeData();
            T item = default(T); //PopData();
            //if (item != null)
            //{
            //    ProcessData(item/*, 0*/);
            //}
 
            if (threadCount > 1)
            {
                int count = threadCount;
                int loop = 0;
                for (; loop < count; )
                {
                    while (_data[loop].Count == 0 && count > 1)
                    {
                        count--;
                    }
                    loop++;
                }
                if (count < threadCount)
                {
                    item = PopData(0);
 
                    for (int i = 0; (i + 1) < threadCount && !object.Equals(item, default(T)); )
                    {
                        int offset = 0;
                        do
                        {
                            ProcessData(item/*, i + offset + 1*/);
                            item = PopData(GetMaxIndex(i + offset + 2));
                            offset++;
                        }
                        while ((i + offset + 1) < threadCount && !object.Equals(item, default(T)));
                        count = i + offset + 1;
                        loop = 0;
                        for (; loop < count; )
                        {
                            while (_data[loop].Count == 0 && count > 1)
                            {
                                _data.RemoveAt(loop);
                                _data.Add(new PluginThreadSafeStack<T>());
                                count--;
                            }
                            loop++;
                        }
                        if (count == threadCount)
                        {
                            break;
                        }
                        item = PopData(GetMaxIndex(loop));
                        i = count - 1;
                    }
                }
            }
            for (int i = 0; i < threadCount - 1; i++)
            {
                if (_data[i + 1].Count > 0 && _data[i].Count == 0)
                {
                    throw new InvalidProgramException();
                }
            }
        }
        protected void Run()
        {
            int threadCount = GetThreadCount();
            for (int i = 0; i < threadCount; i++)
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessHandle), i);
            }
        }
        private int GetMaxIndex(int index)
        {
            int max = 0;
            int maxIndex = 0;
            for (int i = 0; i < index; i++)
            {
                if (_data[i].Count > max)
                {
                    max = _data[i].Count;
                    maxIndex = i;
                }
            }
            return maxIndex;
        }
        private int GetMinIndex(int index)
        {
            int min = int.MaxValue;
            int minIndex = 0;
            for (int i = 0; i < index; i++)
            {
                if (_data[i].Count < min)
                {
                    min = _data[i].Count;
                    minIndex = i;
                }
            }
            return minIndex;
        }
        private void ProcessHandle(object sync)
        {
            int threadID = (int)sync;
            AutoResetEvent waitHandle = (AutoResetEvent)_waitHandles[threadID];
            if (IsRunning)
            {
                ProcessItem(threadID);
            }
            waitHandle.Set();
            //MessageBox.Show(DateTime.Now.ToString());
        }
        protected bool IsRunning
        {
            get
            {
                return _isRunning;
            }
        }
        #region IWorkItem Members
        private WaitHandle[] _waitHandles;
        protected virtual void Initialize()
        {
        }
        protected virtual void Terminate()
        {
        }
        void IPluginWorkItem.Run()
        {
            lock (this)
            {
                if (_mainThread == null)
                {
                    _isRunning = true;
                    _mainThread = new BackgroundWorker();
                    _mainThread.DoWork += (object sender, DoWorkEventArgs e) =>
                    {
                        Initialize();
                        InitializeThreadPoolData();
                        Run();
                        WaitHandle.WaitAll(_waitHandles);
                        lock (this)
                        {
                            Terminate();
                            if (_mainThread != null)
                            {
                                _mainThread = null;
                                _isRunning = false;
                            }
                        }
                    };
                    _mainThread.WorkerSupportsCancellation = true;
                    _mainThread.RunWorkerAsync();
                }
            }
        }
        bool IPluginWorkItem.IsRunning
        {
            get
            {
                return _isRunning;
            }
        }
        void IPluginWorkItem.Terminate()
        {
            lock (this)
            {
                Terminate();
                if (_mainThread != null)
                {
                    _mainThread.CancelAsync();
                    _mainThread = null;
                    _isRunning = false;
                }
            }
        }
        #endregion
    }

blowdart
blowdart
Peek-a-boo

Umm yea, right. Why exactly?

stevo_
stevo_
Human after all

YAY! just what I wanted.. Expressionless

blowdart
blowdart
Peek-a-boo

So what you've go is a wrapper around the thread pool. And this replaces MEF how exactly?

littleguru
littleguru
<3 Seattle

Keep up the work! Smiley Try to evaluate on each piece of code that it really adds more than just a wrapper. Your multithreading stuff is a nice idea but the Parallel FX adds a lot more than just a wrapper around the ThreadPool!

I don't know if you have seen these two videos. They explain some of the ideas that have been implemented in the Parallel FX:

http://channel9.msdn.com/shows/Going+Deep/Inside-Parallel-Extensions-for-NET-2008-CTP-Part-1/
http://channel9.msdn.com/shows/Going+Deep/Inside-Parallel-Extensions-for-NET-2008-CTP-Part-2/

littleguru
littleguru
<3 Seattle

That's because you have a . in your nickname... The website seems to have problems with it.

littleguru
littleguru
<3 Seattle

Sure, but you could write a thin wrapper or a wrapper that adds a lot of logic. Your's is more like a thin wrapper (that does not add much on their own) and theirs is a wrapper that adds a lot of logic and algorithms.

littleguru
littleguru
<3 Seattle

Don't make the error to set thin wrappers equal with simplicity! The Parallel FX has a simple API but still a lot of logic runs underneath. A simple API doesn't mean that there's no logic underneath. Sometimes thin wrappers are even more complicated because the user needs to deal with everything from the underlying system, whereas logic could hide that away!

In your case the API is simple but does not add much to the ThreadPool. In the case of the Parallel FX the API is simple as well but adds a lot more logic so that load balancing works out better and the caches of the cores are used more efficiently.

Why do you want to kiss me? Tongue Out

stevo_
stevo_
Human after all

The challenge is to make a powerful abstraction thats simple.. (balanced) its easy to make a simple abstraction thats simple.. you should try and find something more unique (and perhaps more simple (simple != useless) because MEF and concurrency / threading are big tasks to take on).

I've found naturally the things I've ended up building are things that have caused me the biggest development friction (this is natural), and I'm certainly not having friction points with ioc/di frameworks.. or at least if I want more from them, would require a sizable dev effort.

Bass
Bass
www.s​preadfirefox.c​om/5years/

I use Mono.Addins Smiley

 

Could you state clearly why you are writing alternatives to MEF, ParallelFx, and XML serialization?  Not that it matters for you, since self-edification is a worthwhile goal, but for others to be interested, you need incentive.

PaoloM
PaoloM
Hypermediocrity

Proper spelling would be a wonderful starting point.

PaoloM
PaoloM
Hypermediocrity

This time quoting, so you don't go back and fix it, ok? Smiley

If you are thinking you are smart enouth, please help me spell anything you found incorrect. Show me you point, exactly.

QED.

Bass
Bass
www.s​preadfirefox.c​om/5years/

So are you going to correct Dovella's terrible grammar and spelling now? Or at least not attack people who do?

exoteric
exoteric
I : Next<I>

Hey, the guy literally asked for help. Dovella is in a league of his own - although there's one who surpasses even Dovella and that's peterwillcn. All in a good spirit Big Smile

PaoloM
PaoloM
Hypermediocrity

I don't have to do anything, but when people go around and spout nonsense claiming to be some kind of superhuman being with an array of knowledge that clearly is beyond the reach of the mere mortals working at Microsoft, well... let's start from something basic, like decent communication, ok?

page 1 of 2
Comments: 31 | Views: 1175
Microsoft Communities