An Asynchronous Messaging Library for C# 2.0
The Concurrency and Coordination Runtime (CCR) is a lightweight port-based concurrency library for C# 2.0, developed by George Chrysanthakopoulos in the Advanced Strategies group at Microsoft and now productized and shipped as part of the Microsoft Robotics Studio runtime. Here are some of our design constraints for the CCR:
* We want to support coordination programming constructs as a library rather than by modifying an existing language or inventing a new language. Eventually the best way to add concurrency constructs is through language level support. We believe that it may be too early to freeze the coordination patterns (including joins, choice, etc.) into a language because we do not yet have enough experience of their use. For the purpose of experimentation a library based approach seems more suitable. The arbiter architecture described below also allows us to experiment with new constructs in an extensible, composable way.
* We wish to support port-based programming where message passing is used not just at the interface of the program to communicate with external entities but as a software engineering mechanism to help structure our programs. Our desire is to use message passing to help provide isolation between different components for reliability and parallelism.
* We wish to support scenarios in which there is a very large amount of message passing (e.g. service oriented programming and web servers). Constraining ourselves to mainstream operating systems, this means that we cannot directly use operating system threads to block on reading values from ports. We propose an alternative work-item scheme that does not use operating system threads and instead implements a continuation passing scheme for port-based programming. We show how language features in C# can help the programmer avoid having to write explicit continuation passing style code.
Other documentation sources
An MSDN article on the implementation and design of the CCR, with code samples, is available at:
http://msdn.microsoft.com/msdnmag/issues/06/09/ConcurrentAffairs/default.aspx
The Microsoft Robotics Studio product uses the CCR, and its service tutorials are a good source of documentation for using the CCR in a service-oriented architecture:
http://www.microsoft.com/robotics
Ports, Arbiters and Receivers
The CCR library defines a port class which is generically parameterized on the types of values that it may carry. For example:
Port<float> pF = new Port<float>() ;
defines a port pF which can only carry values of type float. The Port type is generically instantiated by giving a type name in angle brackets. One can declare a port that can carry different kinds of values:
PortSet<int,string> pIS = new PortSet<int,string>();
Here the port pIS can carry values of type int or string. One can instantiate a port with up to 16 different types. A port that carries multiple types works by maintaining unrelated sub-ports for each type. Reading an int value from pIS works by reading from the sub-port that contains ints, and this read operation is unaffected by the state of the string sub-port.
Ports comprise a FIFO data structure that holds the values in the port and a list of continuations which execute when a message of a particular type arrives. A continuation is represented by a C# delegate, which identifies a method that can either be named or written directly as an anonymous method. A message is sent to a port by using the Post method:
Port<int> pi = new Port<int>() ;
pi.Post (42) ;
We provide a method called Test to atomically remove a value from a port (if the port is not empty). The Test method returns a boolean result: false if the port was empty (in which case the out parameter is set to a default value) and true otherwise.
int iv ;
if (pi.Test (out iv))
    Console.WriteLine ("Read " + iv) ;
else
    Console.WriteLine ("Port empty.") ;
One typically does not use the test method directly. Instead, this low level function is used to define arbiters which implement concurrency constructs like join patterns and interleaved calculations.
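The port structure described above (a FIFO of values plus a list of waiting continuations) can be sketched roughly as follows. This is a toy model, not the CCR implementation: MiniPort and its Receive method are hypothetical names, and handlers run inline on the posting thread rather than being scheduled on a dispatcher queue.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy port, not the CCR Port<T>: a FIFO of values plus a
// queue of one-time continuations waiting for a value.
public class MiniPort<T>
{
    private readonly Queue<T> items = new Queue<T>();
    private readonly Queue<Action<T>> waiting = new Queue<Action<T>>();
    private readonly object gate = new object();

    // Hand the value to a waiting continuation, or queue it if none is waiting.
    public void Post(T value)
    {
        Action<T> handler = null;
        lock (gate)
        {
            if (waiting.Count > 0) handler = waiting.Dequeue();
            else items.Enqueue(value);
        }
        if (handler != null) handler(value);   // run outside the lock
    }

    // Atomically remove the oldest value; returns false (and default(T)) if empty.
    public bool Test(out T value)
    {
        lock (gate)
        {
            if (items.Count > 0) { value = items.Dequeue(); return true; }
            value = default(T);
            return false;
        }
    }

    // Register a one-time continuation; run it now if a value is already queued.
    public void Receive(Action<T> handler)
    {
        T value;
        bool have;
        lock (gate)
        {
            have = items.Count > 0;
            value = have ? items.Dequeue() : default(T);
            if (!have) waiting.Enqueue(handler);
        }
        if (have) handler(value);
    }
}
```

In this sketch Test never blocks: it either removes a value or reports that the port is empty, which is the property that lets arbiters be built on top of it without tying up a thread.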
We have developed several kinds of arbiters including:
* single item receivers for specifying handlers on a single port;
* a choice arbiter which will choose which of two receivers to fire based on the availability of messages;
* an interleave arbiter (explained later);
* a join arbiter for static join expressions;
* MultipleItemReceive arbiters for specifying dynamic joins.
Arbiters can all be created using a static class called Arbiter. The arbiters currently supported can be created through the following static methods:
Single Item receiver
Arbiter.Receive<T>(bool persist, Port<T> port, Handler<T> handler);
This arbiter identifies the code to execute when a value becomes available on a port. The code to be run is specified as a named delegate in C# or as an anonymous delegate. An example is shown below.
Activate ( Arbiter.Receive(false, p, delegate (int i) { Console.WriteLine(i) ; }) );
The Receive method associates a one-time 'handler' function to call when a value of type int is available at port p. The call to Activate causes the listener to come to life by watching for int values on port p; when one arrives, it removes it from the port, binds the integer value to i in the formal arguments of the delegate, and executes the body of the delegate. In this case the handler code just writes out the value that was read from the port.
Join receiver with known number of ports
Arbiter.JoinedReceive<T0,T1>(bool persist, Port<T0> port0, Port<T1> port1, Handler<T0,T1> handler);
The join arbiter allows one to atomically consume messages from multiple ports. For example:
Activate( Arbiter.JoinedReceive<int,string>(true, p1, p2, IntAndStringJoinedHandler) );
The above will schedule the execution of IntAndStringJoinedHandler when values can be atomically read on ports p1 and p2, and will continue doing so as long as messages are available (it is a persisted receiver). The handler will be passed the values that were read. One can join over several ports of different types.
As usual the handler can be specified with an anonymous method which makes it clearer what happens when the join pattern fires:
Arbiter.Activate(taskQueue,
    Arbiter.JoinedReceive<int,int>(true, balance, deposit,
        delegate(int b, int d){ balance.Post(b + d); })
);
Optimistic and two phase joins allow for concurrent progress for other overlapping joins, roll backs and retries.
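To make the "atomically consume from multiple ports" idea concrete, here is a minimal sketch of a two-way join. It is an illustration under simplifying assumptions, not how the CCR implements joins: MiniJoin, PostLeft and PostRight are hypothetical names, a single lock replaces the optimistic and two-phase schemes, and the handler runs inline.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy join over two message streams; not the CCR implementation.
public class MiniJoin<T0, T1>
{
    private readonly Queue<T0> left = new Queue<T0>();
    private readonly Queue<T1> right = new Queue<T1>();
    private readonly Action<T0, T1> handler;
    private readonly bool persist;
    private bool done;
    private readonly object gate = new object();

    public MiniJoin(bool persist, Action<T0, T1> handler)
    {
        this.persist = persist;
        this.handler = handler;
    }

    public void PostLeft(T0 value)
    {
        lock (gate) { left.Enqueue(value); }
        TryFire();
    }

    public void PostRight(T1 value)
    {
        lock (gate) { right.Enqueue(value); }
        TryFire();
    }

    // Take one value from each queue in a single locked step, so a message
    // is never removed from one side without its partner.
    private void TryFire()
    {
        while (true)
        {
            T0 a;
            T1 b;
            lock (gate)
            {
                if (done || left.Count == 0 || right.Count == 0) return;
                a = left.Dequeue();
                b = right.Dequeue();
                if (!persist) done = true;   // one-time join fires only once
            }
            handler(a, b);                   // run outside the lock
        }
    }
}
```

With persist set to true, a handler that posts its result back to the left side behaves like the balance/deposit example above: each deposit is combined with the current balance and the new balance becomes available for the next deposit.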
Choice
Arbiter.Choice(IReceiver [] receivers);
Arbiter.Choice<T0,T1>(IPort<T0,T1> port, Handler<T0> handler0, Handler<T1> handler1);
The choice arbiter allows one to specify that one of two (or more) branches should fire and the others should be discarded. For example using the first method:
Arbiter.Activate(taskQueue,
Arbiter.Choice(
Arbiter.Receive(false,p,MyIntHandler),
Arbiter.Receive(false,p,MyStringHandler)
)
);
will run either MyIntHandler or MyStringHandler but not both. The choice is determined by the availability of an int or string message on the p port; if both types of messages are available, a non-deterministic choice is made. Notice that the persist parameter in Arbiter.Receive must be false for all arbiters under Choice.
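The "one branch fires, the others are discarded" rule can be sketched with a single atomic flag. This is an assumption-laden toy, not the CCR choice arbiter: MiniChoice, OfferFirst and OfferSecond are hypothetical names, and messages are offered directly to the choice instead of arriving through ports.

```csharp
using System;
using System.Threading;

// Hypothetical toy choice between two one-time branches; not the CCR implementation.
public class MiniChoice<T0, T1>
{
    private readonly Action<T0> handler0;
    private readonly Action<T1> handler1;
    private int fired;   // 0 = still armed, 1 = a branch has already been taken

    public MiniChoice(Action<T0> handler0, Action<T1> handler1)
    {
        this.handler0 = handler0;
        this.handler1 = handler1;
    }

    // Returns true if this message won the choice. Returns false if the
    // choice had already fired, in which case the message is not consumed.
    public bool OfferFirst(T0 value)
    {
        if (Interlocked.CompareExchange(ref fired, 1, 0) != 0) return false;
        handler0(value);
        return true;
    }

    public bool OfferSecond(T1 value)
    {
        if (Interlocked.CompareExchange(ref fired, 1, 0) != 0) return false;
        handler1(value);
        return true;
    }
}
```

Because both branches compete for the same flag, at most one handler ever runs, which is why a persisted receiver makes no sense under a choice.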
Interleave
Arbiter.Interleave(
    TeardownReceiverGroup teardownGroup,
    ExclusiveReceiverGroup exclusiveGroup,
    ConcurrentReceiverGroup concurrentGroup
);
The interleave arbiter allows one to express concurrent calculations which could have been described using join patterns. In our experience, however, the join encodings were error prone, so a special arbiter was created to capture a pattern that corresponds to multiple readers but a single writer to shared state. It is a higher-level, far more concise way to capture advanced protection schemes.
The interleave arbiter is shown at work in the code below where a writer (UpdateHandler) is specified as 'exclusive' and the state readers (GetState, Query) are specified as 'concurrent'. The one-time shutdown handler (ShutdownHandler) will cause the entire interleave to stop, guaranteeing that no delegates are running or will ever run again. Makes clean up easy!
The interleave ensures that these operations are scheduled safely and concurrently. We found interleave considerably less error prone than using joins to guard state, and it is more concurrent since it is equivalent to a non-blocking, writer-biased reader/writer lock.
class SampleService
{
    Dispatcher dispatcher = new Dispatcher(0,"Sample Service");
    DispatcherQueue taskQueue = new DispatcherQueue();
    Port<string> pState = new Port<string>();
    ServicePort pMain = new ServicePort();
    ServicePort InitStatefull()
    {
        dispatcher.AddQueue(taskQueue);
        pState.Post("Hello"); // pState not used anywhere in Arbiters?
        Arbiter.Activate(taskQueue,
            Arbiter.Interleave(
                new TeardownReceiverGroup(
                    Arbiter.Receive(false,pMain,Shutdown)
                ),
                new ExclusiveReceiverGroup(
                    Arbiter.Receive(true,pMain,Update)
                ),
                new ConcurrentReceiverGroup(
                    Arbiter.Receive(true,pMain,GetState)
                )
            )
        );
        return pMain;
    }
}
The continuations under an interleave arbiter may be iterators. The interleave arbiter will provide atomicity guarantees until the sequential state machine has completed. The implementation uses interlocked routines and ports to queue readers and writers and once again there is no thread blocking. This idiom has found widespread use in our own concurrent programs.
Multiple Item receiver (dynamic join)
Arbiter.MultipleItemReceive<T0>(bool persist, Port<T0> [] ports,
VariableArgumentHandler<T0> handler);
Arbiter.MultipleItemReceive<T0>(bool persist, Port<T0> port, int messageCount,
VariableArgumentHandler<T0> handler);
We now present an example of a dynamic join operation which is not easily expressed using C-Omega style static join declarations. The code fragment below takes a list of files. The objective is to perform a scatter/gather style operation to find a given pattern in each file. The foreach loop posts messages which trigger concurrent work items that search for the pattern in each file. This corresponds to the scattering phase. The MultipleItemReceive method will create a join over a number of messages on the FileResult port, specified dynamically by the messageCount argument (filelist.Length). This corresponds to the gathering stage.
if (filelist.Length > 0)
{
    Port<msgFileSearchResult> FileResult = new Port<msgFileSearchResult>();
    // Scatter.
    foreach (string f in filelist)
    {
        msgTextFileSearch msg = new msgTextFileSearch(f, msgSearch.Pattern, FileResult);
        SearchAgent.Post(msg);
    }
    // Gather.
    Arbiter.Activate(taskQueue,
        Arbiter.MultipleItemReceive(false, FileResult, filelist.Length,
            delegate(msgFileSearchResult[] complete)
            {
                msgFileSearchResult sum = msgFileSearchResult.Accumulate(complete);
                sum.DirectoriesSearched = 1;
                // post result to parent directory search
                ParentResult.Post(sum);
            }));
}
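The gathering half of the scatter/gather pattern above amounts to "collect N messages, then run one handler with all of them". A minimal sketch of that idea follows. MiniGather is a hypothetical name and this is not the CCR MultipleItemReceive arbiter: it is one-time only, runs the handler inline, and simply drops messages that arrive after the batch is complete.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy gather: fires a handler once, with all values, after
// the expected number of messages has arrived. Not the CCR implementation.
public class MiniGather<T>
{
    private readonly List<T> received = new List<T>();
    private readonly int expected;
    private readonly Action<T[]> handler;
    private readonly object gate = new object();

    public MiniGather(int expected, Action<T[]> handler)
    {
        if (expected <= 0) throw new ArgumentOutOfRangeException("expected");
        this.expected = expected;
        this.handler = handler;
    }

    public void Post(T value)
    {
        T[] batch = null;
        lock (gate)
        {
            if (received.Count >= expected) return;   // toy behaviour: late messages are dropped
            received.Add(value);
            if (received.Count == expected) batch = received.ToArray();
        }
        if (batch != null) handler(batch);            // run outside the lock, exactly once
    }
}
```

The count is an ordinary constructor argument, so it can be computed at run time (for example from the length of a file list), which is what distinguishes a dynamic join from a static one.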
Using iterators to avoid explicit CPS conversions
Our library expresses concurrent programs in explicit continuation passing style which is often quite an arduous way for a human to express concurrent calculations.
We can exploit iterators in the C# 2.0 language to help write code which appears closer to what one would write with blocking receives using threads but which is in effect CPS-transformed by the C# compiler.
Declaring a method that returns the IEnumerator interface allows one to specify "yield" points in the method at which control flow returns to the caller. When MoveNext() is called on the enumerator produced by such a method, control flow resumes from the yield point, not from the start of the method.
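The resume-from-the-yield-point behaviour can be seen with plain C# and no CCR types at all. In the sketch below, YieldDemo and Steps are illustrative names; the log list records which parts of the method body have run.

```csharp
using System;
using System.Collections.Generic;

public static class YieldDemo
{
    // The compiler turns this method into a state machine. Each MoveNext()
    // call runs the body up to the next yield and remembers where it stopped.
    public static IEnumerator<string> Steps(List<string> log)
    {
        log.Add("start");
        yield return "first";
        log.Add("resumed after first");
        yield return "second";
        log.Add("resumed after second");
    }
}
```

Calling YieldDemo.Steps(log) runs none of the body. The first MoveNext() adds "start" and stops at the first yield, the second continues from there, and the third runs the tail of the method and returns false.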
The coordination primitives in CCR all inherit from the ITask interface and this is used to help identify the iteration variable for use with the IEnumerator interface.
We illustrate the use of yield in the code below which defines a method which will execute a choice expression a given number of times (specified by num). Each time it will try to execute a choice which either tries to read a float from a port (_pTest) or tries to fire a join pattern that joins on an int and string value (on the same port). The result of either choice is captured by the Result string variable which is written out after each invocation of the choice expression. The spawniterator method takes a method written using this yield approach and 'drives' the iterations.
class MyClass
{
    Dispatcher dispatcher = new Dispatcher(0,"Sample Service");
    DispatcherQueue taskQueue = new DispatcherQueue();
    Port<DoWork> pMain = new Port<DoWork>();
    public static void Main()
    {
        new MyClass().Start();
    }
    public void Start()
    {
        // schedule a task that will iterate over some message coordination
        taskQueue.Enqueue(new IteratorTask(num, IteratorOnChoiceAndJoin));
        // you can also associate a ReIssue receiver with a port and an iterator Handler
        Arbiter.Activate(taskQueue,Arbiter.ReceiveWithIterator(false,pMain,DoWorkHandler));
    }
    // Example of concisely capturing data dependencies between asynchronous
    // services
    IEnumerator<ITask> DoWorkHandler(DoWork w)
    {
        DoWork workA = new DoWork();
        Result r = null;
        // send message to ServiceA
        pServiceA.Post(workA);
        // yield for result on response port attached to outbound message
        yield return Arbiter.Choice(workA.pResponse,
            delegate (Result Res) { r = Res; },
            delegate (Failure F) { Trace.WriteLine("Failed"); });
        // when either branch above happens, we continue executing
        // here without ever having blocked a thread!
        if (r == null)
            yield break; // error occurred, break iteration
        // send next message, depending on the result from A
        DoWork workB = new DoWork(r);
        pServiceB.Post(workB);
        // handle result from B etc.
    }
}
Note that the Result stack variable is captured appropriately and it is set inside the continuations yet it can be accessed correctly from a context outside of the continuation.
Although this is not as direct as writing blocking receives, it is a significant improvement over writing explicit continuation passing style code, and we get the advantage of being able to express concurrent programs without using expensive operating system threads to represent blocking behavior.
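How an iterator can be 'driven' without blocking a thread can be sketched in a few lines. Everything here is a hypothetical stand-in rather than CCR API: Step plays the role of a yielded task, ReplySlot stands in for a response port that holds one value, and MiniIteratorDriver resumes the iterator from inside the continuation. The toy is single-threaded and has no locking.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical "task": given a resume callback, arrange to call it when ready.
public delegate void Step(Action resume);

// Hypothetical single-value response slot standing in for a response port.
public class ReplySlot<T>
{
    private T value;
    private bool hasValue;
    private Action<T> waiter;

    public void Post(T v)
    {
        Action<T> w = waiter;
        waiter = null;
        if (w != null) w(v);
        else { value = v; hasValue = true; }
    }

    // Build a step: deliver the value to the handler, then resume the iterator.
    public Step Receive(Action<T> handler)
    {
        return delegate (Action resume)
        {
            Action<T> deliver = delegate (T v) { handler(v); resume(); };
            if (hasValue) { hasValue = false; deliver(value); }
            else waiter = deliver;
        };
    }
}

public static class MiniIteratorDriver
{
    // Advance to the next yield and hand the yielded step a continuation
    // that advances again. No thread waits between steps.
    public static void Run(IEnumerator<Step> iterator)
    {
        if (iterator.MoveNext())
            iterator.Current(delegate { Run(iterator); });
        else
            iterator.Dispose();
    }
}

public static class Workflow
{
    // Reads like sequential code, but each yield is a point where the
    // method is suspended until a message arrives.
    public static IEnumerator<Step> AddTwo(ReplySlot<int> a, ReplySlot<int> b, List<int> output)
    {
        int x = 0, y = 0;
        yield return a.Receive(delegate (int v) { x = v; });
        yield return b.Receive(delegate (int v) { y = v; });
        output.Add(x + y);   // x and y were set inside the continuations
    }
}
```

After MiniIteratorDriver.Run(Workflow.AddTwo(a, b, output)), nothing is added to output until a value has been posted to a and then to b; the locals x and y are captured by the compiler-generated state machine and remain visible after each resume.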
Predicates
The CCR allows predicate (filtering/matching) logic to be trivially incorporated as part of the receiver logic. This enables you to do very powerful, value-driven shredding of message streams, and it follows the same rules in terms of composing and coordinating ports as outlined above. The predicate delegate is defined as follows:
public delegate bool Predicate<T>(T value);
The port API used to attach predicates is the same method used to attach single item receivers:
SingleItemReceiver With(Predicate<T> p, Handler<T> H);
Below is an example using a predicate to filter messages based on their values:
Port<int> Port = new Port<int>();
Arbiter.Activate(taskQueue,
    Arbiter.Receive(true,Port,MyLargeIntegerFilter,HandlerThatRunsOnlyWithLargeInts),
    Arbiter.Receive(true,Port,MySmallIntegerFilter,HandlerThatRunsOnlyWithSmallInts),
    Arbiter.Receive(true,Port,MyCatchAllIntHandler)
);
The filters above get evaluated for each message posted on the port. If the predicate returns true, the receiver logic runs and the code gets activated (HandlerThatRunsOnlyWithLargeInts for example, in the first branch above). If it returns false, it is as if the predicate never ran, and the message goes back into the port or is supplied to the next receiver.
An example of a predicate:
bool MyLargeIntegerFilter(int i)
{
    return i > 1000000;
}
You can use predicates and the CCR to concurrently filter email for spam, validate messages based on values, dispatch based on ranges etc.
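The "try each receiver's predicate, hand the message to the first one that accepts it" behaviour can be sketched as follows. MiniFilteredPort is a hypothetical name and this is only a toy model of the dispatch rule, not the CCR implementation: receivers are persistent, tried in registration order, and run inline, and an unmatched message is reported to the caller rather than kept in the port.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy port with predicate-guarded persistent receivers.
public class MiniFilteredPort<T>
{
    private readonly List<KeyValuePair<Predicate<T>, Action<T>>> receivers =
        new List<KeyValuePair<Predicate<T>, Action<T>>>();

    // Receiver that only accepts messages for which filter returns true.
    public void Receive(Predicate<T> filter, Action<T> handler)
    {
        receivers.Add(new KeyValuePair<Predicate<T>, Action<T>>(filter, handler));
    }

    // Catch-all receiver: accepts every message that reaches it.
    public void Receive(Action<T> handler)
    {
        Receive(delegate (T ignored) { return true; }, handler);
    }

    // Offer the message to each receiver in turn. Returns false if no
    // predicate accepted it (toy behaviour; nothing is queued here).
    public bool Post(T value)
    {
        foreach (KeyValuePair<Predicate<T>, Action<T>> r in receivers)
        {
            if (r.Key(value))
            {
                r.Value(value);
                return true;
            }
        }
        return false;
    }
}
```

Registering a large-value filter, a small-value filter and a catch-all in that order reproduces the three-branch example above: each posted integer reaches exactly one handler.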