CCR Programming - Jeffrey Richter and George Chrysanthakopoulos

Do you remember our introduction to the Concurrency and Coordination Runtime (CCR) with George Chrysanthakopoulos? What about our discussion with the team that implemented a robotics framework on top of it (Microsoft Robotics Studio)? Well, now we dive into the CCR API itself and learn how to use it.

If you're a developer using Microsoft platform technologies, you've probably heard of Wintellect's Jeffrey Richter. He's worked on everything from Windows to the CLR.

CCR creator George Chrysanthakopoulos commissioned Jeffrey to help make the CCR API both more approachable and more consistent with the BCL's syntactic conventions.

Here, George Chrysanthakopoulos, Charles, and Jeffrey Richter dive into demos of the CCR API and discuss the what, how, and why of it.

At the end of the interview, you'll see an excellent real world example of how to use the CCR... (Not going to give it away. You need to watch the whole video Smiley)

Please check out Jeffrey's latest installment of Concurrent Affairs to learn more about the new, improved CCR API and get a hold of sample code to play with. Also, see George's Microsoft Robotics Wiki for great information.

Follow the Discussion

  • Jonathan Merriweather (Cyonix)
    Great video, and Charles you need to take more film next time Tongue Out

    It's great having George on c9.

    I haven't played with the CCR yet, but why hasn't the CCR been released on its own?

    Releasing it with the robotics pack would put people off using it in general applications.
  • Charles (Charles), Welcome Change
    Smiley

    For this interview, I do wish I had brought an extra tape, but then we would have had the first two-hour interview on Channel 9!

    It was great to have the people who thought up and developed the CCR (primarily George) and created the new BCL-compliant API (Jeffrey and George) explaining to us how to use the API. One of the great things about Channel 9, in my opinion, is that we learn about technologies from the folks who think them up and make them...

    And, we'll have more George on C9 in the future...

    C
  • William Stacey (staceyw), Before C# there was darkness...
    Nice.  I always look forward to George and more CCR.  I am still trying to wrap my head around how this can help with the big async need - async sockets!  Sounds like we still need s.BeginReceive(), handle locking on shared state object parameters (i.e. bytes received), and kick off another read if we did not read enough bytes, so we can build the whole packet to parse it.  Then start again for the next client request, etc.  Using an example like WebRequest or reading a whole file is kinda cheating, because it does all that stuff for you behind the covers.  How might async sockets work with the CCR in a general code flow?

    What I would really like to see at some point is a way to Read the int length (i.e. 4 byte prepended length), then kick off another read to read len bytes async and don't notify me again until len bytes read or an exception.  Then my delegate only needs to deal with whole packets and I can parse, process, write reply, and post another read. 

    BTW, the thing with the WaitForEnter() is common to what we have had to do with threaded console apps already.  So not new to the CCR, but I got your point.  Thanks guys and great work!!
    --
    wjs
  • Andrew Davey, www.aboutcode.net
    Do all the nice operator overloads still exist in the CCR?
    Whilst having a library that is consistent with the BCL is very important, the operator overloads really let you concentrate on only your code. Reducing code noise is an important goal IMHO. Since C# has no syntactic macro capabilities, operator overloads are the only way to abstract away the boilerplate code. (Being a purely additive feature means no one has to use them if they like typing lots.)

    If I get time I will look at making some syntactic macros for Boo or Nemerle, to make coding against the CCR look awesome Smiley

    Also, make the CCR available as a separate download - this will certainly increase uptake by the community. (Dare I say make the project shared-source? Seeing the code would hopefully make it easier for people to write their own extensions.)

    Keep up the great work!
  • staceyw wrote:
    Nice.  I Always look forward to George and more CCR.  I am still trying to wrap my head around how this can help with the big async need - Async Sockets!  Sounds like we still need s.BeginReceive() and handle locking on shared state object parameters (i.e. bytes received) and kick off another read if we did not read enouph bytes so we can build the whole packet to parse it.  Then start again for the next client request, etc.  Using example like WebRequest or reading a whole file is kinda cheating because it does all that stuff for you behind the covers.  How might async sockets work with the CCR in a general code flow?  

    What I would really like to see at some point is a way to Read the int length (i.e. 4 byte prepended length), then kick off another read to read len bytes async and don't notify me again until len bytes read or an exception.  Then my delegate only needs to deal with whole packets and I can parse, process, write reply, and post another read. 

    BTW, the thing with the WaitForEnter() is common to what we have had to do with threaded console apps already.  So not new to the CCR, but I got your point.  Thanks guys and great work!!
    --
    wjs


    Hi Stacey, did you get a chance to play with the asynchronous web and file i/o examples Jeff has in the MSDN article? It's exactly what you want to do... As for atomically updating state, you can use our interleave pattern to protect concurrent access to state without using locks. Essentially, you annotate some piece of code as exclusive, attach that method to a port, and then issue messages when you want to do atomic updates.

    The robotics service tutorials talk about this as well
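
    To make that concrete, the pattern looks roughly like this (a minimal sketch, not code from the CTP; UpdateState/QueryState are made-up message types and dq is a DispatcherQueue created elsewhere):

        using Microsoft.Ccr.Core;

        class UpdateState { public int Amount; public UpdateState(int amount) { Amount = amount; } }
        class QueryState  { public Port<int> ResponsePort = new Port<int>(); }

        class StateService
        {
            int _total;
            public Port<UpdateState> UpdatePort = new Port<UpdateState>();
            public Port<QueryState>  QueryPort  = new Port<QueryState>();

            public void Start(DispatcherQueue dq)
            {
                Arbiter.Activate(dq,
                    Arbiter.Interleave(
                        new TeardownReceiverGroup(),
                        new ExclusiveReceiverGroup(
                            // Exclusive: never runs concurrently with anything
                            // else in this interleave, so no lock is needed.
                            Arbiter.Receive<UpdateState>(true, UpdatePort,
                                delegate(UpdateState u) { _total += u.Amount; })),
                        new ConcurrentReceiverGroup(
                            // Concurrent: reads may overlap each other, but
                            // never the exclusive update above.
                            Arbiter.Receive<QueryState>(true, QueryPort,
                                delegate(QueryState q) { q.ResponsePort.Post(_total); }))));
            }
        }

        // An "atomic update" is now just a message:
        // stateService.UpdatePort.Post(new UpdateState(5));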
  • Andrew Davey wrote:
    Do all the nice operator overloads still exist in the CCR?
    Whilst having a library that is consistent with the BCL is very important, the operator overloads really let you concentrate on only your code. Reducing code noise is an important goal IMHO. Since C# has no syntactic macro capabilities, operator overloads are the only way to abstract away the boiler plate code. (Being a purely additive feature means no has to use them if they like typing lots.)

    If I get time I will look at making some syntactic macros for Boo or Nemerle, to make coding against the CCR look awesome

    Also, make the CCR available as a separate download - this will certainly increase uptake by the community. (Dare I say make the project shared-source? Seeing the code would hopefully make it easier for people to write their own extensions.)

    Keep up the great work!


    Hi Andrew, we are a small team, and releasing the CCR standalone, while it has been requested multiple times, is something we can't support at this time. Hopefully this will happen at some point. Until then, using the MSDN article Jeff wrote, our robotics tutorials, and the wiki, people get a stable runtime/library, decent user-guide-style documentation, and the ability to try out the bits.

    Releasing the source is also something we can work towards (I think it's a good idea) but I can't make any commitments at this point.
  • Tommy Carlier (TommyCarlier), Widen your gaze
    This has been one of the coolest videos on C9. I like the style, the walking through the code, asking questions, very technical content. Of course, the subject is very exciting. I've built some classes myself to simplify asynchronous programming, but it's very primitive, compared to CCR. I agree with the previous comments: bring more tape next time (2 hours of this stuff would be awesome), try to release CCR separately from the robotics pack (maybe shared-source on CodePlex?), keep the operator overloads as alternatives.
  • I would concur with the other posters: you guys should release this as a separate product from the Robotics download. Just put it up as a download by itself; it doesn't need its own MSDN developer center or anything so elaborate. Smiley

    Another thing I'd like to request is XML documentation on the code. I really rely on this sort of thing while coding for quick information on parameters, methods, etc. It would be a really nice thing to have.

    I'm really excited about this library. My jaw dropped a little seeing the build times decrease by nearly the number of processors in the system. So many of us developers have at least hyper-threaded processors, if not physical dual core machines. The real application seems obvious there.

    I am also really excited about the lockless way of sharing state by using that particular arbiter task...I think Interleave was the name. That sounds really sweet -- do you know how much we devs toil over using locks just to get the shared state right? Man, if I could move some of my code to use these interleaves rather than locks between methods, holy cow I could save so much time and probably improve performance as well. Cool

    George, this library looks great and the demos really help out. I'm wondering if there are any demos that use Windows Forms? As you know, UI code combined with concurrency gets hairy, since we can touch the UI only from a single thread. Is this something the CCR can still work with? I see a WinForms adapter bundled inside the Robotics IDE...maybe you could elaborate a little?
  • Judah wrote:
    I would concur with the other posters, you guys should release this as a seperate product from the Robotics download. Just put it up as a download by itself; it doesn't need its own MSDN developer center or anything so elaborate. Smiley

    Another thing I'd like to request is XML documentation on the code. I really rely on this sort of thing while coding for quick information on parameters, methods, etc. It would be a really nice thing to have.

    I'm really excited about this library. My jaw dropped a little seeing the build times decrease by nearly the number of processors in the system. So many of us developers have at least hyper-threaded processors, if not physical dual core machines. The real application seems obvious there.

    I am also really excited about the lockless way of sharing state by using that particular arbiter task...I think Interleave was the name. That sounds really sweet -- do you know how much we devs toil over using locks just to get the shared state right? Man, if I could move some of my code to use these interleaves rather than locks between methods, holy cow I could save so much time and probably improve performance as well.

    George, this library looks great and the demos really help out. I wondering if there are any demos that use Windows Forms? As you know, UI code combined with concurrency gets hairy since we can touch the UI only from a single thread. Is this something the CCR can still work with? I see a WinForms adapter bundled inside the Robotics IDE...maybe you could elaborate a little?


    Regarding interleave: Yes, it's sweet. But what is even nicer is that you can just decorate a method with an attribute saying Exclusive or Concurrent, and we take care of the rest. This is in the context of our service runtime, DSS. In robotics, service tutorial 1 introduces this. Our service tutorials are robotics-independent; we have layered the system so the CCR knows nothing about distribution (DSS) or robotics, and DSS knows nothing about robotics services but uses the CCR.

    Regarding XML documentation: We hear you; at some point this will be available. Until then, I have documented all APIs with comments that should be available when you hover over the API in the VS editor.

    Regarding forms: We indeed have a little wrapper, we call it the winforms service, that is part of the CCR library. You basically post messages to it to launch a form, invoke handlers in the context of the UI thread, etc. The robotics CTP contains services that use winforms + CCR, so I would recommend you use one of them as an example.
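
    For reference, the attribute style in a DSS service looks roughly like this (a sketch from memory of the CTP; the service tutorials have the exact attribute and type names):

        // Runs exclusively: the runtime guarantees no other handler in this
        // service touches state while this executes.
        [ServiceHandler(ServiceHandlerBehavior.Exclusive)]
        public IEnumerator<ITask> ReplaceHandler(Replace replace)
        {
            _state = replace.Body;
            replace.ResponsePort.Post(DefaultReplaceResponseType.Instance);
            yield break;
        }

        // Runs concurrently with other concurrent handlers, but never with
        // an exclusive one.
        [ServiceHandler(ServiceHandlerBehavior.Concurrent)]
        public IEnumerator<ITask> GetHandler(Get get)
        {
            get.ResponsePort.Post(_state);
            yield break;
        }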


  • "you can just decorate a method with an attribute saying Exclusive or Concurrent, and we take care of the rest."

    Awesome. Cool

    Thanks for the info. I'll be checking out those robotics tutorials a little more.
  • jsampsonPC, SampsonBlog.com SampsonVideos.com
    George Chrysanthakopoulos is arguably the most exciting person, and one of the most encouraging programmers, I've ever seen on C9. He's naturally motivated, and you can see his passion for his work every time you speak with him. Truly an inspiration to love what you do and be great at it!

    Can't wait for more Georgey!
  • William Stacey (staceyw), Before C# there was darkness...
    georgioc wrote:
    
    staceyw wrote: Nice.  I Always look forward to George and more CCR.  I am still trying to wrap my head around how this can help with the big async need - Async Sockets!  Sounds like we still need s.BeginReceive() and handle locking on shared state object parameters (i.e. bytes received) and kick off another read if we did not read enouph bytes so we can build the whole packet to parse it.  Then start again for the next client request, etc.  Using example like WebRequest or reading a whole file is kinda cheating because it does all that stuff for you behind the covers.  How might async sockets work with the CCR in a general code flow?  

    What I would really like to see at some point is a way to Read the int length (i.e. 4 byte prepended length), then kick off another read to read len bytes async and don't notify me again until len bytes read or an exception.  Then my delegate only needs to deal with whole packets and I can parse, process, write reply, and post another read. 

    BTW, the thing with the WaitForEnter() is common to what we have had to do with threaded console apps already.  So not new to the CCR, but I got your point.  Thanks guys and great work!!
    --
    wjs



    Hi Stacey, did you take chance to play with the asynchronous web and file i/o examples Jeff has in the MSDN article? Its exactly what you want to do... As for atomically updating state, you can use our interleave pattern to protect concurrent access to state, without using locks. Just essentially annotate some piece of code as exclusive, attach that method to a port, and then issue messages when you want to do atomic updates.

    The robotics service tutorials talk about this as well



    Hi George. That sample does not really address the special needs of sockets.
    Also, I note some issues with AsyncStreamDemo:
    1) It assumes the file is <= 1000 bytes. If the file is larger, we don't get the rest. It also assumes it will read the whole file
       or 1000 bytes (whichever is smaller). That may be true for FileStream, but not for sockets. A socket read could return 1 byte, so
       you have to loop. This is not addressed anywhere in the sample.
    2) Windows uses a sync read if the read is < 64KB, so it is actually doing a synchronous read. I understand it is just a sample.

    So I rolled up my sleeves and dug in. This basically spins up a listener on another thread. The reads are done using pure async until a whole message is read; then the result is posted to the port, another read is started, etc. Also, client timeout is handled with a timer port. This seems to work well, however I have not done major testing. Does this pattern seem reasonable, or would you refactor it differently?

    Code:

        // Act as both socket client and server. The server listens on another thread.
        // Note: Make sure to turn your local firewall off or open the correct port.
        private static void SocketReadDemo(DispatcherQueue dq)
        {
            Console.WriteLine();
            Console.WriteLine("SocketReadDemo:");

            // Start server listening on another thread.
            Thread t = new Thread(SocketServer);
            t.IsBackground = true;
            t.Start(dq);
            Thread.Sleep(500); // Wait for server to spin up.

            // Connect to server and send a message.
            TcpClient c = new TcpClient();
            c.Connect(IPAddress.Loopback, 9000);

            for ( int i=0; i<3; i++ )
            {
                byte[] buf = Encoding.ASCII.GetBytes("Message" + i);
                byte[] len = BitConverter.GetBytes(buf.Length);
                c.GetStream().Write(len, 0, len.Length); //Prepend len bytes.
                c.GetStream().Write(buf, 0, buf.Length); //Write data.
            }

            // Close socket.
            Thread.Sleep(5000); // Force a timeout on server side, so we can see the timeout exception.
            c.Close();

            HitEnter();
        }

        private static void SocketServer(object value)
        {
            DispatcherQueue dq = (DispatcherQueue)value;
            TcpListener l = new TcpListener(9000);
            l.Start();

            while ( true )
            {
                Console.WriteLine("Server waiting for client...");
                Socket s = l.AcceptSocket();
                Console.WriteLine("Client connected.");
                HandleClient(dq, s);
            }
        }

        private static void HandleClient(DispatcherQueue dq, Socket s)
        {
            Port<byte[]> bytesPort = new Port<byte[]>();
            Port<Exception> failurePort = new Port<Exception>();
            Port<DateTime> timeoutPort = new Port<DateTime>();
            dq.EnqueueTimer(TimeSpan.FromMilliseconds(3000), timeoutPort);

            // Kick off async read of a whole client message.
            GetMessage(s, bytesPort, failurePort);

            // Create Arbiter that will handle client read result.
            Arbiter.Activate(dq,
                Arbiter.Choice(
                    Arbiter.Receive(false, bytesPort,
                        delegate(byte[] buf)
                        {
                            // Do something with the client message.
                            Msg("Message read, bytes={0}, data: {1}", buf.Length, Encoding.ASCII.GetString(buf));
                            // Kick off another read.
                            Console.WriteLine("Get another client message.");
                            HandleClient(dq, s);
                        }),

                    // Upon timeout, stop all processing.
                    Arbiter.Receive(false, timeoutPort,
                        delegate(DateTime dt)
                        {
                            Msg("Client request did not complete within timeout.");
                            if ( s != null )
                                s.Close();
                        }),

                    Arbiter.Receive(false, failurePort,
                        delegate(Exception e)
                        {
                            Msg("Socket read failed, error={0}", e.Message);
                            if ( s != null )
                            {
                                s.Shutdown(SocketShutdown.Both);
                                s.Close();
                            }
                        })));
           
            // Return without blocking.
        }

        // This reads a single message from a client socket asynchronously (i.e. it is not a blocking call).
        // Either the whole message will be posted to the port, or an exception will be posted.
        // Note: Because we are using async reads, we don't have the option to time out here. However, timeout is
        // handled by a DQ timer in the control block - nice! This saves having to do a kill loop on another thread.
        public static void GetMessage(Socket s, Port<byte[]> portBytes, Port<Exception> portException)
        {
            // Remember, with sockets you are only guaranteed at least 1 byte per read (or an error), so
            // you need to keep issuing reads until the whole message has arrived.

            if ( s == null )
                throw new ArgumentNullException("Socket");

            int bytesRead = 0;
            int bytesToRead = 4;
            bool gettingData = false;
            byte[] buffer = new byte[4];

            AsyncCallback cb = null;
            cb = delegate(IAsyncResult ar)
            {
                try
                {
                    int read = s.EndReceive(ar);
                    if ( read == 0 )
                    {
                        portException.Post(new Exception("Socket closed."));
                        return;
                    }
                    Console.WriteLine("GetMessage read {0} bytes.", read);

                    bytesRead += read; // Accumulate into the offset; partial reads are expected.
                    if ( bytesRead == bytesToRead )
                    {
                        if ( gettingData )
                        {
                            // Send data buf to port.
                            portBytes.Post(buffer);
                        }
                        else
                        {
                            // Got len, so getting data.
                            bytesToRead = BitConverter.ToInt32(buffer, 0);
                            buffer = new byte[bytesToRead];
                            bytesRead = 0;
                            gettingData = true;

                            // Kick off first read of the actual message bytes.
                            s.BeginReceive(buffer, bytesRead, bytesToRead - bytesRead, SocketFlags.None, cb, null);
                        }
                    }
                    else // Else, more data, so kick off another read.
                        s.BeginReceive(buffer, bytesRead, bytesToRead - bytesRead, SocketFlags.None, cb, null);
                }
                catch ( Exception e )
                {
                    portException.Post(e);
                }
            };

            try
            {
                // Kick off first read of len bytes.
                s.BeginReceive(buffer, bytesRead, bytesToRead - bytesRead, SocketFlags.None, cb, null);
            }
            catch ( Exception e )
            {
                portException.Post(e);
            }
        }

     

  • Stacey, awesome, I am so glad people finally have the bits in their hands Wink
    You are right, the samples we have in the article are very simple (they have to be, to ease people in) and do not deal with packet-based reads from a stream-based transport.

    Now, what you did is very reasonable and is a common way to wrap the C# async pattern.

    The idea is that you do this once: make your little socket wrapper a class that exposes a port, with some persistent receivers waiting for requests (read/write messages),
    and then your applications communicate with sockets just by posting messages to your CCR port. Once you have done the little wrapper, you can start doing cool coordination stuff with MultipleItemReceive, iterators to sequence multiple async requests, etc.

    Essentially what you did below is very similar to our TcpReadScheduler inside the DssRuntime.DLL library in the robotics SDK. It's in the RdmapTransportService class.

    To wrap all this you can define a SocketServicePort that looks like this:

    class SocketServicePort : PortSet<ReadPacket,WritePacket, Shutdown>
    {
    }

    // this is approximately the type definitions
    class ReadPacket
    {
        public int PacketSize;
        public PortSet<byte [], Exception> Result =
            new PortSet<byte [],Exception>();
    }


    Now the SocketService would look like this:


    public class SocketService
    {
        private SocketServicePort _mainPort = new SocketServicePort();
        private DispatcherQueue _taskQueue = new DispatcherQueue();

        public static SocketServicePort Create(int tcpPort)
        {
            return new SocketService(tcpPort).Init();
        }

        private SocketService(int tcpPort)
        {
            // open the listening socket for tcpPort here
        }

        private SocketServicePort Init()
        {
            Arbiter.Activate(_taskQueue, Arbiter.Interleave(
                new TeardownReceiverGroup(
                    Arbiter.Receive<Shutdown>(false, _mainPort, ShutdownHandler)),
                new ExclusiveReceiverGroup(),
                new ConcurrentReceiverGroup(
                    Arbiter.Receive<ReadPacket>(true, _mainPort, ReadHandler),
                    Arbiter.Receive<WritePacket>(true, _mainPort, WriteHandler))));

            return _mainPort;
        }

        // make this an iterator if you want to do multiple async reads
        // in sequence
        private void ReadHandler(ReadPacket packet)
        {
            // put your code that does the socket i/o here
        }

        // WriteHandler and ShutdownHandler have the same shape as ReadHandler.
    }
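
    A hypothetical caller then just posts a request and yields on its result port (ReadPacket, PacketSize and Create are from the sketch above; everything else here is made up for illustration):

    // SocketServicePort socketPort = SocketService.Create(9000);
    IEnumerator<ITask> ClientLoop(SocketServicePort socketPort)
    {
        ReadPacket read = new ReadPacket();
        read.PacketSize = 128;          // made-up fixed-size request
        socketPort.Post(read);

        // Yield until the whole packet (or a failure) comes back.
        yield return Arbiter.Choice(read.Result,
            delegate(byte[] packet) { /* parse the packet, post a reply */ },
            delegate(Exception ex)  { /* tear down the connection */ });
    }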











  • Although I don't pretend to understand this library, I somehow feel an event-like syntax would perhaps be more intuitive. For example, could we use += instead of Arbiter.Choice (or would that be counter-intuitive, or complicate the C# syntax)?
  • schrepfler wrote:
    Althought I don't pretend that I understand this library I somehow feel  a event-like syntax would be perhaps more intuitive. For example we could use the += instead of Arbiter.Choice (or would that be counter-intuitive or would complicate the c# syntax?)


    I wanted to stay away from overloading syntax that people are already familiar with and that does one particular thing, and then using it in a different way. Our static methods make it clear we are not events (for example, a choice over two receivers is not persistent, unlike event registration; also, for any one message only one delegate can fire). There are many serious differences between the CLR event mechanisms and the CCR. One of the most important is that, unlike the event system, we have a clear affinity with a particular dispatcher queue and we *always* run the handler in a thread context associated with the dispatcher you used at registration. Events in the CLR, depending on the library, have re-entrancy issues, thread affinity issues, etc.

    hope this makes sense.
    Thanks, it makes much more sense now. I'll definitely be checking your library out, and I hope it'll be moved to the BCL soon, just like Java decided to integrate the concurrent package into their base libraries under java.util.concurrent. If you are aware of it, can you comment on the differences between your approach and theirs?
    I must say that the two demos shown on C9 were quite a thrill, but my objection is that the model you've managed to pull off is so radical that I'm having problems digesting it. When I look at a demo of java.util.concurrent it makes sense, as the higher-level concepts are the same ones tackled for years, while the CCR tackles things in a seemingly different way, so I can't do an apples-to-apples comparison (that said, it's also the most interesting use of iterators I've seen yet; .NET 2.0 rocks).
    Can you, for instance, do some demo code where you present a traditional concept like the Future pattern, or some comparison between java.util.concurrent and the CCR (two columns, one the Java solution, the other the CCR)?
    Also, they introduced some new collection types with finer lock granularity; will the CCR or BCL try to tackle that as well in the future?
  • William Stacey (staceyw), Before C# there was darkness...
    georgioc wrote:

           
               Activate(Arbiter.Interleave(
                    new TeardownReceiverGroup(
                        Arbiter.Receive<Shutdown>
                       (_mainPort,false,ShutdownHandler)),
                    new ConcurrentReceiverGroup(
                       Arbiter.Receive<ReadPacket>
                       (true,mainPort,ReadHandler),
                       Arbiter.Receive<WritePacket>
                       (true,mainPort,WriteHandler))
                    new ExclusiveReceiverGroup()
               ));

         }

          // make this an iterator if you want to do multiple async reads
          // in sequence
         private void ReadHandler(ReadPacket packet)
         {
               // put your code that does the socket i/o here
         }
    }



    Thanks George.  I like that idea.  Making a socket just a Port abstraction is cool. Working on it now.  Couple questions:
    1) I don't fully follow the Interleave arbiter above and why it was picked over a Choice arbiter.

    2) I wonder if overlapped reads and writes will be a concern. I assume it will still be possible to get two reads quickly on a port and kick off two async reads and then they get mixed up data?  Maybe writes with same issue.  Not sure.

    3) What happens if you get a 2nd Shutdown message on a port.  Does it just sit in the port?  Can you close a port so the caller will get exception when they try to Post?

    4) With timers.  Say you get a timer event because of timeout.  So you close the socket, etc.  But at the ~same time, the read actually completed, but the timer beat it to the Q.  What happens to that Read post?  What happens to closing a socket in the process of a read?  Where will that exception even be seen on the port?

    Thanks again.
    --
    wjs
  • schrepfler wrote:
    Thanks, it makes much more sense now. I'll be definitely check your library out and I hope it'll be moved to the BCL soon just like java  decided to integrate concurrent package in their base libs under java.util.concurrent. If you are aware of it, can you comment of differences between your aproach and theirs?
    I must say that the two demos that have been shown on C9 were quite a thrill but my objection is that the model you've managed to pull off is so radical that I'm having problems digesting it. When I look a demo on java.util.concurrent it makes sense as the higher level concepts are the same tackled for years, while CCR tackles stuff in a seemingly different way so I can't do a apples to apples comparison (that said, it's also the most interesting way of using iterators I've seen yet, .net 2.0 rocks).
    Can you for instance do some demo code where you present traditional concept like the Future pattern or some comparison between java.util.concurrent and CCR (two colums, one java solution, other CCR).
    Also they introduced some new collection types with finer lock granularity, will CCR or BCL try to tackle that as well in the future?


    I am sure Microsoft will one day include more advanced, or at least easier to use, primitives in the CLR to deal with concurrency and coordination. There are several efforts under way. The CCR is by no means the chosen or official solution. It was just something we think solves real problems and matches our loosely coupled environment well.

    I am familiar with the new additions to Java regarding concurrency. In my opinion, there are probably too many options available, too many constructs, even if as a whole they are very well done and very powerful.

    Note that the CCR can express all the features I am aware of (futures, various queue configurations, various dispatching patterns, etc.) and, with our use of iterators for non-blocking i/o, in a very user-friendly fashion. Also, the CCR allows you to compose coordination primitives, since it uses a two-phase architecture for all message dispatching. The interleave primitive, for example, relies on this and re-uses simpler primitives. Join does so as well. We really have one model for coupling queues with code, predicated by some constraints.


    This is really what makes it robust and has allowed us to keep the runtime pretty much the same, but expand its abilities without redoing things or having a sea of synchronization primitives. A lot of it also stems from our decision to keep the Ports in the CCR simple and decouple dispatching from the arbiters and ports.

    This is all just my opinion; I am just glad I was able to ship a library that has proved very useful, very stable, and very resilient to new demands for features. The robotics application domain is a perfect fit to showcase this (plus our state-based simple distributed model), but people see its applications in many other areas as well.
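
    As a tiny illustration of the futures point, a Port can play the role of a future with nothing extra (a sketch; ExpensiveComputation is a stand-in for real work, and dq is an existing DispatcherQueue):

        Port<int> future = new Port<int>();

        // Producer: compute the value on any thread and post it when ready.
        ThreadPool.QueueUserWorkItem(delegate(object o)
        {
            future.Post(ExpensiveComputation());
        });

        // Consumer: register a one-time receiver; the delegate runs on a CCR
        // dispatcher thread when the value arrives, and nothing blocks.
        Arbiter.Activate(dq, Arbiter.Receive<int>(false, future,
            delegate(int result) { Console.WriteLine("future resolved: {0}", result); }));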

  • staceyw wrote:
    
    georgioc wrote:
           
               Activate(Arbiter.Interleave(
                    new TeardownReceiverGroup(
                        Arbiter.Receive<Shutdown>
                       (_mainPort,false,ShutdownHandler)),
                    new ConcurrentReceiverGroup(
                       Arbiter.Receive<ReadPacket>
                       (true,mainPort,ReadHandler),
                       Arbiter.Receive<WritePacket>
                       (true,mainPort,WriteHandler))
                    new ExclusiveReceiverGroup()
               ));

         }

          // make this an iterator if you want to do multiple async reads
          // in sequence
         private void ReadHandler(ReadPacket packet)
         {
               // put your code that does the socket i/o here
         }
    }



    Thanks George.  I like that idea.  Making a socket just a Port abstraction is cool. Working on it now.  Couple questions:
    1) I don't fully follow the Interleave arbiter above and why it was picked over a Choice arbiter.

    2) I wonder if overlapped reads and writes will be a concern. I assume it will still be possible to get two reads quickly on a port and kick off two async reads and then they get mixed up data?  Maybe writes with same issue.  Not sure.

    3) What happens if you get a 2nd Shutdown message on a port.  Does it just sit in the port?  Can you close a port so the caller will get exception when they try to Post?

    4) With timers.  Say you get a timer event because of timeout.  So you close the socket, etc.  But at the ~same time, the read actually completed, but the timer beat it to the Q.  What happens to that Read post?  What happens to closing a socket in the process of a read?  Where will that exception even be seen on the port?

    Thanks again.
    --
    wjs


    1) Choice is used for single activations, not something that will persist. Choice is more like a try/catch around a function call: you issue a message, and then use Choice to pick which code to run based on some response. But the response is only sent once (and even if it isn't, Choice guarantees that only one branch fires, exactly once). There is a small sketch of this at the end of this post.

    Interleave, on the other hand, is most often used for receives that are persistent, i.e. they keep scheduling handlers as messages arrive. It coordinates between handlers.

    To be clear, I am not saying you MUST use interleave. In the example we are discussing, it probably only buys you one thing. When the shutdown message is sent it will do the following:
    1) it will wait until all concurrent/exclusive handlers are done
    2) it will shut down the entire interleave atomically, so no more messages are processed (including other shutdown messages)
    3) it will then execute the shutdown handler (you can have multiple, by the way, using different message types; we just execute the first one), guaranteeing that cleanup happens when everything else is done!! This is just one line in the CCR; achieving this level of guarantee with raw threads is *hard*.


    2) Absolutely!!!!! Overlapped reads and writes are what we live for! As they complete, you post a response on the read/write packet you got, independently of all other requests.  If you want to use
    interleave to protect you while requests are pending, you can use an iterator and yield until the request is done. But this is a bit of an advanced subject; maybe I'll get Jeff to write another article on iterators and interleave, etc. Smiley

    3) It will sit on the port; when the shutdown is complete and the port goes out of scope, everything gets GCed. It works nicely, and you don't need to worry about these messages unless you need to post responses. If you want to post responses, use the Port.Test() method to flush out the queue and respond to all messages still pending when your shutdown completed.

    4) If your read post completed after a timer fired and you already completed the Read request with some timeout error, then you just throw away the read result. The application on top will decide if it needs to re-issue, abort, etc. This is a general network-programming policy decision. The CCR, if you used Choice to listen for the response to the Read, will protect you against two messages arriving at the same time: it will non-deterministically choose only one and discard the other.

    This means that the original read issued by the application will be considered complete when the timeout fires first, and any additional read responses, exceptions, etc. will get posted on the response port for the Read message, but will be ignored and GCed when the Read request goes out of scope.


    hope this makes sense
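
    In code, the distinction in point 1 looks like this (a toy sketch; responsePort/logPort are made-up names and dq is an existing DispatcherQueue):

        // One-shot: Choice arms two non-persistent receivers; exactly one of
        // them fires, for exactly one message, then both go away.
        PortSet<string, Exception> responsePort = new PortSet<string, Exception>();
        Arbiter.Activate(dq, Arbiter.Choice(responsePort,
            delegate(string ok)    { Console.WriteLine("ok: " + ok); },
            delegate(Exception ex) { Console.WriteLine("failed: " + ex.Message); }));

        // Persistent: this receiver keeps firing for every message posted to
        // the port, like the receivers inside an Interleave.
        Port<string> logPort = new Port<string>();
        Arbiter.Activate(dq, Arbiter.Receive<string>(true, logPort,
            delegate(string line) { Console.WriteLine(line); }));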

  • William Stacey (staceyw), Before C# there was darkness...
    georgioc wrote:
    
    staceyw wrote: 
    georgioc wrote:
           
               Activate(Arbiter.Interleave(
                    new TeardownReceiverGroup(
                        Arbiter.Receive<Shutdown>
                       (_mainPort,false,ShutdownHandler)),
                    new ConcurrentReceiverGroup(
                       Arbiter.Receive<ReadPacket>
                       (true,mainPort,ReadHandler),
                       Arbiter.Receive<WritePacket>
                       (true,mainPort,WriteHandler))
                    new ExclusiveReceiverGroup()
               ));

         }

          // make this an iterator if you want to do multiple async reads
          // in sequence
         private void ReadHandler(ReadPacket packet)
         {
               // put your code that does the socket i/o here
         }
    }



    Thanks George.  I like that idea.  Making a socket just a Port abstraction is cool. Working on it now.  Couple questions:
    1) I don't fully follow the Interleave arbiter above and why it was picked over a Choice arbiter.

    2) I wonder if overlapped reads and writes will be a concern. I assume it will still be possible to get two reads quickly on a port and kick off two async reads and then they get mixed up data?  Maybe writes with same issue.  Not sure.

    3) What happens if you get a 2nd Shutdown message on a port.  Does it just sit in the port?  Can you close a port so the caller will get exception when they try to Post?

    4) With timers.  Say you get a timer event because of timeout.  So you close the socket, etc.  But at the ~same time, the read actually completed, but the timer beat it to the Q.  What happens to that Read post?  What happens to closing a socket in the process of a read?  Where will that exception even be seen on the port?

    Thanks again.
    --
    wjs

    2) Absolutely!!!!! Overlapped reads and writes is what we live for! As they complete, you post a response on the read/write packet you got, independently of all other requests.  If you want to use
    interleave to protect you while requests are pending, you can use an iterator and yield until the request is done. But this a bit of an advanced subject, maye i ll get Jeff to write another article on iterators and interleave, etc

    ...


    Thanks George.  Maybe the answer is in that, but I still have a little confusion.  Say a Read gets posted, so I start an async read.  The first async read gets maybe 5 bytes, not the whole message yet.  However, another Read gets posted and that async read loop starts as well.  The second read starts reading the packet that should only be read by the first reader, so the data gets garbled - error.  What I think I need is a way to pop the second read only *after* the first read has completed fully and has posted a reply.  You could do this via calling convention and say "don't post another read until you get a reply", but then it is only convention, and the service should really protect against this.  I could probably work with a flag or something, but I was hoping for a better idea using the CCR.  Does this make sense?
  • staceyw wrote:
    

    Thanks George.  Maybe the answer is in that, but still having a little confusion.  Say a Read gets posted, so I start an async read.  So the first async reads maybe 5 bytes and not the whole message yet.  However, another Read gets posted and that async read loop starts as well.  The second read starts reading the packet that should only be read by the first reader.  So data gets gumbled - error.  What I think I need is a way to only pop the second read only *after the first read has completed fully and has posted a reply.  You could do this via calling convension and say don't post another read until you get a reply, but then it is only convention and the service should really protect against this.  I could probably work with a flag or something, but I was hoping for a better idea using the CCR.  Does this make sense?


    OK, I get what you are talking about now. I wish I could just post our TcpScheduler code, but I am not allowed yet Wink

    To have the CCR enforce this level of atomicity, you can put your Arbiter.Receive<ReadRequest>(true,_mainPort,ReadHandler) in your ExclusiveReceiverGroup() in the Interleave.

    Then you must also make your handler an iterator and do the following:


    IEnumerator<ITask> ReadHandler(ReadRequest read)

    {
          byte [] readBuffer = new byte[read.ReadSize];

          Port<int,Exception> readResult = new Port<int,Exception>();

          int bytesRead = 0;
          while (bytesRead < read.ReadSize)
          {
              // issue async read for N bytes. In the method below
              // use BeginRead, etc and post to the private port when you
             // read bytes, even less than total. 
             DoAsyncRead(readResult, bytesRead, readBuffer);
     
             yield return Arbiter.Choice(readResult,
                 delegate (int bytes) { bytesRead += bytes;},
                 delegate (Exception ex) 
                 {
                      bytesRead = 0;
                      read.ResponsePort.Post(ex);
                 }
             );

             if (bytesRead == 0)
                 yield break; //error occured, we already posted exception

          } //while loop. Look we can do loops and async requests!!!

          // ok io operation done.
          read.ResponsePort.Post(readBuffer);
         
    }

    In the handler above (please excuse any bugs, I just coded it inline) you have a multi-stage iterator, yielding for each TCP read i/o operation. Look how nice it is to just use an iterator and not block a thread! In any case, if this handler was started within the exclusive group in the interleave, we will NOT allow any other reads/writes to proceed until this one is done!

    There are other ways to do this in the CCR, but they only apply to simple cases. Instead of having a persisted receiver, you can just activate your ReadHandler with a one-time activation and then, when you are done with a read or write, activate it again. Very simple. But I might as well show you the iterator pattern above, since it's more general and you can synchronize across many different types of requests if it's inside the interleave.
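
    For completeness, DoAsyncRead is the one piece left as an exercise above. One plausible shape for it, reusing the Begin/EndReceive wrapping from Stacey's earlier post (a guess at the helper, matching the readResult port in the sketch; _socket is assumed to be a field holding the connected Socket):

        void DoAsyncRead(Port<int, Exception> readResult, int offset, byte[] readBuffer)
        {
            try
            {
                // Ask for whatever is still missing; the callback posts either
                // the byte count actually read or the exception.
                _socket.BeginReceive(readBuffer, offset, readBuffer.Length - offset,
                    SocketFlags.None,
                    delegate(IAsyncResult ar)
                    {
                        try
                        {
                            int read = _socket.EndReceive(ar);
                            if (read == 0)
                                readResult.Post(new Exception("Socket closed."));
                            else
                                readResult.Post(read); // may be fewer bytes than requested
                        }
                        catch (Exception e) { readResult.Post(e); }
                    }, null);
            }
            catch (Exception e) { readResult.Post(e); }
        }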


  • William Stacey (staceyw), Before C# there was darkness...
    georgioc wrote:
    

    ...
    IEnumerator<ITask> ReadHandler(ReadRequest read)

    {
          byte [] readBuffer = new byte[read.ReadSize];

          Port<int,Exception> readResult = new Port<int,Exception>();

          int bytesRead = 0;
          while (bytesRead < read.ReadSize)
          {
              // issue async read for N bytes. In the method below
              // use BeginRead, etc and post to the private port when you
             // read bytes, even less than total. 
             DoAsyncRead(readResult, bytesRead, readBuffer);
     
             yield return Arbiter.Choice(readResult,
                 delegate (int bytes) { bytesRead += bytes;},
                 delegate (Exception ex) 
                 {
                      bytesRead = 0;
                      read.ResponsePort.Post(ex);
                 }
             );

             if (bytesRead == 0)
                 yield break; //error occured, we already posted exception

          } //while loop. Look we can do loops and async requests!!!

          // ok io operation done.
          read.ResponsePort.Post(readBuffer);
         
    }

    ...


    Wow.  Thanks man.  That is the shiz nit.  I almost get what is happening, but not fully.  Could you, please, walk me through that in terms of the yield loop.  What thread is calling and consuming the ITask, what happens after yield return, etc.  Thanks so much G.  You have been so helpful here.  Cheers!
    --
    wjs
  • staceyw wrote:
    
    georgioc wrote: 

    ...
    IEnumerator<ITask> ReadHandler(ReadRequest read)

    {
          byte [] readBuffer = new byte[read.ReadSize];

          Port<int,Exception> readResult = new Port<int,Exception>();

          int bytesRead = 0;
          while (bytesRead < read.ReadSize)
          {
              // issue async read for N bytes. In the method below
              // use BeginRead, etc and post to the private port when you
             // read bytes, even less than total. 
             DoAsyncRead(readResult, bytesRead, readBuffer);
     
             yield return Arbiter.Choice(readResult,
                 delegate (int bytes) { bytesRead += bytes;},
                 delegate (Exception ex) 
                 {
                      bytesRead = 0;
                      read.ResponsePort.Post(ex);
                 }
             );

             if (bytesRead == 0)
                 yield break; //error occured, we already posted exception

          } //while loop. Look we can do loops and async requests!!!

          // ok io operation done.
          read.ResponsePort.Post(readBuffer);
         
    }

    ...


    Wow.  Thanks man.  That is the shiz nit.  I almost get what is happening, but not fully.  Could you, please, walk me through that in terms of the yield loop.  What thread is calling and consuming the ITask, what happens after yield return, etc.  Thanks so much G.  You have been so helpful here.  Cheers!
    --
    wjs


    Sure, let's do a line-by-line breakdown:

    IEnumerator<ITask> ReadHandler(ReadRequest read)
    {

    The handler declaration now returns IEnumerator<ITask>, which is required for the C# compiler to turn our method into a heap-allocated class and essentially create a little state machine, with the stack variables in the method being converted to heap allocations, i.e. members of the generated class.

    Every time we yield, the iterator returns control to the CCR scheduler, which will schedule the ITask instance you yielded (let's say a choice over two pending receives), and when that task is done it will call MoveNext() on the iterator, which will execute the line after the yield!


          byte [] readBuffer = new byte[read.ReadSize];
          Port<int,Exception> readResult = new Port<int,Exception>();

    In the lines above I preallocate a buffer to hold the entire TCP packet once we read it. I also create a little private port that I will use to determine when my lower-level BeginRead is complete, for each chunk of bytes we read from the socket.


          int bytesRead = 0;
          while (bytesRead < read.ReadSize)
          {
              // issue async read for N bytes. In the method below
              // use BeginRead, etc and post to the private port when you
             // read bytes, even less than total. 
             DoAsyncRead(readResult, bytesRead, readBuffer);
     
    OK, now I start a little "asynchronous" loop, where I call a helper routine (containing pretty much the code you posted) that issues BeginReads to the socket. I pass it the current number of bytes read so it can index into readBuffer and copy data in at that offset.

             yield return Arbiter.Choice(readResult,
                 delegate (int bytes) { bytesRead += bytes;},
                 delegate (Exception ex) 
                 {
                      bytesRead = 0;
                      read.ResponsePort.Post(ex);
                 }
             );

    OK, now the really cool part. Since the underlying read is async and we don't want to block a thread here, but we still want to use a nice little while construct, we instead *yield*, waiting for a response on the private readResult port. When the read completes, we either increment bytesRead (notice how the stack variable can be used directly) or we post the exception directly on the original request. When the read completes, the while loop continues, just as if we were blocking a thread for the i/o operation!

             if (bytesRead == 0)
                 yield break; //error occured, we already posted exception

    OK, after the yield you should always check which branch of the choice fired, and exit if an error occurred. yield break will terminate the iteration and release the "atomic" constraint the interleave is enforcing around your handler, allowing new ReadHandlers to execute.

          } //while loop. Look we can do loops and async requests!!!

          // ok io operation done.
          read.ResponsePort.Post(readBuffer);
         
    We finally post the final result after our loop is done. No spaghetti callback code, and we were protected atomically.


    }


    An iterator scheduler definitely messes with people's heads, but I swear it's extremely reliable and scalable, and it uses iterators exactly for what they were meant for. The implementation of the iterator scheduling in the CCR is about 30 lines of code...
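
    If it helps to see the trick in isolation, here is a toy version of the idea (emphatically not the CCR source, just the shape of it: each yielded step is an async operation, and the driver calls MoveNext again from the step's completion callback, so no thread blocks between steps):

        using System;
        using System.Collections.Generic;
        using System.Threading;

        delegate void Completion();                  // invoked when a step finishes
        delegate void Step(Completion whenDone);     // an asynchronous step

        static class ToyScheduler
        {
            public static void Run(IEnumerator<Step> routine)
            {
                // Advance to the next yield; if there is one, start that step
                // and resume the iterator from its completion callback.
                if (routine.MoveNext())
                    routine.Current(delegate() { Run(routine); });
            }
        }

        static class Demo
        {
            public static IEnumerator<Step> Example()
            {
                Console.WriteLine("before the async step");

                // The yield hands this step to the scheduler and gives the
                // thread back; nothing blocks while the "i/o" is in flight.
                Step ioStep = delegate(Completion done)
                {
                    // Stand-in for BeginReceive etc.: complete on a pool thread.
                    ThreadPool.QueueUserWorkItem(delegate(object o) { done(); });
                };
                yield return ioStep;

                Console.WriteLine("after the async step, resumed via MoveNext()");
            }
        }

        // ToyScheduler.Run(Demo.Example());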

  • Would it be difficult to create a sequence diagram for this flow (for those of us that think visually)?
  • schrepfler wrote:
    Would it be difficult to create a sequence diagram for this flow (for those of us that think visually)?


    The lines of code are few but unfamiliar, so I will repeat the original request Stacey had (which is a bit of an advanced topic):

    "Create an asynchronous packet processor reading from a stream based transport, where a single read is not guaranteed to return all the bytes for one packet"

    "Make a read atomic, until its done reading an entire packet. Prevent other reads from accessing the transport (and possibly other writes)"

    I can't easily attach a block diagram, but the 10 lines of CCR code in the post above show how you can use the familiar synchronous read style to read repeatedly from the transport until a packet's worth of bytes has been read.

    At the same time, CCR protects you, without the need for explicit locking or thread blocking, from other reads.
  • William Stacey (staceyw), Before C# there was darkness...
    georgioc wrote:
    
    schrepfler wrote: Would it be difficult to create a sequence diagram for this flow (for those of us that think visually)?


    the lines of code are few, but unfamiliar so i will repeat the original request stanley had (which is a bit of an advanced topic):

    "Create an asynchronous packet processor reading from a stream based transport, where a single read is not guaranteed to return all the bytes for one packet"

    "Make a read atomic, until its done reading an entire packet. Prevent other reads from accessing the transport (and possibly other writes)"

    I cant easily attach a block diagram but the 10 lines of CCR code in the post above sow how you can use the familiar synchronous read style to read repeatedly from the transport, until a packets worth of bytes has been read.

    At the same time, CCR protects you, without the need for explicit locking or thread blocking, from other reads.


    That would be a great add to C9 - allow attachments for PPT, gif, *.cs, projects, etc.   Anyway, related question...

    Is it possible to have atomic reads as one set and atomic writes as another set?  So a read and a write could process in parallel, but only one read at a time and one write at a time.  I have read somewhere that it is safe to process a read on one thread and a write on another.  Thanks again!
    -- wjs
  • staceyw wrote:
    

    That would be a great add to C9 - allow attachments for PPT, gif, *.cs, projects, etc.   Anyway, related question...

    Is it possible to have atomic reads as a 1 set and atomic writes as 1 set.  So a read and write could process in parallel, but only 1 read at a time and 1 write at a time.  I have read somewhere that it is safe to process a read on one thread and a write on another.  Thanks again!
    -- wjs


    Yes, absolutely. Use two interleaves decoupled from each other. Put the ReadHandler in an ExclusiveReceiverGroup in one interleave and the WriteHandler also in an exclusive group in another interleave.

    Activate them independently, but still use the same shutdown port between them. From each shutdown handler, post a shutdown message so the other interleave tears down as well (although it is only guaranteed atomic with respect to its own exclusive group).

    Another way to do this, of course, is with a one-time activation of the read/write handler, re-activating the receiver when you are done with the read/write.

    g
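
    In code, that arrangement might look like this (a sketch reusing the ReadPacket/WritePacket/Shutdown types and the main port from the earlier posts; the handler names are made up):

        Port<Shutdown> shutdownPort = new Port<Shutdown>();

        // Reads serialize against reads...
        Arbiter.Activate(dq,
            Arbiter.Interleave(
                new TeardownReceiverGroup(
                    Arbiter.Receive<Shutdown>(false, shutdownPort, ReadShutdownHandler)),
                new ExclusiveReceiverGroup(
                    Arbiter.Receive<ReadPacket>(true, _mainPort, ReadHandler)),
                new ConcurrentReceiverGroup()));

        // ...and writes against writes, but a read and a write may run in parallel.
        Arbiter.Activate(dq,
            Arbiter.Interleave(
                new TeardownReceiverGroup(
                    Arbiter.Receive<Shutdown>(false, shutdownPort, WriteShutdownHandler)),
                new ExclusiveReceiverGroup(
                    Arbiter.Receive<WritePacket>(true, _mainPort, WriteHandler)),
                new ConcurrentReceiverGroup()));

        // Each shutdown handler posts Shutdown again so the other interleave
        // tears down too, e.g. shutdownPort.Post(new Shutdown());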

  • William Stacey (staceyw), Before C# there was darkness...
    georgioc wrote:
    
    staceyw wrote: 

    That would be a great add to C9 - allow attachments for PPT, gif, *.cs, projects, etc.   Anyway, related question...

    Is it possible to have atomic reads as a 1 set and atomic writes as 1 set.  So a read and write could process in parallel, but only 1 read at a time and 1 write at a time.  I have read somewhere that it is safe to process a read on one thread and a write on another.  Thanks again!
    -- wjs


    yes, absolutely. Use two interleaves decoupled from each other. Put the ReadHandler in an ExclusiveReceiverGroup in one interleave and the WriteHandler also in an exclusive group in another interleave.

    activate them independently but still use the same shutdown port between them. From each shutdown handler post a shutdown message so the second interleave tears down as well (although only guaranteed atomic with its exclusive group)

    another way to do this ofcourse is with the one time activation of the read/write handler and then re-activating the receiver when you are done with the read/write.

    g


    Thanks g!  I wonder if it would make sense (eventually) to extend your CCR Interleave code to allow multiple ExclusiveReceiverGroups.  So I could do something like this:

                Arbiter.Activate(dq,
                    Arbiter.Interleave
                    (
                         new TeardownReceiverGroup
                         (
                            Arbiter.Receive<Shutdown>(false, replyPort, ShutdownHandler)
                         ),
                         new ExclusiveReceiverGroup
                         (
                            Arbiter.Receive<ReadPacket>(true, replyPort, ReadHandler)
                         ),
                         new ExclusiveReceiverGroup
                         (
                            Arbiter.Receive<WritePacket>(true, replyPort, WriteHandler)
                         ),
                         new ConcurrentReceiverGroup
                         (
                            Arbiter.Receive<Timeout>(true, replyPort, TimeoutHandler),
                            Arbiter.Receive<Error>(true, replyPort, ErrorHandler)
                         )
                     )
                );

    Not sure if this would be possible or not, but would seem to be handy for things like this while needing only one Arbiter active.

    Also, I am wondering if it is better to have one reply port per socket (in this case) or one reply port that handles all sockets, and then just figure out which socket replied based on some data in the reply object.  I just wonder about the perf and overhead between the two methods.  Thanks again.  I am starting to feel the Force with the CCR  Smiley
    --wjs
  • staceyw wrote:
    

    Thanks g!  I wonder if it would make sense (eventually) to extend your CCR Interleave code to allow multiple ExclusiveReceiverGroups.  So I could do something like this:

                Arbiter.Activate(dq,
                    Arbiter.Interleave
                    (
                         new TeardownReceiverGroup
                         (
                            Arbiter.Receive<Shutdown>(false, replyPort, ShutdownHandler)
                         ),
                         new ExclusiveReceiverGroup
                         (
                            Arbiter.Receive<ReadPacket>(true, replyPort, ReadHandler),
                         ),
                         new ExclusiveReceiverGroup
                         (
                            Arbiter.Receive<WritePacket>(true, replyPort, WriteHandler),
                         )
                         new ConcurrentReceiverGroup
                         (
                            Arbiter.Receive<Timeout>(true, replyPort, TimeoutHandler),
                            Arbiter.Receive<Error>(true, replyPort, ErrorHandler),
                         )
                     )
                );

    Not sure if this would be possible or not, but would seem to be handy for things like this while needing only one Arbiter active.

    Also, I am wondering if it is better to have one reply port per socket (in this case) or have 1 reply port that handles all sockets, then just figure out what socket replied based on some data in the reply object.  Just wonder the perf and overhead between the two methods.  Thanks again.  I am starting to feel the Force with the CCR  Smiley
    --wjs



    I would recommend one port per socket, and furthermore a unique port per read/write packet. Let the CCR do the dispatching for you; there is no need for double dispatch. The ports are fairly lightweight, so they should not be a concern in terms of cost, all other things considered.
    We do this in our runtime and achieve good message throughput and a stable, small working set.


  • William Staceystaceyw Before C# there was darkness...
    georgioc wrote:
    
    staceyw wrote: 

    Thanks g!  I wonder if it would make sense (eventually) to extend your CCR Interleave code to allow multiple ExclusiveReceiverGroups.  So I could do something like this:

                Arbiter.Activate(dq,
                    Arbiter.Interleave
                    (
                         new TeardownReceiverGroup
                         (
                            Arbiter.Receive<Shutdown>(false, replyPort, ShutdownHandler)
                         ),
                         new ExclusiveReceiverGroup
                         (
                            Arbiter.Receive<ReadPacket>(true, replyPort, ReadHandler),
                         ),
                         new ExclusiveReceiverGroup
                         (
                            Arbiter.Receive<WritePacket>(true, replyPort, WriteHandler),
                         )
                         new ConcurrentReceiverGroup
                         (
                            Arbiter.Receive<Timeout>(true, replyPort, TimeoutHandler),
                            Arbiter.Receive<Error>(true, replyPort, ErrorHandler),
                         )
                     )
                );

    Not sure if this would be possible or not, but would seem to be handy for things like this while needing only one Arbiter active.

    Also, I am wondering if it is better to have one reply port per socket (in this case) or have 1 reply port that handles all sockets, then just figure out what socket replied based on some data in the reply object.  Just wonder the perf and overhead between the two methods.  Thanks again.  I am starting to feel the Force with the CCR  Smiley
    --wjs



    I would recommend one port per socket, and further more a unique port per read/write packet. Let the CCR do the dispatching for you, no need for double dispatch. The ports are fairly lightweight so they should not be a concern in terms of cost, all other things considered.
    We do this in our runtime and achieve good message throughputs and stable, small, working set



    Sorry for another question, but that just confused me again.  If I create a new port for each read, for example, does that mean I need to Activate a new Arbiter for each new port as well?  If so, you're saying creating a new Arbiter is not expensive either?  Also, does this mean I will not be able to use an Interleave arbiter to gain the protection of atomic reads (i.e. only one read at a time will pop)?  Thanks again George.
  • staceyw wrote:
    

    Sorry for another question, but that just confused me again.  If I create a new port for each read, for example, then does that mean I need to Activate a new Arbiter for each new port as well?  If so, your saying creating a new Arbiter is not expensive either?  Also, does this mean I will not be able to use an Interleave arbiter to gain the protection of atomic reads (i.e. only 1 read at a time will pop)?  Thanks again George.


    Sorry, I should have kept my mouth shut Smiley

    I also recommend you read our service tutorials, part of the Robotics download, plus the samples. They have *lots* of neat CCR examples!


    Ok, what I meant was the original pattern in an earlier post. I was suggesting you create a mini "server", listening behind a port for socket read/write requests. On that main port you do the interleave, with persistent receivers.

    The response/result port, which is part of every read/write message, will have an Arbiter.Choice activated (or a yield return, inside an iterator) by the "client", the code that issued the read.

    So:
    1) The server, with long-running receivers, usually uses an interleave to coordinate its handlers as requests come in.
    2) The client creates a Read/Write request with a new response port in it, and then listens for a reply on that port, using Choice. No need for an interleave.

    Here is the client side:


    // create the socket service on port 808
    SocketServicePort socketPort = SocketService.Create(808);

    // issue a read
    ReadPacket read = new ReadPacket(1000); // 1000 bytes
    socketPort.Post(read);

    // listen, async, for the response. Use yield return instead of Activate
    // if you wish, inside an iterator
    Activate(
       Arbiter.Choice(read.ResultPort,
          delegate(byte[] data) { /* success */ },
          delegate(Exception ex) { /* error */ })
    );
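
    A minimal sketch of the yield return variant mentioned in the comment above, assuming the CCR's iterator handlers (IEnumerator<ITask>) and the ReadPacket/SocketServicePort types defined just below:

        // inside a CCR iterator: the method suspends at the yield return without
        // blocking a thread, and resumes when either outcome arrives
        private IEnumerator<ITask> ReadOnce(SocketServicePort socketPort)
        {
            ReadPacket read = new ReadPacket(1000);
            socketPort.Post(read);

            yield return Arbiter.Choice(read.ResultPort,
                delegate(byte[] data) { /* success: consume the bytes */ },
                delegate(Exception ex) { /* error: log or retry */ });

            // a second read could follow here with another Post + yield return
        }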



    The socket service side:

    class SocketServicePort : PortSet<ReadPacket, WritePacket, Shutdown>
    {
    }

    // this is approximately the type definition
    class ReadPacket
    {
        public int PacketSize;
        public PortSet<byte[], Exception> ResultPort =
            new PortSet<byte[], Exception>();

        public ReadPacket(int packetSize)
        {
            PacketSize = packetSize;
        }
    }


    now the SocketService would look like this


    public class SocketService
    {
        private SocketServicePort _mainPort = new SocketServicePort();

        private SocketService(int tcpPort)
        {
            // open the listening socket on tcpPort here
        }

        public static SocketServicePort Create(int tcpPort)
        {
            return new SocketService(tcpPort).Init();
        }

        private SocketServicePort Init()
        {
            // Activate comes from CcrServiceBase; alternatively use
            // Arbiter.Activate with your own DispatcherQueue
            Activate(Arbiter.Interleave(
                 new TeardownReceiverGroup(
                     Arbiter.Receive<Shutdown>
                     (false, _mainPort, ShutdownHandler)),
                 new ExclusiveReceiverGroup(
                     Arbiter.Receive<ReadPacket>
                     (true, _mainPort, ReadHandler),
                     Arbiter.Receive<WritePacket>
                     (true, _mainPort, WriteHandler)),
                 new ConcurrentReceiverGroup()
            ));

            return _mainPort;
        }

        // make this an iterator if you want to do multiple async reads
        // in sequence
        private void ReadHandler(ReadPacket packet)
        {
            // put your code that does the socket i/o here
        }
    }
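
    For the iterator form mentioned in the comment on ReadHandler, a minimal sketch (the internal ioPort and the way the socket completion posts to it are assumptions for illustration, not part of the post above):

        // iterator version of the read handler: it can chain several async socket
        // reads in sequence, yielding between them instead of blocking a thread
        private IEnumerator<ITask> ReadHandlerIterator(ReadPacket packet)
        {
            // internal port that the socket completion callback posts to
            PortSet<byte[], Exception> ioPort = new PortSet<byte[], Exception>();

            // start the async socket read here; from its completion callback,
            // post either the received bytes or the exception to ioPort

            // wait, without holding a thread, for the i/o to finish, then
            // forward the outcome to the caller's result port
            yield return Arbiter.Choice(ioPort,
                delegate(byte[] data) { packet.ResultPort.Post(data); },
                delegate(Exception ex) { packet.ResultPort.Post(ex); });
        }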

  • Hi George,

    This library looks fantastic, if a little daunting to get your head around.  You must be a very satisfied developer indeed Smiley

    That darned Richter just released his PowerThreading library with lots of async goodness that I didn't quite get how to use, and now there's the CCR!

    Anyway, I kind of get it, but I just wanted to check one thing regarding the persistent nature of queues.  If I post a number of items to a port and then Activate it on a Receive handler without persistence, am I right in saying that only one item will be processed?  I was thinking that perhaps it would process all items already in the port but just ignore the ones added after the Arbiter was started.

    Many thanks, keep the example code coming if you don't mind, and full marks to Stacey and crew for picking this stuff up so quickly.

    PS.  I'd love to see a full working project that reads an arbitrary number of bytes from an async socket.

    Many thanks
    George, I love your enthusiasm, and I think you are really onto something with the CCR. Here is my question, though: how do we interact with databases, which are inherently black-box affairs? Consider this scenario:

    A function does some calculation and adds a result to the database using a transaction-based stored procedure. If we leave the stored procedure working away and then later encounter a rollback, how do we respond to that rollback?

    It would seem to me that we have progressed too far in our application. Either we wait to be sure, or we risk being so quick at the CCR stage that we force a transaction rollback or a data-inconsistency database trigger, i.e. we run 30 threads against the DB and threads 1 and 27 cause a rollback, or are not consistent.

    This is just my pet hate at the moment; I work daily with an app that does the following:

    func addtoDB(params) { // pseudo code
        addToDB            // do SQL, one assumes
        displayStatusDialogue
    }

    Now, to do multi-dimensional queries, they call the method 30 times, resulting in a dirty 30 dialogues flashing. Admittedly this is rubbish engineering, but it is what got me thinking about the problems of repeatedly hitting the database. Inherently this function assumes success, which is why it frequently crashes [I didn't write it, I just have to use it!].

    The idea of ATOMIC database calls seems great, but does it close out the ability to interact with the CCR? I.e. the CCR is not allowed to jump in and help the database.

    Am I wrong here? I am making wild generalisations about databases, and blind calls to functions, but I see a problem.

    It would be interesting to get your feedback; it is great that you are engaging with Channel 9!

    Drex
  • tartufella wrote:
    Hi George,

    This library looks fantastic if a little dautning to get your head around.  You must be a very satisfied developer indeed )

    That darned Ritcher just released his PowerThreading Library with lots of Async goodness that I did'nt quite get how to use and now there's CCR!!

    Anyway, I kind of get it but I just wanted to check one thing regarding the persitent nature of Queues.  If I post a number of Items to a port and then Activate it on a Receive handler without persitency, am I right in saying that only one item will be processed?  I was thinking that perhaps it would process all items in the port already but just ignore the ones addedd after the Arbiter was started.

    Many thanks, keep the example code coming if you don't mind and full marks to Stacey and crew for picking this stuff up so quick.

    PS.  I'd love to see a full working project of reading an arbitary amount of bytes from an async socket.

    Many thanks


    Hi, the CCR will make sure any messages already posted on a port will be issued to a receiver when it is registered. If the receiver is one-time, only one message will be consumed. If the receiver is persistent, then all the messages will be consumed and N concurrent instances of the receiver will be started, each one with a unique message from the port. Basically, we will make sure you "catch up" if you register a receiver after messages have already been waiting. You can use this pattern to trivially implement future-like things, late activations, etc.
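
    A minimal sketch of that catch-up behaviour (assuming Microsoft.Ccr.Core and a DispatcherQueue called queue):

        Port<int> port = new Port<int>();
        for (int i = 0; i < 5; i++)
            port.Post(i);   // messages queue up on the port; nothing runs yet

        // one-time receiver: exactly one of the five waiting messages is consumed
        Arbiter.Activate(queue,
            Arbiter.Receive(false, port,
                delegate(int i) { Console.WriteLine("once: " + i); }));

        // persistent receiver: the remaining waiting messages are all consumed,
        // and anything posted later is consumed as well
        Arbiter.Activate(queue,
            Arbiter.Receive(true, port,
                delegate(int i) { Console.WriteLine("always: " + i); }));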
  • Drexthepimp wrote:
    George, I love your enthusiasm, and i think you are really onto something with the CCR. Here is my question though: How do we interact with databases, which are inherently blackbox affairs? Consider this scenario

    Function does some calculation, adds a result to the database using a transaction based stored procedure. If we leave the stored procedure working away, and then later on we encounter a rollback, how do we respond to that rollback?

    It would seem to me that we have progressed too far in our application. Either we wait to be sure, or we risk being too good at the CCR stage that we force a transaction rollback/Data inconsistency Database Trigger. ie we run 30 threads against the DB, threads 1 and 27 cause a rollback, or are not consistent.

    This is just my pet hate at the moment, I daily work with an app that does the following:

    func addtoDB(params){ //pseudo code
     addToDB //do SQL one assumes
    displayStatusDialogue
    }

    Now to do multiple dimensional queries, they call the method 30 times, resulting in a dirty 30 dialogues flashing. Admittedly this is rubbish engineering, but it was what got me thinking about the problems of repeatedly hitting the database. Inherently this function assumes success, which is why it frequently crashes [I didn't write it, i just have to use it!]

    the idea of ATOMIC database calls seems to be great but closes out the ability to interact with the CCR? Ie the CCR is not allowed to jump in and help the database.

    Am I wrong here? I am making wild generalisations about databases, and blind calls to functions, but i see a problem.

    It would be interesting to get your feedback, it is great that you are engaging with channel9!

    Drex


    You are correct about interacting with databases; they do so much, and stored procedures are so damn hard to get right in terms of concurrency, that the CCR can only help you if you are very careful, and maybe not even then...

    One suggestion is the following. Why don't you use CCR ports to post messages from your little stored procedure (like posting a message saying DisplayDialog), and then have the CCR coordinate between rollbacks, dialog displays or whatever you want to do. Basically, only post messages on CCR ports from your procedures, and then use the Choice, Join or Interleave CCR primitives to coordinate across all these database events. Interleave, for example, can make sure we cancel or abort any pending display or work-in-progress items if a rollback occurs. Or it can at least guarantee the right "interleaving".

    So the CCR can help; you just have to essentially insert it as a layer between your app and the DB, and then use CCR ports and arbiters to do intelligent coordination between multiple concurrent, asynchronous DB (and other) events.
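
    A minimal sketch of that layering (the DbCompleted/DbRolledBack types and the queue DispatcherQueue are made up for illustration):

        // hypothetical event types your data layer posts instead of touching the UI
        class DbCompleted  { public string Operation; }
        class DbRolledBack { public string Reason; }

        PortSet<DbCompleted, DbRolledBack> dbEvents = new PortSet<DbCompleted, DbRolledBack>();

        // a rollback tears the whole thing down; completions (and their status
        // dialogs) are free to run concurrently until that happens
        Arbiter.Activate(queue,
            Arbiter.Interleave(
                new TeardownReceiverGroup(
                    Arbiter.Receive<DbRolledBack>(false, dbEvents,
                        delegate(DbRolledBack r) { /* cancel pending work, show one failure dialog */ })),
                new ExclusiveReceiverGroup(),
                new ConcurrentReceiverGroup(
                    Arbiter.Receive<DbCompleted>(true, dbEvents,
                        delegate(DbCompleted c) { /* update status, display a dialog */ }))));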

  • So use it wisely, at messaging points, possibly prevent then commands flowing to the db that you can predict would cause errors. The ability to close down ports when failures occur is like a firewall to the database I suppose.

    I am playing around with this whole package. It is a very good idea. I suppose i now wonder two questions:

    1) How much of this idea was inspired from you working on complicated problems down in the kernel.

    2) How much significance could this have implemented down in the kernel, or is it effectively already a tackled problem?

    Cheers
    Drex
  • Drexthepimp wrote:
    So use it wisely, at messaging points, possibly prevent then commands flowing to the db that you can predict would cause errors. The ability to close down ports when failures occur is like a firewall to the database I suppose.

    I am playing around with this whole package. It is a very good idea. I suppose i now wonder two questions:

    1) How much of this idea was inspired from you working on complicated problems down in the kernel.

    2) How much significance could this have implemented down in the kernel, or is it effectively already a tackled problem?

    Cheers
    Drex


    The CCR evolved when we tried to build the distributed, service-oriented runtime the robotics product now uses. Everything was a black box in that case (just like the DB in your example) and everything was loosely coupled, with potential for long latencies and lots of failures. So the CCR addresses this type of environment by making coordination and concurrency easier, and by allowing you to concisely deal with all the permutations of failure/completion when issuing multiple requests.

    Now, I have been in the kernel for about 6 out of my 10 years at MS, and the kernel (and user-mode Win32) has primitives similar to a subset of what the CCR offers. The CCR, however, really makes a difference IMHO because of its target runtime: CLR 2.0, with advanced features like iterators and anonymous methods.

    The CCR right now does well performance-wise, but it is primarily held back by the following things:

    1) Thread switch time. Again, we scale to millions of pending items using iterators, but still, every time we load balance we take the user-kernel-user hit of thread switches. We did try fiber-hosting everything, but it was not worth it (long story).
    2) We grab a lock underneath for task queuing. I go to great lengths to not grab locks or even pulse monitors, but I can take it a step further (I have in private bits) and use lock-free stuff for certain port/receiver configurations.

    So, short of switching the kernel from threads to a different task/work-item model, I think the CCR is fine where it is Smiley

  • William Staceystaceyw Before C# there was darkness...
    Hi George.  Is there an example of using InterleaveReceiverGroup?
  • mmlmml
    regarding interleaving...

    Hypothetically, why does the CCR have to wait for, say, your 1 million messages to be coordinated before being able to service, for example, this one other important message you want to process (which is not part of the coordination of the 1 million messages but ends up having to wait until those 1 million messages get coordinated; or does it wait?)?

    I might have missed this in the interview, but does the CCR allow messages to get thrown into the mix before the 1 million messages get coordinated, when they happen not to be part of that coordinated set, so to speak? Maybe what I am asking is whether or not the scheduling algorithm accommodates this in the interleaving pattern, or does the CCR address this in some other way?
    Nice idea by the way, I hope to see more of it. Also, IMHO I wish the threading model would get replaced with something that allowed for better determinacy.

  • Hi George,

    I think I'm getting the whole CCR thing bit by bit.

    What would nail it for me is if I could do the following.

    I have a logging application which must send emails either when a certain number have accumulated or when a certain time period has elapsed.  So if I have 5 log entries OR 5 minutes pass, then I need to send everything in the port.

    This obviously leads to a Choice arbiter with a timer and a MultipleItemReceive.  However, since a Choice cannot be persisted, how could I keep this chain of events going after either event has occurred for the first time?

    thanks

    Marshall
  • staceyw wrote:
    Hi George.  Is there an example of using InterleaveReceiverGroup?


    Sorry for the slow response, I am supposed to be on vacation Big Smile

    The only valid groups that you can pass to the interleave arbiter are:
    1) ConcurrentReceiverGroup
    2) ExclusiveReceiverGroup
    3) TeardownReceiverGroup
  • tartufella wrote:
    Hi George,

    I think I'm getting the whole CCR thing bit by bit.

    What would nail it for me is if I could do the following.

    I have a logging application which must send emails either when a certain number have amounted - or a certain time period occurs.  So if I have 5 log entries OR 5 minutes occur, then I need to send everything in the port.

    This leads obviously to a Choice Arbiter with a Timer and a MultipleItemReceive.  However, since a choice cannot be persisted, how could I keep this chain of events going after either event has occured for the first time?

    thanks

    Marshall


    The easiest way is to just re-issue the choice from either outcome. We show examples of this in some of our robotics services. The neat thing about the CCR is that every arbiter instance, including Choice, can be generated or stored and then re-used. For Choice, you have to generate it each time, but an accessor on a class makes it easy:


    public class MyService : CcrServiceBase
    {

        Choice MyTimeoutOrFiveEmailsChoice
        {
             get
            {
                return Arbiter.Choice(
                    Arbiter.MultipleItemReceive(emailPort, 5, EmailHandler),
                    TimeoutPort(5000, TimeoutHandler)
                );
             }
        }
     
        void Init()
        {
             // activate first time
             Arbiter.Activate(taskQueue,MyTimeoutOrFiveEmailsChoice);
        }

        void EmailHandler(Email [] emails)
        {
             // do work here
             // ...
             // re-issue choice
            Arbiter.Activate(taskQueue,MyTimeoutOrFiveEmailsChoice);
         }
            
    }

    There are other alternatives: using an interleave with the timer in the teardown receiver group and the MultipleItemReceive in the Concurrent or Exclusive group, and then re-issuing the interleave only if the timeout fires. That would work as well.









  • mml wrote:
    regarding interleaving...

    Hypothetically, why does the CCR have to wait for say your 1million messages to be coordinated before being able to service for example this 1 other important message you want to process (which is not part of the coordination of the 1million messages but ends up having to wait until those 1million messages get coordinated; or does it wait?)?

    I might have missed this in the interview but does the CCR allow messages to get thrown into the mix before the 1million messages get coordinated that happen to not be part of that coordinated set so to speak. Maybe what I asking is whether or not the scheduling algorithm accomodates this in the interleaving pattern or does the CCR address this in some other way.
    Nice idea by the way, I hope to see more of it. Also, IMHO I wish the threading model would get replaced with something that allowed for better determinacy.



    Hi, the CCR doesn't have to wait Big Smile And it doesn't. We actually round-robin between dispatcher queues associated with the same dispatcher, and we process a message from each queue. So even if you have 1 million messages coming from one port, associate it with its own dispatcher queue, and then associate your other, critical port, which receives only one message once in a while, with a different dispatcher queue!

    This was a critical feature for real-time, priority and a lot of other applications, so the CCR was designed from the beginning to have a scheduling system that makes this work.

    Jeff's examples show how to create dispatcher queues (one line); then, when you call Arbiter.Activate, pass the queue you want with the appropriate receive operation.
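
    A minimal sketch of that setup, assuming Microsoft.Ccr.Core (the ports and handlers are placeholders):

        // one dispatcher (thread pool) shared by both queues; 0 threads = one per CPU
        Dispatcher dispatcher = new Dispatcher(0, "demo pool");
        DispatcherQueue bulkQueue = new DispatcherQueue("bulk", dispatcher);
        DispatcherQueue criticalQueue = new DispatcherQueue("critical", dispatcher);

        Port<int> bulkPort = new Port<int>();
        Port<string> criticalPort = new Port<string>();

        // the busy port gets its own queue...
        Arbiter.Activate(bulkQueue,
            Arbiter.Receive(true, bulkPort,
                delegate(int i) { /* heavy traffic handled here */ }));

        // ...so the rare, important message is not stuck behind a million bulk tasks:
        // the dispatcher round-robins between the two queues
        Arbiter.Activate(criticalQueue,
            Arbiter.Receive(true, criticalPort,
                delegate(string msg) { /* urgent work handled here */ }));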
  • Very cool. What is the possibility of getting the concurrent "build demo" sources? I know that I could definitely use that with my team.
  • Hey Georgio,

    Perhaps you'll call it abuse of the CCR 8-), but I wanted to use it as an asynchronous implementation of the observer pattern, where different interested observers can register themselves, which all get notified when something happens. So my idea was to register different TaskReceiver objects (as observers) to a Port where items are posted such that all TaskReceiver objects are executed (as concurrent as possible). The actual implementation of the CCR is that only one of them is.

    A nice thing is that possible observers only would get notified if there are new items at multiple ports.

    Perhaps an idea for future versions of the CCR: let the user choose if only one TaskReceiver gets the item (as implemented today) or all (unless of course the predicate indicates that the item is of no interest to the specific TaskReceiver object).

    I don't know if you ever looked at the CCR as a way to implement an asynchronous version of the observer pattern and perhaps the impact of what I propose is too big.

    Best regards,
    Frans.

  • dukess wrote:
    Very Cool. What is the possiblity of getting the concurrent "build demo" sources? I know that I could definitely use that in my team.


    We currently maintain that tool as part of our robotics build, but we are considering making it available in the Channel 9 sandbox. If you are MSFT internal, we can make it available to you.
  • fransv wrote:
    

    Hey Georgio,

    Perhaps you'll call it abuse of the CCR , but I wanted to use it as an asynchronous implementation of the observer pattern, where different interested observers can register themselves, which all get notified when something happens. So my idea was to register different TaskReceiver objects (as observers) to a Port where items are posted such that all TaskReceiver objects are executed (as concurrent as possible). The actual implementation of the CCR is that only one of them is.

    A nice thing is that possible observers only would get notified if there are new items at multiple ports.

    Perhaps an idea for future versions of the CCR: let the user choose if only one TaskReceiver gets the item (as implemented today) or all (unless of course the predicate indicates that the item is of no interest to the specific TaskReceiver object).

    I don't know if you ever looked at the CCR as a way to implement an asynchronous version of the observer pattern and perhaps the impact of what I propose is too big.

    Best regards,
    Frans.



    Hi Frans, indeed we have implemented the observer pattern (what we call pub/sub, or publication/subscription) and we really use it a lot. I decided that it made sense as a service, using the CCR, where it keeps a list of ports and then posts the message N times (once for each subscriber). We did this as a service since then you can observe its state, use it across nodes/machines and keep the CCR simple.

    Please take a look at service tutorials 4, 5 and 6, which show this pattern and how to write services that re-use our subscription manager.

    In general the CCR should remain simple, with few primitives. Processes built on the CCR can then do a lot of interesting things. The pub/sub pattern, IMHO, is a nice little service on top.

    If a service is overkill in your scenario, you can write a simple class that does this (keeps a list of ports and replicates messages on them), or you can write a simple arbiter (derived from Receiver) that does this.
  • William Staceystaceyw Before C# there was darkness...
    fransv wrote:
    

    Hey Georgio,

    Perhaps you'll call it abuse of the CCR , but I wanted to use it as an asynchronous implementation of the observer pattern, where different interested observers can register themselves, which all get notified when something happens. So my idea was to register different TaskReceiver objects (as observers) to a Port where items are posted such that all TaskReceiver objects are executed (as concurrent as possible). The actual implementation of the CCR is that only one of them is.

    A nice thing is that possible observers only would get notified if there are new items at multiple ports.

    Perhaps an idea for future versions of the CCR: let the user choose if only one TaskReceiver gets the item (as implemented today) or all (unless of course the predicate indicates that the item is of no interest to the specific TaskReceiver object).

    I don't know if you ever looked at the CCR as a way to implement an asynchronous version of the observer pattern and perhaps the impact of what I propose is too big.

    Best regards,
    Frans.


    Here is a class that implements a publish/subscribe model.  This is akin to the event model.  In fact, if you were so inclined, you could replace all the events in your business-logic classes with this kind of CCR event.

        public sealed class CcrEvent<T>
        {
            private List<Port<T>> list = new List<Port<T>>();
           
            public void Subscribe(Port<T> port)
            {
                if ( port == null )
                    throw new ArgumentNullException("port");
                lock ( list )
                {
                    list.Add(port);
                }
            }
            public void Unsubscribe(Port<T> port)
            {
                lock ( list )
                {
                    list.Remove(port);
                }
            }
            public void Clear()
            {
                lock ( list )
                {
                    list.Clear();
                }
            }
            public void Publish(T value)
            {
                // Publish value to all subscriber ports. If no subscribers, nothing is published.
                // Notice we are not running delegates in the context of this thread, we are just posting messages to ports.
                // The CCR handles the arbiters based on how the Subscriber(s) defines their Arbiter logic.
                lock ( list )
                {
                    foreach ( Port<T> p in list )
                    {
                        p.Post(value);
                    }
                }
            }

            public static void UsageDemo()
            {
                CcrEvent<int> myEvent = new CcrEvent<int>();
                Port<int> p1 = new Port<int>();
                Port<int> p2 = new Port<int>();
                Port<int> p3 = new Port<int>();
                myEvent.Subscribe(p1);
                myEvent.Subscribe(p2);
                myEvent.Subscribe(p3);
               
                int sleepTime = 100;

                Arbiter.Activate(new DispatcherQueue("q"),
                    Arbiter.Receive(true, p1,
                    delegate(int i)
                    {
                        Thread.Sleep(sleepTime);
                        Console.WriteLine("TID:{0} p1:{1}", Thread.CurrentThread.ManagedThreadId, i);
                    }),
                    Arbiter.Receive(true, p2,
                    delegate(int i)
                    {
                        Thread.Sleep(sleepTime);
                        Console.WriteLine("TID:{0} p2:{1}", Thread.CurrentThread.ManagedThreadId, i);
                    }),
                    Arbiter.Receive(true, p3,
                        delegate(int i)
                        {
                            Thread.Sleep(sleepTime);
                            Console.WriteLine("TID:{0} p3:{1}", Thread.CurrentThread.ManagedThreadId, i);
                        })
                );

                for ( int i = 0; i < 10; i++ )
                {
                    myEvent.Publish(i);
                }
            }
        }

  • William Staceystaceyw Before C# there was darkness...
    georgioc wrote:
    
    mml wrote: regarding interleaving...

    Hypothetically, why does the CCR have to wait for say your 1million messages to be coordinated before being able to service for example this 1 other important message you want to process (which is not part of the coordination of the 1million messages but ends up having to wait until those 1million messages get coordinated; or does it wait?)?

    I might have missed this in the interview but does the CCR allow messages to get thrown into the mix before the 1million messages get coordinated that happen to not be part of that coordinated set so to speak. Maybe what I asking is whether or not the scheduling algorithm accomodates this in the interleaving pattern or does the CCR address this in some other way.
    Nice idea by the way, I hope to see more of it. Also, IMHO I wish the threading model would get replaced with something that allowed for better determinacy.



    Hi, the CCR doesnt have to wait And it doesnt. I actually round robin between dispatcher queues associated with the same dispatcher and i will process a message from each queue. So even if you have 1 million messages coming from one port, associate it with its own dispatcher queue, and then associate your other, critical port that receives only one message once in a while, with a different dispatcher queue!!!

    This was a critical feature for real time, priority and alot of other applications, so the CCR was designed from the beginning to have a scheduling systems, that makes this work.

    Jeff's examples show how to create dispatcher queues (one line) and then when you call Arbiter.Activate, pass the queue you want, with the appropriate receive operation.


    I have a question in regard to this.  As the code below shows, port processing does not seem to be fair.  The first port with an activated receiver seems to win the attention of the DQ until it is drained.  Is this a "fair" model? TIA

            public static void PortTest()
            {
                Port<int> p1 = new Port<int>();
                Port<int> p2 = new Port<int>();
                for ( int i=0; i<10; i++ )
                {
                    p1.Post(i);
                    p2.Post(i);
                }
                Arbiter.Activate(new DispatcherQueue("q"),
                    Arbiter.Receive(true, p1,
                        delegate(int i)
                        {
                            Console.WriteLine("p1:{0}", i);
                        }),
                    Arbiter.Receive(true, p2,
                        delegate(int i)
                        {
                            Console.WriteLine("p2:{0}", i);
                        })
                );
                // Output:
                //p1:0
                //p1:1
                //p1:2
                //p1:3
                //p1:4
                //p1:5
                //p1:6
                //p1:7
                //p1:8
                //p1:9
                //p2:0
                //p2:1
                //p2:2
                //p2:3
                //p2:4
                //p2:5
                //p2:6
                //p2:7
                //p2:8
                //p2:9
            }
  • staceyw wrote:
    
    georgioc wrote: 
    mml wrote: regarding interleaving...

    Hypothetically, why does the CCR have to wait for say your 1million messages to be coordinated before being able to service for example this 1 other important message you want to process (which is not part of the coordination of the 1million messages but ends up having to wait until those 1million messages get coordinated; or does it wait?)?

    I might have missed this in the interview but does the CCR allow messages to get thrown into the mix before the 1million messages get coordinated that happen to not be part of that coordinated set so to speak. Maybe what I asking is whether or not the scheduling algorithm accomodates this in the interleaving pattern or does the CCR address this in some other way.
    Nice idea by the way, I hope to see more of it. Also, IMHO I wish the threading model would get replaced with something that allowed for better determinacy.



    Hi, the CCR doesnt have to wait And it doesnt. I actually round robin between dispatcher queues associated with the same dispatcher and i will process a message from each queue. So even if you have 1 million messages coming from one port, associate it with its own dispatcher queue, and then associate your other, critical port that receives only one message once in a while, with a different dispatcher queue!!!

    This was a critical feature for real time, priority and alot of other applications, so the CCR was designed from the beginning to have a scheduling systems, that makes this work.

    Jeff's examples show how to create dispatcher queues (one line) and then when you call Arbiter.Activate, pass the queue you want, with the appropriate receive operation.


    I have a question in regards to this.  As the code below shows, Port processing does not seem to be fair.  The first port posting a receiver seems to win the attention of the DQ until it is drained.  Is this a "fair" model? tia

            public static void PortTest()
            {
                Port<int> p1 = new Port<int>();
                Port<int> p2 = new Port<int>();
                for ( int i=0; i<10; i++ )
                {
                    p1.Post(i);
                    p2.Post(i);
                }
                Arbiter.Activate(new DispatcherQueue("q"),
                    Arbiter.Receive(true, p1,
                        delegate(int i)
                        {
                            Console.WriteLine("p1:{0}", i);
                        }),
                    Arbiter.Receive(true, p2,
                        delegate(int i)
                        {
                            Console.WriteLine("p2:{0}", i);
                        })
                );
                // Output:
                //p1:0
                //p1:1
                //p1:2
                //p1:3
                //p1:4
                //p1:5
                //p1:6
                //p1:7
                //p1:8
                //p1:9
                //p2:0
                //p2:1
                //p2:2
                //p2:3
                //p2:4
                //p2:5
                //p2:6
                //p2:7
                //p2:8
                //p2:9
            }


    There are a couple of reasons why you see this; it is an artifact of this little experiment.

    Notice how you posted ALL the messages first, then activated. This means that the first time you attach a receiver on port 1, we will have to generate a lot of tasks, since the messages are all there.

    Now, given that, notice that you used the same queue for both receivers.

    So the combination of both causes this, which is expected. And since no guarantees are given for ordering between ports, this is just one possible interleaving.

    In my post above I mention that we round-robin among dispatcher queues. So if you try the exact same experiment but independently activate the receivers on ports 1 and 2, using two different dispatcher queues, you will see interleaving take place. Of course, between two different dispatchers, with one queue each for each receiver, the same thing happens: interleaving will occur.

    Note that task dispatching is fair. Port processing is independent of any other port, but if the ports use the same queue, then the order the tasks were generated will determine the order they run.

    Another way to see things interleave is to activate *first*, then start posting. We will then start running tasks in parallel with your for loop Smiley
  • William Staceystaceyw Before C# there was darkness...
    georgioc wrote:
    
    staceyw wrote: 
    georgioc wrote: 
    mml wrote: regarding interleaving...

    Hypothetically, why does the CCR have to wait for say your 1million messages to be coordinated before being able to service for example this 1 other important message you want to process (which is not part of the coordination of the 1million messages but ends up having to wait until those 1million messages get coordinated; or does it wait?)?

    I might have missed this in the interview but does the CCR allow messages to get thrown into the mix before the 1million messages get coordinated that happen to not be part of that coordinated set so to speak. Maybe what I asking is whether or not the scheduling algorithm accomodates this in the interleaving pattern or does the CCR address this in some other way.
    Nice idea by the way, I hope to see more of it. Also, IMHO I wish the threading model would get replaced with something that allowed for better determinacy.



    Hi, the CCR doesnt have to wait And it doesnt. I actually round robin between dispatcher queues associated with the same dispatcher and i will process a message from each queue. So even if you have 1 million messages coming from one port, associate it with its own dispatcher queue, and then associate your other, critical port that receives only one message once in a while, with a different dispatcher queue!!!

    This was a critical feature for real time, priority and alot of other applications, so the CCR was designed from the beginning to have a scheduling systems, that makes this work.

    Jeff's examples show how to create dispatcher queues (one line) and then when you call Arbiter.Activate, pass the queue you want, with the appropriate receive operation.


    I have a question in regards to this.  As the code below shows, Port processing does not seem to be fair.  The first port posting a receiver seems to win the attention of the DQ until it is drained.  Is this a "fair" model? tia

            public static void PortTest()
            {
                Port<int> p1 = new Port<int>();
                Port<int> p2 = new Port<int>();
                for ( int i=0; i<10; i++ )
                {
                    p1.Post(i);
                    p2.Post(i);
                }
                Arbiter.Activate(new DispatcherQueue("q"),
                    Arbiter.Receive(true, p1,
                        delegate(int i)
                        {
                            Console.WriteLine("p1:{0}", i);
                        }),
                    Arbiter.Receive(true, p2,
                        delegate(int i)
                        {
                            Console.WriteLine("p2:{0}", i);
                        })
                );
                // Output:
                //p1:0
                //p1:1
                //p1:2
                //p1:3
                //p1:4
                //p1:5
                //p1:6
                //p1:7
                //p1:8
                //p1:9
                //p2:0
                //p2:1
                //p2:2
                //p2:3
                //p2:4
                //p2:5
                //p2:6
                //p2:7
                //p2:8
                //p2:9
            }


    There is a couple of reasons why you see this, that is an artifact of this little experiment.

    Notice how you posed ALL messages first, then activated. This means the first time you attach a receiver on port 1, we will have to generate alot of tasks since the messages are all there.

    Now, given that, notice that you used the same queue for both receivers.

    So the combination of both causes this, which is expected. And since no guarantees are given for ordering between ports, this is just one possible interleaving.

    In my post above i mention that we round robin among dispatcher queues. So if you try the same exact experiment, but activate independently the receivers on port 1 and 2, using two different dispatcher queues, you will see interleaving take place. Ofcourse between two different dispatchers, with one queue each, for each receiver, same thing, interleaving will occur.

    Note that task dispatching is fair. Port processing is independent of any other port but if they use the same queue, then the order the tasks where generated, will determine the order they run.

    another way to see things interleave is activate *first*, then start posting. We will then start running tasks in parallel with your for loop


    Thanks again George.  I saw the behavior with the separate queues.  It does raise the question, however, of whether it would make any sense to round-robin the ports within a single queue, or at least allow the option.  As you said, you can get future-like behavior by posting to ports before activation.  However, with the current implementation you would need to remember and expect this behavior, instead of letting the system enforce fairness across ports, which may seem more intuitive.  Would there be a cost to this?  If it could be done cheaply, would it be useful?  Just wondering, to help my understanding.

    Also, you mentioned priority queues a couple of times in the videos.  I don't see how to set priority.  I see thread priority can be set on a Dispatcher, but it sounded like you were saying queue priority could be set, so different queues could have different priorities in the same dispatcher.
    Thanks Smiley
  • Hi George,

     

    First of all I want to thank you for the great library – I consider it one of the best pieces in .NET. I just started to learn it and have a few questions.

    1. I have one port and I want to dynamically add and remove receivers. I understand that I can add a receiver using Arbiter, for example with Arbiter.Activate(dq, Arbiter.Receive(true, port, Handler<>)). But how can I remove this receiver? Why is there no method like Arbiter.Deactivate(…)?

     

    2. In another scenario I have several ports and one receiver. How can I serialize calls to the delegate?  Should I use Arbiter.Interleave (ExclusiveReceiverGroup), or can I just use lock(syncObject) in the receiver handler?  Also, is there any way to dynamically add/remove a receiver in a group?

     

    Again, thanks for your work and your passion

  • Alexq wrote:
    

    Hi George,

     

    First of all I want to thank you for the great library – I consider it one of the best pieces in .NET. I just started to learn it and have a few questions.

    1. I have one port and I want dynamically to add and remove receiver. I understand that I can add receiver using Arbiter, for example by using  Arbiter.Activate(dq, Arbiter.Receive(true, port, Handler<>). But how can I remove this receiver? Why there is no method like Arbiter.Deactivate(….)?

     

    2. In another scenario I have several ports and one receiver. How can I serialize calls to the delegate?  Should I use Arbiter.Interleave (ExclusiveReceiverGroup) or I just can use lock(syncObject) in the receiver handler?  Also, is there any way to dynamically add/remove a receiver to a group?

     

    Again, thanks for your work and your passion



    First question (receiver de-activation):

    Hi, there are three ways to de-activate a receiver:
    1) Make the receiver one-time; the CCR will remove it after it processes one message, and it will do so automatically and atomically.
    2) Use Arbiter.Choice to coordinate between receivers; the CCR will again remove all receivers after the first receiver executes.
    3) Use Interleave and the TeardownReceiverGroup; when a receiver in that group executes, all other receivers will be removed.

    OK, there is actually one more way, but it is low-level and you have to be careful when you use it. If you look at the port API itself, there are Register and Unregister receiver methods, if you want to take matters into your own hands Smiley

    Second question: serializing calls

    I strongly recommend not using any calls to traditional thread-locking APIs with the CCR. Use interleave, join constructs, activating one-time receivers and then re-registering them, etc., to achieve serialization. Interleave is the most intuitive and takes care of a lot of conditions.

    There are actually a few ways to achieve serialization with a system like the CCR, depending on what you are trying to do, but again, interleave should allow you, with a couple of lines, to serialize all access to a piece of code, even across asynchronous continuations (if the exclusive receiver is an iterator).

    Hope this helps.
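
    A minimal sketch of serializing one shared piece of state across two ports without lock() (queue, the ports and the RequestA/RequestB/Shutdown types are placeholder names):

        Port<RequestA> portA = new Port<RequestA>();
        Port<RequestB> portB = new Port<RequestB>();
        Port<Shutdown> shutdownPort = new Port<Shutdown>();

        // Put the receivers for every port that touches the shared state into the
        // ExclusiveReceiverGroup of a single interleave: the CCR then guarantees
        // at most one of these handlers runs at any time, with no explicit locking.
        Arbiter.Activate(queue,
            Arbiter.Interleave(
                new TeardownReceiverGroup(
                    Arbiter.Receive<Shutdown>(false, shutdownPort, ShutdownHandler)),
                new ExclusiveReceiverGroup(
                    Arbiter.Receive<RequestA>(true, portA, HandleA),
                    Arbiter.Receive<RequestB>(true, portB, HandleB)),
                new ConcurrentReceiverGroup()));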

  • staceyw wrote:
    

    Thanks again George.  I saw the behavior with the seperate queues.  It does beg the question, however, if it would make any sense to round-robin the ports in a single queue or at least allow the option?  As you said, you can get future like behavior with posting to ports before activation.  However with current implementation, you would need to remember and expect this behavior, instead of letting the system run fairness across ports which may seem more intuitive.  Would there be cost to this?  If it could be done cheaply, would it be useful?  Just wondering to help understanding. 

    Also, you had mentioned a couple times in the videos about priority queues.  I don't see how to set priority.  I see thread priority can be set on a Dispatcher, but it sounded like you where talking about Q priority could be set.  So different queues could have different priorities in the same dispatcher.
    Thanks Smiley


    The problem with round-robining the ports within a single queue is that it actually makes the system slower (more locks need to be taken) and it forces the CCR to have some "global" knowledge about the ports in the system, making it non-compositional. It does have the benefit you mention, but I decided to make round-robin control explicit through the use of dispatcher queues, keeping all ports independent of each other.

    Some higher-level API on top of the CCR could choose this path, however, if it makes sense for the app.


    Priorities: currently, using thread priority across dispatchers is the only publicly exposed way of doing priority dispatching. Queues within a dispatcher are equal. I have a version that sorts them and walks them in priority order (and drains them properly, etc.), but I decided against adding it yet. It is something you might see in the future.

  • georgioc wrote:
    

    First question (receiver de-activation):

    Hi, there are 3 ways to de-activate a receiver:
    1) make the receiver one time, and the CCR will remove it after it processes one message, and it will do so automatically and atomically
    2) use Arbiter.Choice to coordinate between receivers and the CCR will again remove all receivers after the first receiver executes
    3) use Interleave and the TeardownReceiverGroup, and when a receiver in the group executes all other receivers will be removed

    Ok there is actually one more way but its low level and youhave to be carefull when to use. If you look at the port API itself there is a Register and Unregister receiver, if you want to take matters on your own hands Smiley

    Second question: Serializing calls

    I strongly recommend to not use any calls to traditional thread locking APIS with the CCR. Use interleave, join constructs, activate one time receivers and then re-register them etc, to achieve serialization. Interleave is the most intuitive and takes care of alot of conditions.

    There are actually a few ways to achieve serialization with a system like the CCR, depending on what you are trying to do, but again interleave should allow you with a couple of lines, to serialize all access to  apiece of code, even across asychnronous continuations (if the exclusive receiver is an iterator)

    hope this helps





    Thanks a lot!  I had a feeling that CLR thread locking didn't sound right!
    One more question:
    How does the CCR work across domains? Can I register a receiver which is a callback to a remoting object and will be executed in a different domain? Do you see any problems with this scenario?
  • Alexq wrote:
    
    georgioc wrote: 

    First question (receiver de-activation):

    Hi, there are 3 ways to de-activate a receiver:
    1) make the receiver one time, and the CCR will remove it after it processes one message, and it will do so automatically and atomically
    2) use Arbiter.Choice to coordinate between receivers and the CCR will again remove all receivers after the first receiver executes
    3) use Interleave and the TeardownReceiverGroup, and when a receiver in the group executes all other receivers will be removed

    Ok there is actually one more way but its low level and youhave to be carefull when to use. If you look at the port API itself there is a Register and Unregister receiver, if you want to take matters on your own hands Smiley

    Second question: Serializing calls

    I strongly recommend to not use any calls to traditional thread locking APIS with the CCR. Use interleave, join constructs, activate one time receivers and then re-register them etc, to achieve serialization. Interleave is the most intuitive and takes care of alot of conditions.

    There are actually a few ways to achieve serialization with a system like the CCR, depending on what you are trying to do, but again interleave should allow you with a couple of lines, to serialize all access to  apiece of code, even across asychnronous continuations (if the exclusive receiver is an iterator)

    hope this helps





    Thanks a lot!  I had a feeling that CLR thrddaing locking doesn't sound right!
    One more question:
    How does CCR works across domains? Can I register a receiver which is a callback to a remoting object and will be executed in a different domain? Do you see any problems in this scenario


    The CCR works within one CLR appdomain. Across machines we have developed a distributed runtime that uses the CCR for coordination on each node. That service-oriented runtime is also available as part of the robotics SDK; it is the DSS services/transports.

    You can use the CCR to coordinate I/O even if you don't use DSS, as Jeff's code shows with the web, file and network examples. You could apply this to remoting calls as well, but it works best if they are asynchronous and you wrap them behind CCR ports, in a similar way to the socket example developed in this thread with staceyw.
  • Hello

    I'm working on a problem that seems to be perfect for the CCR. The task at hand is concurrency and coordination against a very picky search index. The index accepts several different operations, such as search, add, delete, optimize and so on. However, these operations have to be coordinated according to priority, concurrency rules and available resources.

    Priority:
    Search typically requires a fast response.

    Concurrency rules:
    There is a matrix defining which operations can execute at the same time. Search + Add is OK; Add + Delete is not.

    Resources:
    The resource level may change at any time, resulting in (for instance) a full stop of non-search operations.

    Obviously this can be done with some sort of object queue and some sort of traditional control unit that, I'm guessing, will have to do a lot of work (and most likely will have some issues). What I'm looking for is a way of implementing my queue fast, and as flexibly as possible. My first thought when I read about the CCR was that I can get a lot of what I want for free. Am I right?

    I'm guessing that the right solution is to implement my own port, which supports my set of index operations. What I'm not too sure about is how I can create a receiver that at any time knows which queue items to dispatch (according to the concurrency matrix and priority).


  • sturla wrote:
    Hello

    I'm working on a problem that seems to be perfect for the CCR. The task at hand is concurrency and coordination against a very picky search index. The index axcepts several different operations, such as search, add, delete, optimize and so on. However, these operations has to be coordinated according to priority, concurrency rules and available resources.

    Priority:
    Search typically requires a fast response.

    Concurrency rules.
    There is a matrix defining which operations that can execute at the same time. Search + Add is ok, Add+Delete is not.

    Resources:
    The resourcelevel may at any time change, resulting in (for instance) a full stop in none search operations.

    Obviously this can be done with some sort of object queue, and some sort of traditinal control unit that, i'm guessing, will have to do a lot of work (and most likely will have some issues). What i'm looking for is a way of implementing my queue fast, and as flexible as possible. My first thought when i read about ccr was that i can get a lot of what i want for free. Am i right? 

    I'm guessing that the right solution is to implement my own port, which supports my set of index operations. What i'm not to sure about is how i can create a receiver that at any time know which queue items to dispatch (according to the concurrency matrix, and priority).





    You can just use the CCR ports as-is and decide whether you want type-based dispatch or value-based dispatch.

    If you decide on type-based dispatch, then you can use multiple interleaves to create the proper exclusions. One way to do it is to use different message types (Search, Add, Delete) and, when you receive a message of that type, "enter" a state. The CCR plus interleave can trivially encode arbitrarily complex state machines, allowing you to express complex concurrency rules just by grouping receivers in the TeardownReceiverGroup for the exit conditions of the current state.

    It will require multiple levels of dispatching, but it should be OK. Basically it goes like this:

    You start with a simple Choice that has receivers listening for all the possible messages:

    Choice initialChoice
    {
        get
        {
            return Arbiter.Choice(
                Arbiter.Receive(false, main, SearchState),
                Arbiter.Receive(false, main, AddState),
                Arbiter.Receive(false, main, DeleteState)
            );
        }
    }

    Notice that each receiver is one-time, and how we generate this choice from a getter on some class. This way we can just create it on demand, when we need it.

    When you get a search message, let's say Search, you enter the handler that now decides the interleaving for Search:

    void SearchState(Search request)
    {
        // when we get a Delete, atomically exit this state and
        // enter DeleteState
        Arbiter.Activate(taskQueue,
            Arbiter.Interleave(
                new TeardownReceiverGroup(
                    Arbiter.Receive(false, main, DeleteState)
                ),
                new ExclusiveReceiverGroup(),
                new ConcurrentReceiverGroup(
                    Arbiter.Receive(true, main, SearchHandler),
                    Arbiter.Receive(true, main, AddHandler)
                )
            )
        );
    }

    Notice that in SearchState we accept as many concurrent Search and Add requests as there are, but we exit this state atomically when we get a Delete, since that needs to be done by itself.

    etc.

    Please check Robotics Tutorial 5, where we show a complex state machine that manages the serial port and has similar needs in terms of concurrency, resources, etc.







  • Hello and thanks for your answer.

    This surely solves the concurrency issue; however, I'm failing to see how I can prioritize among the different tasks.

    I've been playing around with TeardownReceiverGroup() to allow for "only" search message dispatching. It seems that the teardown happens after all items in the queue have been handled. For instance, if the queue contains 10 adds and 5 deletes while a new search message enters the queue, how can I set up my state machine so that the search message is dispatched before the other messages already in the queue?



  • sturla wrote:
    Hello and thanks for your answer.

    This surely solves the concurrency issue; however, I'm failing to see how I can prioritize among the different tasks.

    I've been playing around with TeardownReceiverGroup() to allow for "only" search message dispatching. It seems that the teardown happens after all items in the queue have been handled. For instance, if the queue contains 10 adds and 5 deletes while a new search message enters the queue, how can I set up my state machine so that the search message is dispatched before the other messages already in the queue?





    Teardown should occur when all handlers have stopped executing, since it has strong guarantees. It will not wait until all queued messages have executed, just until all handlers currently running are done. At least that is the intent. The idea is that we bias the interleave towards exclusive handlers and the teardown group, so any more queued messages will not be processed the moment an exclusive/teardown message is received.

    In general, controlling priorities further than what interleave gives you, with its exclusive bias, will increase complexity. For tight control you can do what I call "tunneling":
    Activate a single untyped receiver on a port which takes object. Any message arriving on that port, of any type, will invoke the same untyped handler. In that handler, re-submit the message, based on type, to a port set that is strongly typed, with its own set of receivers. This allows you to control dispatching, and if a port of a particular message type is full or empty or whatever, you can decide to discard or change the priority of message processing.

    Port<object> main = new Port<object>();

    PortSet<Search,Add,Delete> secondary = new PortSet<Search,Add,Delete>();

    Arbiter.Activate(taskQueue, Arbiter.Receive(true,main,
        delegate(object msg)
        {
            if(secondary.P0.ItemCount == 0)
            // no search messages pending
            {
                 // check message type, spawn handler
            }
            else
            {
                 // queue messages for later processing
                 // ccr will now enqueue in the right sub port
                 // based on runtime type
                 secondary.Post(msg);
            }
         }));


    The above is just a snippet that essentially, through double dispatching, implements a form of interleave, where you put some logic to prioritize processing based on currently enqueued messages and on incoming message type. You can even use predicates to do this, with no need for a separate port.










  • Hi George,

    I have basically two questions concerning CCR.

    1. How can I cancel pending ReceiverTasks? Let's say that I have created some permanent Receivers and wired them up with some Port. Finally, I've posted some 1000 items to that port.  I have the Cancel button on my form and now I want to cancel all pending tasks. Should I use another DispatcherQueue to preempt existing queued tasks? How can I do that?

    2. I am playing with the WinFormsAdaptor and WinFormsServicePort. Inside my main method I wrote the following:
       
    static void Main()
    {
      Application.EnableVisualStyles();
      Application.SetCompatibleTextRenderingDefault(false);

      Dispatcher dispatcher = new Dispatcher(0, "CCRDemo Threads");
      DispatcherQueue dispatcherQueue = new  
        DispatcherQueue("CCRDemo DispatcherQueue", dispatcher);
      WinFormsServicePort port =
        WinFormsAdaptor.Create(dispatcherQueue);

      port.Post(new RunForm(new WinFormConstructor(delegate
      {
        return new MainForm(dispatcherQueue);
      })));
    }

    MainForm displays properly and everything looks nice until I push the close button on my form. The form closes but the application hangs, i.e. the WinFormsUIThread running the hidden form doesn't stop. What should I do to properly shut down the app?

    BTW, I haven't slept properly lately because of the CCR Perplexed. It is so fascinating! It took me a couple of hours to grasp the basics and another couple with Reflector to grasp what's really inside. One really gets productive after, basically, a very short period. All my previous multithreading attempts were painful coordination between various Manual/AutoResetEvents and other MT primitives, various Queues with posted data, etc. What I am trying to do now is to come up with an example where I cannot use the CCR. I haven't succeeded yet! The CCR should definitely be a part of the next .NET version Smiley. Bravo!

    ok
  • orest wrote:
    Hi George,

    I have basically two questions concerning CCR.

    1. How can I cancel pending ReceiverTasks? Let's say that I have created some permanent Receivers and wired them up with some Port. Finally, I've posted some 1000 items to that port.  I have the Cancel button on my form and now I want to cancel all pending tasks. Should I use another DispatcherQueue to preempt existing queued tasks? How can I do that?

    2. I am playing with the WinFormsAdaptor and WinFormsServicePort. Inside my main method I wrote the following:
       
    static void Main()
    {
      Application.EnableVisualStyles();
      Application.SetCompatibleTextRenderingDefault(false);

      Dispatcher dispatcher = new Dispatcher(0, "CCRDemo Threads");
      DispatcherQueue dispatcherQueue = new  
        DispatcherQueue("CCRDemo DispatcherQueue", dispatcher);
      WinFormsServicePort port =
        WinFormsAdaptor.Create(dispatcherQueue);

      port.Post(new RunForm(new WinFormConstructor(delegate
      {
        return new MainForm(dispatcherQueue);
      })));
    }

    MainForm displays properly and everything looks nice until I push the close button on my form. The form closes but the application hangs, i.e. the WinFormsUIThread running the hidden form doesn't stop. What should I do to properly shut down the app?

    BTW, I haven't slept properly lately because of the CCR. It is so fascinating! It took me a couple of hours to grasp the basics and another couple with Reflector to grasp what's really inside. One really gets productive after, basically, a very short period. All my previous multithreading attempts were painful coordination between various Manual/AutoResetEvents and other MT primitives, various Queues with posted data, etc. What I am trying to do now is to come up with an example where I cannot use the CCR. I haven't succeeded yet! The CCR should definitely be a part of the next .NET version. Bravo!

    ok



    1) The best way to do this is to use an Interleave with one teardown receiver, and put your permanent receivers under the ConcurrentReceiverGroup. If at some point you decide not to process any more messages, just post a message that causes the teardown and we will make sure all those items never get processed and eventually get garbage collected. It's a very easy approach; interleave makes it a one-liner (a short sketch follows below) Smiley

    2) The CCR by default uses foreground threads for the dispatcher, which means that your app will not exit unless you call Dispose on the Dispatcher instances you have created. If you don't want this behavior, set Dispatcher.UseBackgroundThreads = true in your startup code before you start anything, and then everything will exit when your app's main thread exits.
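
    Here is a minimal sketch of the teardown pattern from (1). The WorkItem and Shutdown message types, the port names and taskQueue are just invented for illustration:

    Port<WorkItem> workPort = new Port<WorkItem>();
    Port<Shutdown> shutdownPort = new Port<Shutdown>();

    Arbiter.Activate(taskQueue,
        Arbiter.Interleave(
            new TeardownReceiverGroup(
                // one-time receiver: runs once, then the whole interleave is retired
                Arbiter.Receive(false, shutdownPort, delegate(Shutdown s)
                {
                    // cleanup here; any WorkItems still queued are never dispatched
                })),
            new ExclusiveReceiverGroup(),
            new ConcurrentReceiverGroup(
                // persistent receiver doing the normal work
                Arbiter.Receive(true, workPort, delegate(WorkItem item)
                {
                    // process item
                }))));

    // later, from the Cancel button:
    shutdownPort.Post(new Shutdown());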

    BTW, the latest CCR is available as the September CTP download of the robotics SDK.

    I am very glad you enjoy using the CCR. It has really made a difference for a huge diversity of tasks we have tackled, and it's so nice to see others using it (we have had lots of downloads of our SDK, and I suspect more than a few are for the CCR alone) Smiley

  • Hi George!

    CCR is great, I'm really enjoying it!

    Two quick questions:

    1. How can I get some code to execute when the last item is removed from a port/DispatcherQueue? I.e., I want to post a set of related work items, yield, and continue only when all items have been dealt with...

    2. I have a set of related work items which I post into a persistent port; I then go and set up an Interleave arbiter so that if one fails, the entire arbitration is torn down. How can I ensure that the items that were in the port's queue at the time the teardown occurs are popped out - i.e., how can I make sure they won't be around the next time some receivers are attached to the port?

    Thanks,

    Shmarya
  • shmarya wrote:
    Hi George!

    CCR is great, I'm really enjoying it!

    Two quick questions:

    1. How can I get some code to execute when the last item is removed from a port/DispatcherQueue? I.e., I want to post a set of related work items, yield, and continue only when all items have been dealt with...

    2. I have a set of related work items which I post into a persistent port; I then go and set up an Interleave arbiter so that if one fails, the entire arbitration is torn down. How can I ensure that the items that were in the port's queue at the time the teardown occurs are popped out - i.e., how can I make sure they won't be around the next time some receivers are attached to the port?

    Thanks,

    Shmarya


    1. I assume you know up front some predetermined count of messages, otherwise it's hard to tell which is last. You could use a timeout port, etc., if you want to stop after a break in receiving messages. But in general a message could arrive at any time, even if there are times when nothing is going on. Assuming you have a counter or some way to count, you have a few options:

       a) Use MultipleItemReceive and specify the number of items you expect. Then your handler will be executed when all N items are received.
       b) Use a one-time receiver to run your handler as the first message arrives, increment a counter, then as long as count < Total, keep re-issuing the receiver, essentially throttling and waiting for the last message on your own.
       c) Use a persistent receiver in an exclusive block in an interleave, use a counter as in b), but then issue a teardown message to a receiver in the TeardownReceiverGroup.


    2. In your teardown receiver, use Port.Test() to essentially remove all items from the port, so it's empty for next time.

       while (myPortSet.Test<MyMessage>() != null);
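
    Picking up option (a): a minimal sketch, assuming a Result message type, a resultPort the workers post to, an expectedCount known up front, and a taskQueue, all invented for illustration:

    Port<Result> resultPort = new Port<Result>();
    int expectedCount = 10;

    Arbiter.Activate(taskQueue,
        Arbiter.MultipleItemReceive(false, resultPort, expectedCount,
            delegate(Result[] results)
            {
                // runs exactly once, after all expectedCount items have been posted
            }));

    // each worker posts its result as it finishes:
    // resultPort.Post(new Result());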

  • georgioc wrote:
    

    1. I assume you know up front some predetermined count of messages, otherwise it's hard to tell which is last. You could use a timeout port, etc., if you want to stop after a break in receiving messages. But in general a message could arrive at any time, even if there are times when nothing is going on. Assuming you have a counter or some way to count, you have a few options:

       a) Use MultipleItemReceive and specify the number of items you expect. Then your handler will be executed when all N items are received.
       b) Use a one-time receiver to run your handler as the first message arrives, increment a counter, then as long as count < Total, keep re-issuing the receiver, essentially throttling and waiting for the last message on your own.
       c) Use a persistent receiver in an exclusive block in an interleave, use a counter as in b), but then issue a teardown message to a receiver in the TeardownReceiverGroup.



    Hi Georgio...

    I have a number of items which I want to execute 'concurrently'; in addition, I want to be able to run specific code only when all the items have completed... I suppose I could use a MultipleItemReceiver on a 'Response' port - i.e., set up some persistent arbiter to run the work on the items concurrently, then have each item post a response to the central Response port, and then have the initial calling method register an arbiter with a choice between a timeout and a MultipleItemReceiver...

    In this case, how can I unregister the original receivers when a timeout occurs? I suppose I can put it all inside a single interleave... but I am not sure that is the best possible solution...

    One question regarding this: is the implementation of Port<> thread safe? I.e., is it possible to post to it from multiple threads?

    Another (unrelated) question: how does the CCR function across AppDomain/process boundaries via Remoting? Are there any caveats I should be aware of?

    Thanks,
    Shmarya
  • shmarya wrote:
    
    Hi Georgio...

    I have a number of items which I want to execute 'concurrently'; in addition, I want to be able to run specific code only when all the items have completed... I suppose I could use a MultipleItemReceiver on a 'Response' port - i.e., set up some persistent arbiter to run the work on the items concurrently, then have each item post a response to the central Response port, and then have the initial calling method register an arbiter with a choice between a timeout and a MultipleItemReceiver...

    In this case, how can I unregister the original receivers when a timeout occurs? I suppose I can put it all inside a single interleave... but I am not sure that is the best possible solution...

    One question regarding this: is the implementation of Port<> thread safe? I.e., is it possible to post to it from multiple threads?

    Another (unrelated) question: how does the CCR function across AppDomain/process boundaries via Remoting? Are there any caveats I should be aware of?

    Thanks,
    Shmarya


    Hi,
    all CCR APIs are thread safe, so Port<> and PortSet<> are safe to post to from multiple threads (that's the usual usage scenario).

    You can unregister a receiver either by making it one-time (a multiple item receive would receive N items and then unregister itself if its persist flag is false), or possibly by using the Choice arbiter, for one-time receivers that cancel all branches when the timeout branch fires.
    Interleave, with the timeout branch being in the TeardownReceiverGroup, is another option.

    The CCR, just like any other CLR library, should work fine with remoting, assuming you take care of marshalling instances of messages between domains. The CCR is not AppDomain aware. For distributed scenarios we use our DSS service model, which is deeply integrated with the CCR and has a very nice state-based model for easy inspection and composition across services.
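
    To make the Choice-plus-timeout idea concrete, here is a rough sketch. The Response type, port names and taskQueue are illustrative, and a plain System.Threading.Timer stands in for whatever timer facility you prefer to drive the timeout port:

    Port<Response> responsePort = new Port<Response>();
    Port<DateTime> timeoutPort = new Port<DateTime>();

    // stand-in timer: post to timeoutPort after 5 seconds
    System.Threading.Timer timer = new System.Threading.Timer(
        delegate(object state) { timeoutPort.Post(DateTime.Now); },
        null, 5000, System.Threading.Timeout.Infinite);

    Arbiter.Activate(taskQueue,
        Arbiter.Choice(
            Arbiter.Receive(false, responsePort, delegate(Response r)
            {
                // success path: the timeout receiver is cancelled for us
            }),
            Arbiter.Receive(false, timeoutPort, delegate(DateTime t)
            {
                // timeout path: the response receiver is cancelled for us
            })));

    // (in real code, dispose the timer once either branch has fired)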


  • George,

    I've become a big fan and evangelist of the CCR and have been experimenting with it in a few Windows services I maintain.  I do have a few questions for you as to its use in my scenarios.

    First of all, every Arbiter I use is persistent, and when the service receives a shutdown signal I want to cancel all pending tasks.  I have found that by calling Dispose on the DispatcherQueue instances I can clear out the tasks that have not yet started while allowing those currently running to continue.  Would you consider this approach a safe way of cancelling pending/queued tasks?

    Secondly, all of the work performed by the threads in my application requires access to a shared resource, and I need to ensure that all of the work is complete before I close up the resource and exit the application.  I've experimented with the DispatcherQueue.Count and Dispatcher.PendingTasks properties but have found that they are both decremented when a task begins to run, and the Dispatcher.ProcessedTasks property is also incremented at that time.  I've taken the approach of having a property that reflects the current state of the object running on the CCR thread to ensure it is complete, such as:

    while(!AllWorkComplete())
    {
        Thread.Sleep(100);
    }

    CloseSharedResource();

    Where AllWorkComplete iterates through the objects that are performing the work and returns false if any one is still running.  This is working in my application, but I was wondering why there is not a "RunningTasks" property on the Dispatcher similar to PendingTasks and ProcessedTasks.

    The CCR has made my life so much easier and my code far cleaner and easier to maintain - especially using Interleave for concurrent and exclusive tasks.

    Thanks,

    Mike


  • mgarski wrote:
    George,

    I've become a big fan and evangelist of the CCR and have been experimenting with it in a few Windows services I maintain.  I do have a few questions for you as to its use in my scenarios.

    First of all, every Arbiter I use is persistent, and when the service receives a shutdown signal I want to cancel all pending tasks.  I have found that by calling Dispose on the DispatcherQueue instances I can clear out the tasks that have not yet started while allowing those currently running to continue.  Would you consider this approach a safe way of cancelling pending/queued tasks?

    Secondly, all of the work performed by the threads in my application requires access to a shared resource, and I need to ensure that all of the work is complete before I close up the resource and exit the application.  I've experimented with the DispatcherQueue.Count and Dispatcher.PendingTasks properties but have found that they are both decremented when a task begins to run, and the Dispatcher.ProcessedTasks property is also incremented at that time.  I've taken the approach of having a property that reflects the current state of the object running on the CCR thread to ensure it is complete, such as:

    while(!AllWorkComplete())
    {
        Thread.Sleep(100);
    }

    CloseSharedResource();

    Where AllWorkComplete iterates through the objects that are performing the work and returns false if any one is still running.  This is working in my application, but I was wondering why there is not a "RunningTasks" property on the Dispatcher similar to PendingTasks and ProcessedTasks.

    The CCR has made my life so much easier and my code far cleaner and easier to maintain - especially using Interleave for concurrent and exclusive tasks.

    Thanks,

    Mike





    Glad to hear it's working well. On your questions:

    1) Per service, assuming it has an interleave around its persistent receivers, I would recommend having a one-time receiver in a teardown group to do any cleanup. This guarantees teardown will happen after all receivers for that service have stopped executing, and no more will ever run after. What you use now is more appropriate for process-wide cleanup.

    2) Also here, interleave takes care of this since it tracks exactly, internally, which receivers are running. Then, if you use the teardown group, when you decide to shut down, your shutdown will only run when all active tasks generated from within the interleave have stopped.

    If it's not feasible to use interleave, or you want to make sure that everything is done across multiple interleaves or other arbiters all associated with one dispatcher, then your approach is fine.
    We don't have a running-tasks value since it's almost always guaranteed to be out of sync and would force even more memory coherence between workers, slowing down perf. We try to keep the dispatcher workers independent of each other, so they can run at maximum concurrency. Note that the max number of active tasks will always be equal to the thread workers, never more. So if the queue counts for the dispatcher queues are < worker count, that means things are about to idle.

    g
  • georgioc wrote:
    
    mgarski wrote: George,

    I've become a big fan and evangelist of the CCR and have been experimenting with it in a few Windows services I maintain.  I do have a few questions for you as to its use in my scenarios.

    First of all, every Arbiter I use is persistent, and when the service receives a shutdown signal I want to cancel all pending tasks.  I have found that by calling Dispose on the DispatcherQueue instances I can clear out the tasks that have not yet started while allowing those currently running to continue.  Would you consider this approach a safe way of cancelling pending/queued tasks?

    Secondly, all of the work performed by the threads in my application requires access to a shared resource, and I need to ensure that all of the work is complete before I close up the resource and exit the application.  I've experimented with the DispatcherQueue.Count and Dispatcher.PendingTasks properties but have found that they are both decremented when a task begins to run, and the Dispatcher.ProcessedTasks property is also incremented at that time.  I've taken the approach of having a property that reflects the current state of the object running on the CCR thread to ensure it is complete, such as:

    while(!AllWorkComplete())
    {
        Thread.Sleep(100);
    }

    CloseSharedResource();

    Where AllWorkComplete iterates through the objects that are performing the work and returns false if any one is still running.  This is working in my application, but I was wondering why there is not a "RunningTasks" property on the Dispatcher similar to PendingTasks and ProcessedTasks.

    The CCR has made my life so much easier and my code far cleaner and easier to maintain - especially using Interleave for concurrent and exclusive tasks.

    Thanks,

    Mike





    Glad to hear it's working well. On your questions:

    1) Per service, assuming it has an interleave around its persistent receivers, I would recommend having a one-time receiver in a teardown group to do any cleanup. This guarantees teardown will happen after all receivers for that service have stopped executing, and no more will ever run after. What you use now is more appropriate for process-wide cleanup.

    2) Also here, interleave takes care of this since it tracks exactly, internally, which receivers are running. Then, if you use the teardown group, when you decide to shut down, your shutdown will only run when all active tasks generated from within the interleave have stopped.

    If it's not feasible to use interleave, or you want to make sure that everything is done across multiple interleaves or other arbiters all associated with one dispatcher, then your approach is fine.
    We don't have a running-tasks value since it's almost always guaranteed to be out of sync and would force even more memory coherence between workers, slowing down perf. We try to keep the dispatcher workers independent of each other, so they can run at maximum concurrency. Note that the max number of active tasks will always be equal to the thread workers, never more. So if the queue counts for the dispatcher queues are < worker count, that means things are about to idle.

    g


    George -

    Thanks for your answer. I am currently using an interleave and will add a TeardownReceiverGroup that will get an object posted to it on the service's shutdown signal.

    Given that, I'm assuming that if I want to ensure that all of the messages on the dispatcher queue are processed I would want to do something like:

    while(dispatcherQueue.Count > 0)
    {
        Thread.Sleep(100);
    }

    shutdownPort.Post(new Object());

    I think this would safely let all of the queued-up tasks process and then post to my shutdown port, firing off the TeardownReceiverGroup to perform any final cleanup necessary, correct?

    Thanks again,

    Mike
  • mgarski wrote:
    

    George -

    Thanks for your answer. I am currently using an interleave and will add a TeardownReceiverGroup that will get an object posted to it on the service's shutdown signal.

    Given that, I'm assuming that if I want to ensure that all of the messages on the dispatcher queue are processed I would want to do something like:

    while(dispatcherQueue.Count > 0)
    {
        Thread.Sleep(100);
    }

    shutdownPort.Post(new Object());

    I think this would safely let all of the queued-up tasks process and then post to my shutdown port, firing off the TeardownReceiverGroup to perform any final cleanup necessary, correct?

    Thanks again,

    Mike


    Correct, that would clear all other tasks on this dispatcher. I'll open a bug so the CCR does this for you, specified as an optional parameter on dispatcher shutdown.
  • This stuff is really interesting. Would it be possible to use this to implement something similar to Stackless Python? (IronPython + CCR = Stackless Python in the CLR, now that would just make my day.)
  • oddurmagnusson wrote:
    This stuff is really interesting. Would it be possible to use this to implement something similar to Stackless Python? (IronPython + CCR = Stackless Python in the CLR, now that would just make my day.)


    We have CCR + Python running, so yes Smiley
    See the Python examples in the robotics tutorials (the CCR ships as an independent, reusable part of the robotics CTP).
  • Hi, the CCR has some new features relating to throttling and rate-based scheduling. They are available in the latest robotics CTP and explained briefly here (towards the bottom, where it talks about CCR/DSS):
    http://msdn.microsoft.com/robotics/getstarted/ctp4/default.aspx 

    Basically the CCR dispatcher queue allows you to specify task execution based on constraints. This eliminates a lot of manual throttling and increases determinism (a usage sketch follows the enum below):

    /// <summary>
    /// Specifies dispatcher queue task scheduling behavior
    /// </summary>
    public enum TaskExecutionPolicy
    {
        /// <summary>
        /// Default behavior, all tasks are queued with no constraints
        /// </summary>
        Unconstrained = 0,

        /// <summary>
        /// Queue enforces maximum depth (specified at queue creation)
        /// and discards tasks enqueued after the limit is reached
        /// </summary>
        ConstrainQueueDepthDiscardTasks,

        /// <summary>
        /// Queue enforces maximum depth (specified at queue creation)
        /// but does not discard any tasks. It forces the thread posting any tasks
        /// after the limit is reached to sleep until the queue depth falls below the limit
        /// </summary>
        ConstrainQueueDepthThrottleExecution,

        /// <summary>
        /// Queue enforces the rate of task scheduling specified at queue creation
        /// and discards tasks enqueued while the current scheduling rate is above the specified rate
        /// </summary>
        ConstrainSchedulingRateDiscardTasks,

        /// <summary>
        /// Queue enforces the rate of task scheduling specified at queue creation
        /// and forces the thread posting tasks to sleep until the current rate of task
        /// scheduling falls below the specified average rate
        /// </summary>
        ConstrainSchedulingRateThrottleExecution
    }
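
    For context, here is roughly how a constrained queue gets created and used. The exact DispatcherQueue constructor overload (name, dispatcher, policy, depth) is assumed from this CTP, so double-check the signature against the release notes; somePort and SomeMessage are just illustrative:

    Dispatcher dispatcher = new Dispatcher(0, "throttled pool");

    // assumed overload: name, dispatcher, policy, maximum queue depth
    DispatcherQueue throttledQueue = new DispatcherQueue(
        "bounded queue",
        dispatcher,
        TaskExecutionPolicy.ConstrainQueueDepthThrottleExecution,
        100 /* threads posting beyond this depth are made to sleep */);

    Arbiter.Activate(throttledQueue,
        Arbiter.Receive(true, somePort, delegate(SomeMessage msg)
        {
            // work scheduled through throttledQueue is subject to the depth constraint
        }));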

  • Hi George,

    I have a question:

    I am trying to build a method which may be either asynchronous or synchronous, using the CCR internally (to the method) to accomplish the asynchrony...

    So basically I have a method which has two local ports, one for the input message and the other for the returned results.

    Everything is fine until I want to block on the receiver for the result port in order to achieve 'synchronous' operation.

    How can I make the Activate call block?

    Is there a better method for doing this?

    Thanks,
    Shmarya



  • Hi George,

    My implementation of the CCR is going really well and it's great to see new versions coming out every month.

    I have an issue that is niggling at me, though, and it's to do with the ProcessedTaskCount of a Dispatcher.  I have a long-running Windows service that has 1 Dispatcher and multiple dispatcher queues.  The service is quite busy, so the ProcessedTaskCount goes up quite quickly.  Given that ProcessedTaskCount is an Int32, it's not unreasonable that over some time I will hit Int32.MaxValue and the CCR will then bring my AppDomain down.  Does this value recycle before it reaches Int32.MaxValue?

     

    Many thanks

     

    Marshall

  • shmarya wrote:
    Hi George,

    I have a question:

    I am trying to build a method which may be either asynchronous or synchronous, using the CCR internally (to the method) to accomplish the asynchrony...

    So basically I have a method which has two local ports, one for the input message and the other for the returned results.

    Everything is fine until I want to block on the receiver for the result port in order to achieve 'synchronous' operation.

    How can I make the Activate call block?

    Is there a better method for doing this?

    Thanks,
    Shmarya






    Make the parent method an iterator and spawn it (either enqueue an IterativeTask with some arguments, or post a message to a port with an iterator receiver attached).

    In the iterator method, you can do yield return, which is the synchronous version of Activate().

    There are lots of examples of the CCR with iterators in the robotics tutorials and samples. The MSDN article also covers them.
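
    A minimal sketch of that shape (Request, Result, StartWork, someRequest and taskQueue are all invented for illustration):

    IEnumerator<ITask> ProcessRequest(Request request)
    {
        Port<Result> resultPort = new Port<Result>();

        // kick off the asynchronous work; it posts to resultPort when done
        StartWork(request, resultPort);

        Result result = null;
        // "synchronous" receive: the iterator is suspended here (no thread blocked)
        // until a Result arrives, then resumes on the next line
        yield return Arbiter.Receive(false, resultPort, delegate(Result r) { result = r; });

        Console.WriteLine("got result: " + result);
    }

    // spawning the iterator:
    Arbiter.Activate(taskQueue,
        Arbiter.FromIteratorHandler(delegate { return ProcessRequest(someRequest); }));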

    thanx
    g
  • tartufella wrote:
    

    Hi George,

    My implementation of the CCR is going really well and it's great to see new versions coming out every month.

    I have an issue that is niggling at me, though, and it's to do with the ProcessedTaskCount of a Dispatcher.  I have a long-running Windows service that has 1 Dispatcher and multiple dispatcher queues.  The service is quite busy, so the ProcessedTaskCount goes up quite quickly.  Given that ProcessedTaskCount is an Int32, it's not unreasonable that over some time I will hit Int32.MaxValue and the CCR will then bring my AppDomain down.  Does this value recycle before it reaches Int32.MaxValue?

     

    Many thanks

     

    Marshall




    Hi, we do handle the rollover case, so the CCR will not affect the AppDomain when this happens.
  • Hi George,

    I have a comment about the UseBackgroundThreads static property on Dispatcher.

    It looks like you can set this property and its value is used during the creation of a Dispatcher instance. It feels to me like it should be a constructor parameter instead of a static property.

    The reason I ask is that we have CCR code (running in the same AppDomain) that was written by several developers. It seems like having to do something like this introduces a race condition:

    bool old = Dispatcher.UseBackgroundThreads;
    Dispatcher.UseBackgroundThreads = true;
    Dispatcher d = new Dispatcher();
    Dispatcher.UseBackgroundThreads = old;

    Am I missing the reason for this design decision?

    Thanks,
    Dan


  • dfarino wrote:
    Hi George,

    I have a comment about the UseBackgroundThreads static property on Dispatcher.

    It looks like you can set this property and its value is used during the creation of a Dispatcher instance. It feels to me like it should be a constructor parameter instead of a static property.

    The reason I ask is that we have CCR code (running in the same AppDomain) that was written by several developers. It seems like having to do something like this introduces a race condition:

    bool old = Dispatcher.UseBackgroundThreads;
    Dispatcher.UseBackgroundThreads = true;
    Dispatcher d = new Dispatcher();
    Dispatcher.UseBackgroundThreads = old;

    Am I missing the reason for this design decision?

    Thanks,
    Dan





    I decided to make it a static since it needs to be applied to all dispatchers to have the desired effect on the AppDomain. If even one dispatcher is created with a different setting, then the AppDomain will not exit (or will exit) as expected. It's not meant to be changed multiple times.

    Now, what this means is that it should probably be static single assignment, a.k.a. write once, read many. The other option is to let people manage this, like you suggest, and make it a constructor parameter.

    Can you describe your scenario where different dispatchers need different settings within one AppDomain?
  • Hi George,

    Dan is a coworker of mine and posted this after I commented about that.

    We don't necessarily have a situation where we NEED the ability to have different settings, but we do have a situation where changing the value could affect code in different libraries, and I felt that was a Bad Thing (tm).

    Basically, the CCR has caught on like wildfire around here, so we have multiple libraries in the same AppDomain using it simultaneously. I personally have a component-based service where the service itself has a Dispatcher for message distribution and then one of the components has its own for part of its message processing tasks.

    Since I happen to have written both the service and this component, it's not much of a concern. But other developers will be writing components for the service, and it feels programmatically rude to change that static value and have it affect their instantiations. It's not a huge deal to save the old value and change it back, but it is kind of ugly Perplexed

    Thanks for the wonderful library! This is a situation that could only arise from us loving it so much.
  • enelson wrote:
    Hi George,

    Dan is a coworker of mine and posted this after I commented about that.

    We don't necessarily have a situation where we NEED the ability to have different settings, but we do have a situation where changing the value could affect code in different libraries, and I felt that was a Bad Thing (tm).

    Basically, the CCR has caught on like wildfire around here, so we have multiple libraries in the same AppDomain using it simultaneously. I personally have a component-based service where the service itself has a Dispatcher for message distribution and then one of the components has its own for part of its message processing tasks.

    Since I happen to have written both the service and this component, it's not much of a concern. But other developers will be writing components for the service, and it feels programmatically rude to change that static value and have it affect their instantiations. It's not a huge deal to save the old value and change it back, but it is kind of ugly

    Thanks for the wonderful library! This is a situation that could only arise from us loving it so much.


    First of all, I am glad you are loving it so much Smiley

    Yes, indeed it does feel weird to have a static for this. However, remember that the background-thread setting only makes sense if it's global: if it's not, its effect on application shutdown is eliminated. Even one dispatcher with foreground threads would keep the app from exiting.

    May I ask why the value is being touched? Is it to control application exit behavior, or simply to request background threads for the benefits they provide (better CPU sharing with other apps, etc.)?
  • An update on the FIFO issue and Interleave that was raised by staceyw in this forum. I have found and fixed the issue causing this and it turns out interleave should have worked FIFO for exclusive receivers all along.
    See more details here:
    http://www.microsoft.com/communities/newsgroups/en-us/default.aspx?dg=microsoft.public.msroboticsstudio&mid=618d9b8a-7194-4185-8d89-e8fff887395b&sloc=en-us
  • George,

    I am trying to create an alternate receiver that would send a message to all predicates and applicable handlers, versus stopping on a single Predicate match.

    There is a problem with my implementation not getting a second pass through the EvaluatorAggregateTask's Execute method after the first yield return.  I also tried to return an alternate IEnumerator<> from a List<> and received the same behavior.

    I was able to get all the EvaluatorTask<> instances enqueued by calling DispatcherQueue.Enqueue, but I'm not sure if this is correct.  Do you know if the framework should be executing all of the tasks returned back in the enumerator?  Also, please mention anything I should look at doing differently.

    Thanks.

    Code snippets as follows:

    public class TestReceiver<T>
        : Receiver
    {
         public TestReceiver(bool persist, IPortReceive port, 
              params Tuple<Predicate<T>, Handler<T>>[] tuples)
             : base(true, port, new EvaluatorAggregateTask<T>(tuples))
         {
              // no code
         }
     }

     internal class EvaluatorAggregateTask<V>
         : ITask
     {
          public IEnumerator<ITask> Execute()
          {
              foreach (Tuple<Predicate<V>, Handler<V>> tuple in this.tuples)
              {
                  Predicate<V> predicate = (Predicate<V>)tuple.Item0;
                  Handler<V> handler = (Handler<V>)tuple.Item1;
                  yield return new EvaluatorTask<V>((PortElement<V>)this[0],
                         handler, predicate);
              }
          }

         [Rest of ITask implementation here ... ]
     }
     
     internal class EvaluatorTask<V>
         : ITask
     {
          public IEnumerator<ITask> Execute()
          {
              if (this.predicate(this.param.TypedItem) == true)
              {
                  this.handler(this.param.TypedItem);
              }
              return null;
          }

         [Rest of ITask implementation here ... ]
     }

  • George,

    Can you refer me to a library with similar functionality as the CCR, though for unmanaged C++?

    Thanks in advance,

    Kroki
  • dtspence wrote:
    

    George,

    I am trying to create an alternate receiver that would send a message to all predicates and applicable handlers, versus stopping on a single Predicate match.

    There is a problem with my implementation not getting a second pass through the EvaluatorAggregateTask's Execute method after the first yield return.  I also tried to return an alternate IEnumerator<> from a List<> and received the same behavior.

    I was able to get all the EvaluatorTask<> instances enqueued by calling DispatcherQueue.Enqueue, but I'm not sure if this is correct.  Do you know if the framework should be executing all of the tasks returned back in the enumerator?  Also, please mention anything I should look at doing differently.

    Thanks.

    Code snippets as follows:

    public class TestReceiver<T>
        : Receiver
    {
         public TestReceiver(bool persist, IPortReceive port, 
              params Tuple<Predicate<T>, Handler<T>>[] tuples)
             : base(true, port, new EvaluatorAggregateTask<T>(tuples))
         {
              // no code
         }
     }

     internal class EvaluatorAggregateTask<V>
         : ITask
     {
          public IEnumerator<ITask> Execute()
          {
              foreach (Tuple<Predicate<V>, Handler<V>> tuple in this.tuples)
              {
                  Predicate<V> predicate = (Predicate<V>)tuple.Item0;
                  Handler<V> handler = (Handler<V>)tuple.Item1;
                  yield return new EvaluatorTask<V>((PortElement<V>)this[0],
                         handler, predicate);
              }
          }

         [Rest of ITask implementation here ... ]
     }
     
     internal class EvaluatorTask<V>
         : ITask
     {
          public IEnumerator<ITask> Execute()
          {
              if (this.predicate(this.param.TypedItem) == true)
              {
                  this.handler(this.param.TypedItem);
              }
              return null;
          }

         [Rest of ITask implementation here ... ]
     }



    Hi, tasks never get their Execute method called more than once. Even persistent receivers will clone the task that runs the handler and call it once for each message.

    To achieve what you want, there is a different approach. Create the alternate receiver, just like you did. In the receiver's Evaluate method (override it), take the IPortElement.Value contents and essentially broadcast to any other ports or handlers you want. Make sure you return true from Evaluate, so the port does not keep the message in its queue.



  • kroki wrote:
    George,

    Can you refer me to a library with similar functionality as the CCR, though for unmanaged C++?

    Thanks in advance,

    Kroki


    Hi, I am not aware of any such library. A CCR C++ version would be nice, but it's not in our plans at this point.
  • Thank you,

    Kroki
  • George,

    Thanks for the reply.  Your suggestion worked great.  I have another related question.

    Could you explain what the runtime does with the return type of IEnumerator<ITask> a little?  The runtime seemed to execute the first task, but not the others.

    Thanks.

    georgioc wrote:
    
    dtspence wrote: 

    George,

    I am trying to create an alternate receiver that would send a message to all predicates and applicable handlers, versus stopping on a single Predicate match.

    There is a problem with my implementation not getting a second pass through the EvaluatorAggregateTask's Execute method after the first yield return.  I also tried to return an alternate IEnumerator<> from a List<> and received the same behavior.

    I was able to get all the EvaluatorTask<> instances enqueued by calling DispatcherQueue.Enqueue, but I'm not sure if this is correct.  Do you know if the framework should be executing all of the tasks returned back in the enumerator?  Also, please mention anything I should look at doing differently.

    Thanks.

    Code snippets as follows:

    public class TestReceiver<T>
        : Receiver
    {
         public TestReceiver(bool persist, IPortReceive port, 
              params Tuple<Predicate<T>, Handler<T>>[] tuples)
             : base(true, port, new EvaluatorAggregateTask<T>(tuples))
         {
              // no code
         }
     }

     internal class EvaluatorAggregateTask<V>
         : ITask
     {
          public IEnumerator<ITask> Execute()
          {
              foreach (Tuple<Predicate<V>, Handler<V>> tuple in this.tuples)
              {
                  Predicate<V> predicate = (Predicate<V>)tuple.Item0;
                  Handler<V> handler = (Handler<V>)tuple.Item1;
                  yield return new EvaluatorTask<V>((PortElement<V>)this[0],
                         handler, predicate);
              }
          }

         [Rest of ITask implementation here ... ]
     }
     
     internal class EvaluatorTask<V>
         : ITask
     {
          public IEnumerator<ITask> Execute()
          {
              if (this.predicate(this.param.TypedItem) == true)
              {
                  this.handler(this.param.TypedItem);
              }
              return null;
          }

         [Rest of ITask implementation here ... ]
     }



    Hi, tasks never get their Execute method called more than once. Even persistent receivers will clone the task that runs the handler and call it once for each message.

    To achieve what you want, there is a different approach. Create the alternate receiver, just like you did. In the receiver's Evaluate method (override it), take the IPortElement.Value contents and essentially broadcast to any other ports or handlers you want. Make sure you return true from Evaluate, so the port does not keep the message in its queue.



  • dtspence wrote:
    George,

    Thanks for the reply.  Your suggestion worked great.  I have another related question.

    Could you explain what the runtime does with the return type of IEnumerator<ITask> a little?  The runtime seemed to execute the first task, but not the others.

    Thanks.
    .....


    The C# compiler rewrites all functions that return IEnumerator to look like a heap-allocated class that implements a state machine. Every time you yield, the state machine is updated, and the CCR uses that to do MoveNext when the constraint (for example a receive occurring) is satisfied. Basically, the CCR figured out how to use iterators for async scheduling.
    Now, for all this to happen, you must implement the proper task which can support linked iterators. This is what Choice, Receive, Join, etc. have to do. It's not enough to just derive from Task, since you can't yield to a simple task: it's not clear when a task is "done". To do this, you will need to support the LinkedIterator property plus some other fields.

    It's a bit of an advanced subject that I think should be covered by a new article or even a book on the CCR Smiley
  • Microsoft Robotics Studio version 1.0, and with it the CCR library, has been released!

    Please visit www.microsoft.com/robotics for the latest bits.

    A note on the licensing: the CCR, which is part of the MSRS runtime, is covered by the same license as the robotics runtime. For non-commercial use (please read the license for more details) its use is free. For commercial use, however, there is a fee (the good news is that with one MSRS license you get permission for multiple runtime instances).

    For any questions please refer to the license and also feel free to post to the newsgroups.

    thanks to all for all the great feedback
  • Hi,

    How does the CCR compare to DataRush from Pervasive in Java land (http://www.pervasivedatarush.com/technology)?

    Are these two similar in concept?

    Thanks.
    Raghu/..

  • George,

    I'm wondering if you could comment on the recommended use of ThreadCausalities in the CCR (in my case a non-DSS environment). I can see that causalities will be transferred from the current thread to the post operation, and that they are conversely transferred from the ITask back to the executing thread, making the context available to delegates as called from receivers, etc.

    The obvious use is to provide an ExceptionPort that can take care of any unhandled exceptions that might have been thrown during events, etc. I have not really seen too many examples, nor is it clear what the appropriate patterns of use really are...

    Possible example: I'm inside an iterator doing a number of yield return operations to Choices, etc., and may want that iterator to have a way to handle any unhandled exceptions during those operations, so I add a CausalityContext within that iterator.

    Then perhaps one of the services that I do a yield return call to (it returning a Choice) decides that it needs a 'nested' causality, so it creates one for its exception handling (of unhandled exceptions in the dispatcher chain). Once that service posts back through the Choice, it appears that the original iterator has its restored CausalityContext in place again.

    Any insight on the appropriate usage patterns would really be appreciated. Also, what is the intent of the CoordinationPort?

    The CCR appears to be an extremely useful library for a vast number of cases and I really appreciate your time in filling in the gaps!

    tnx again!
    Don
  • DonSch wrote:
    George,

    I'm wondering if you could comment on the recommended use of ThreadCausalities in the CCR (in my case a non-DSS environment). I can see that causalities will be transferred from the current thread to the post operation, and that they are conversely transferred from the ITask back to the executing thread, making the context available to delegates as called from receivers, etc.

    The obvious use is to provide an ExceptionPort that can take care of any unhandled exceptions that might have been thrown during events, etc. I have not really seen too many examples, nor is it clear what the appropriate patterns of use really are...

    Possible example: I'm inside an iterator doing a number of yield return operations to Choices, etc., and may want that iterator to have a way to handle any unhandled exceptions during those operations, so I add a CausalityContext within that iterator.

    Then perhaps one of the services that I do a yield return call to (it returning a Choice) decides that it needs a 'nested' causality, so it creates one for its exception handling (of unhandled exceptions in the dispatcher chain). Once that service posts back through the Choice, it appears that the original iterator has its restored CausalityContext in place again.

    Any insight on the appropriate usage patterns would really be appreciated. Also, what is the intent of the CoordinationPort?

    The CCR appears to be an extremely useful library for a vast number of cases and I really appreciate your time in filling in the gaps!

    tnx again!
    Don


    Hi Don, we actually use causalities for exactly the same reasons you guessed Smiley It's a generalization, across threads and processors, of the nested exception handling mechanism. It's much more powerful, however, since it can deal with joins (two different causalities coming together in a common handler). And no, they are not transactions Wink

    Causalities are used for every single operation in the DSS runtime, our services infrastructure. They make sure that when an exception is thrown within any logically related descendant of a Post operation, that exception can be cleanly caught and dealt with in one place.

    The CoordinationPort allows anyone within a causality to post a message to the "owner" of the causality, allowing you to implement all kinds of protocols (like N-phase commit, etc.).

    Please post any further questions in the robotics newsgroup since that's the one we actively monitor. Soon we will have a forum on MSDN where we can talk about CCR stuff exclusively (it will be under the Robotics family of forums).

    For examples, I have posted something in the past in our robotics newsgroup on this (search for Causalities, Causality).

    BTW, we always want to know how the CCR is used and applied, especially in large-scale commercial applications.
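
    For anyone following along, the shape of the pattern is roughly this; the Causality constructor and Dispatcher.AddCausality names are assumed here for illustration, so verify them against your CCR build, and taskQueue is likewise just a placeholder:

    Port<Exception> exceptionPort = new Port<Exception>();

    // attach a causality so exceptions thrown by any logically descendant task
    // are routed to exceptionPort (member names assumed; check the CCR reference)
    Dispatcher.AddCausality(new Causality("index update", exceptionPort));

    Arbiter.Activate(taskQueue,
        Arbiter.Receive(true, exceptionPort, delegate(Exception e)
        {
            // one place to observe unhandled exceptions from downstream work
            Console.WriteLine("causality caught: " + e.Message);
        }));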


  • Thanks George - I appreciate the quick reply. I'll post any further questions on the newsgroup. It would in fact be quite helpful to have a subsection specific to the CCR. The application I'm doing a proof-of-concept for is within the context of asynchronous web services (ASP.NET/WSE 3.0), as well as within a number of Windows services, and it looks very promising! Smiley

    Don
  • Hi,

    This is Sukumar. I am very new to this site. I want to implement the CCR concept in my project. I went through this video (http://channel9.msdn.com/ShowPost.aspx?PostID=143582) but didn't understand it properly. If you have any documentation about the CCR, please send it to my inbox; it would help my team.

    My email: honeysukumar@yahoo.com


  • Everyone is welcome to try my contribution to the community and play with my framework samples at http://plugins.codeplex.com

     

    Sincerely yours,
    hack2root
