BlockingCollection<T> Demonstration in Producer-Consumer Scenarios

The Discussion

  • User profile image

    so does the concurrent collection remove values after they've been retrieved?

    if you don't want multiple consumers to process the same data, that's a good thing, but what if you do? like in a logger, for instance: the app produces a bunch of messages, and all the consumers want all the messages, maybe a while after they've been produced. there might be a log window in the app, but also a remote monitoring app, for example


    how would you go about solving that?

  • User profile image

    re: does the concurrent collection remove values after they've been retrieved?


    If you just do:

            foreach(var item in blockingCollection) { ... }

    you'll be enumerating the contents without removing the values.  If you do:

            foreach(var item in blockingCollection.GetConsumingEnumerable()) { ... }

    then you'll be enumerating the contents and removing the values as you do.
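
    The difference described above can be seen in a small sketch. The class name EnumerationDemo and its Run method are made up for illustration; only BlockingCollection<T>, Add, CompleteAdding, Count and GetConsumingEnumerable come from the framework. CompleteAdding() is called first so that the consuming loop ends instead of blocking on an empty collection.

```csharp
using System;
using System.Collections.Concurrent;

static class EnumerationDemo
{
    // Returns the item count after a plain foreach and after a consuming foreach.
    public static int[] Run()
    {
        var bc = new BlockingCollection<int>();
        for (int i = 1; i <= 3; i++) bc.Add(i);
        bc.CompleteAdding(); // lets the consuming enumeration terminate

        // Plain enumeration walks a snapshot; nothing is removed.
        foreach (var item in bc) Console.WriteLine("peek " + item);
        int afterPlain = bc.Count;

        // Consuming enumeration removes each item as it is yielded.
        foreach (var item in bc.GetConsumingEnumerable()) Console.WriteLine("take " + item);
        int afterConsuming = bc.Count;

        return new[] { afterPlain, afterConsuming };
    }

    static void Main()
    {
        int[] counts = Run();
        Console.WriteLine(counts[0] + " then " + counts[1]); // 3 then 0
    }
}
```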

  • User profile image

    Why does BlockingCollection<T> not expose an event for the consumer?

  • User profile image

    If I have multiple producers and one of them calls bc.CompleteAdding(), wouldn't it be possible for the consumer to exit before the other producers have finished their bc.Add() calls? Am I misunderstanding the behavior?



  • User profile image

    Just a minor thing, you said "all Five are output" but in the Console window there are only 4.   Doh!

  • User profile image

    I was just about to ask the same thing :)

  • User profile image

    "If I have multiple producers and one of them calls bc.CompleteAdding(), wouldn't it be possible for the consumer to exit before the other producers have finished their bc.Add() calls? Am I misunderstanding the behavior?"


    If you call CompleteAdding(), any further calls to Add() will throw an InvalidOperationException.  Consumers will run until the queue is empty, at which point Take() will throw as well.  For multiple producers, you probably need a master that can signal the producers to stop; the master can then exit cleanly with a CompleteAdding().  Consumers can catch the exception or use TryTake().  For that matter, the producers could check IsAddingCompleted before adding and catch the exception if they lose the race (TryAdd() also throws once adding is complete).  Then the other producers could exit cleanly, and you may not need an out-of-band signal in that case.
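
    A minimal single-threaded sketch of that behavior, so the outcome is deterministic. CompleteAddingDemo and Run are hypothetical names for this illustration; the calls on BlockingCollection<T> are the real ones.

```csharp
using System;
using System.Collections.Concurrent;

static class CompleteAddingDemo
{
    public static string Run()
    {
        var bc = new BlockingCollection<int>();
        bc.Add(1);
        bc.CompleteAdding();

        // Adding after CompleteAdding() throws.
        string addResult;
        try { bc.Add(2); addResult = "added"; }
        catch (InvalidOperationException) { addResult = "add threw"; }

        int item;
        bool first = bc.TryTake(out item);  // true: the remaining item is drained
        bool second = bc.TryTake(out item); // false: empty and complete, no exception

        // Take() on an empty, completed collection throws instead of blocking.
        string takeResult;
        try { bc.Take(); takeResult = "took"; }
        catch (InvalidOperationException) { takeResult = "take threw"; }

        return addResult + ", " + first + ", " + second + ", " + takeResult + ", " + bc.IsCompleted;
    }

    static void Main()
    {
        Console.WriteLine(Run()); // add threw, True, False, take threw, True
    }
}
```

    In a real consumer loop, GetConsumingEnumerable() or a TryTake() loop that checks IsCompleted avoids the exception path entirely.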

  • User profile image

    re: "you said "all Five are output" but in the Console window there are only 4.   Doh!"


    It actually did output all five.  Unfortunately, the screen capture software appears to have been taking only about one image per second, and it didn't take a snapshot between the time the consumer output its fourth value and the time the console window closed. The fifth element did appear in that window.  You can see this delay in several other portions of the video, such as when I'm typing (and portions of words magically appear in chunks ;) ) or when I'm dragging the console window around.

  • User profile image

    For multiple producers where you want to call complete adding when they're all done, TPL's coordination constructs can come in very handy.  e.g. I can use a ContinueWhenAll to call CompleteAdding:


    var bc = new BlockingCollection<int>();
    var producers = new List<Task>();
    for (int i = 0; i < numProducers; i++)
    {
        producers.Add(Task.Factory.StartNew(() => { /* produce data to bc here */ }));
    }
    // Once every producer task has finished, mark the collection as complete.
    Task.Factory.ContinueWhenAll(producers.ToArray(), _ => bc.CompleteAdding());
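
    Here is a fuller, runnable sketch of the same idea with a consumer attached. MultiProducerDemo, Run, and the parameter itemsPerProducer are hypothetical names; each producer simply adds a fixed number of integers, which stands in for real work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class MultiProducerDemo
{
    // Starts numProducers producer tasks and returns how many items one consumer saw.
    public static int Run(int numProducers, int itemsPerProducer)
    {
        var bc = new BlockingCollection<int>();

        Task[] producers = Enumerable.Range(0, numProducers)
            .Select(p => Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < itemsPerProducer; i++)
                    bc.Add(p * itemsPerProducer + i);
            }))
            .ToArray();

        // Runs only after every producer task has finished, so no Add() can race with it.
        Task.Factory.ContinueWhenAll(producers, _ => bc.CompleteAdding());

        // The loop ends once the collection is both empty and marked complete.
        int consumed = 0;
        foreach (var item in bc.GetConsumingEnumerable()) consumed++;
        return consumed;
    }

    static void Main()
    {
        Console.WriteLine(Run(4, 100)); // 400
    }
}
```

    Because no single producer calls CompleteAdding(), the consumer cannot exit while another producer is still adding.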

  • User profile image

    " _ =>"


    I do not follow the underscore here.  Is this something new?

    Also, what is the best way to get TPL now?  In a library or wait for .net 4.0?

  • User profile image

    re: "_ =>"


    An underscore is a valid identifier in C#.  I sometimes use it when I don't care about the variable in question but still need to name it, so I name it with an underscore.


    re: TPL


    .NET 4.  A Beta is available, and it has a go-live license associated with it, so I suggest you go download it today :)

  • User profile image



    Great question, and something we actually considered.  As you've pointed out, there is more than one way to pass data from a producer to a consumer.  If we think individually about the communication on either side of the data structure (P for production, C for consumption) as being synchronous or asynchronous, we come up with four different styles: P(sync)/C(sync), P(async)/C(sync), P(sync)/C(async), and P(async)/C(async).  I believe you're referring to the last one.


    P(sync)/C(sync) is essentially a single-item data structure that synchronizes the producer and consumer in lock step with each other.  You can think of this as two people in the middle of a bucket line trying to put out a fire.  The person closer to the beginning of the line will not grab another bucket (more work) until the person next to them takes their bucket, and the person closer to the end of the line will wait and not do any work until a bucket is available.  This can be achieved in .NET 4 by creating a BlockingCollection<T> and bounding it with a capacity of 1.
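
    As a rough sketch of the bounded case, the constructor overload that takes an int sets the bounded capacity. LockStepDemo and its two methods are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class LockStepDemo
{
    // With a capacity of 1, a second item does not fit until the first is taken.
    public static bool SecondAddFits()
    {
        var bc = new BlockingCollection<int>(1); // boundedCapacity = 1
        bc.Add(1);
        return bc.TryAdd(2); // false: the single slot is occupied
    }

    // The producer blocks in Add() while the slot is full; the consumer blocks while it is empty.
    public static int SumThroughSingleSlot(int count)
    {
        var bc = new BlockingCollection<int>(1);
        Task producer = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < count; i++) bc.Add(i);
            bc.CompleteAdding();
        });

        int sum = 0;
        foreach (var item in bc.GetConsumingEnumerable()) sum += item;
        producer.Wait();
        return sum;
    }

    static void Main()
    {
        Console.WriteLine(SecondAddFits());         // False
        Console.WriteLine(SumThroughSingleSlot(5)); // 10
    }
}
```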


    P(async)/C(sync) allows the producer to drop off items as fast as possible asynchronously, but the consumer will block until data is available.  In our fire brigade example, this would be analogous to a single person throwing water on the fire and everyone else grabbing buckets of water, dropping them off near the thrower, and returning to get more.  This is the default mode supported by BlockingCollection<T>.


    P(sync)/C(async) is all about throttling.  Producers can only drop off as many items as there are available slots before being blocked, but as soon as work is available, a consumer is created and given the data.  Think of this as the previous example, except that there are a limited number of buckets, so the people grabbing the water must wait until a bucket is emptied before filling it.  Instead of a single person throwing water, there is a pool of people sitting around, and whenever a full bucket appears, one of them runs up, throws the water, and returns to the pool.  There isn't any built-in functionality to support this in .NET 4.  With your idea of an event, this could be supported on BlockingCollection<T>.


    P(async)/C(async) is the last model and the one I think you're most interested in.  Producers can drop off as many items as they'd like, and as soon as work is available, a consumer is created and given the data.  This is the combination of an arbitrary number of workers grabbing buckets and dropping them off and an arbitrary number of workers being handed buckets as soon as buckets are available.  This is the most scalable model, and it is also the model that is supported by the CCR's Port types.


    That said, there are trade-offs to all of these models.  Speaking strictly from the consumer side, ordering becomes a big issue.  Blocking a thread allows you to easily maintain some state on a given thread and guarantee that you'll process messages in the order that they arrive (provided that the producer keeps the messages in order).  This is absolutely essential for some streaming scenarios (like encryption) and simply cannot be maintained by completely asynchronous communication. 


    And so, after going off on a huge (but hopefully informative) tangent, in short, we realize there's a gap in functionality but didn't want to cram every scenario into a single, bloated type.  We're hoping to fill some of these gaps in the next version of .NET but, until then, the CCR may be of benefit to you.



  • User profile image

    It would be excellent if these 10 min videos were woven into the MSDN help content. It seems to me that they are great as documentation. It's very cool that every type can have a video presentation like this; a computational cast of characters.
