
staceyw wrote:Nice. I always look forward to George and more CCR. I am still trying to wrap my head around how this can help with the big async need - async sockets! Sounds like we still need s.BeginReceive() and handle locking on shared state object parameters (i.e. bytes received), and kick off another read if we did not read enough bytes, so we can build the whole packet to parse it. Then start again for the next client request, etc. Using an example like WebRequest or reading a whole file is kinda cheating because it does all that stuff for you behind the covers. How might async sockets work with the CCR in a general code flow?
What I would really like to see at some point is a way to read the int length (i.e. a 4-byte prepended length), then kick off another read to read len bytes async, and not be notified again until len bytes are read or an exception occurs. Then my delegate only needs to deal with whole packets, and I can parse, process, write a reply, and post another read.
BTW, the thing with WaitForEnter() is common to what we have had to do with threaded console apps already. So it's not new to the CCR, but I got your point. Thanks guys and great work!!
--
wjs
Andrew Davey wrote:Do all the nice operator overloads still exist in the CCR?
Whilst having a library that is consistent with the BCL is very important, the operator overloads really let you concentrate on only your code. Reducing code noise is an important goal IMHO. Since C# has no syntactic macro capabilities, operator overloads are the only way to abstract away the boilerplate code. (Being a purely additive feature means no one has to use them if they like typing lots.)
If I get time I will look at making some syntactic macros for Boo or Nemerle, to make coding against the CCR look awesome
Also, make the CCR available as a separate download - this will certainly increase uptake by the community. (Dare I say make the project shared-source? Seeing the code would hopefully make it easier for people to write their own extensions.)
Keep up the great work!
Judah wrote:I would concur with the other posters: you guys should release this as a separate product from the Robotics download. Just put it up as a download by itself; it doesn't need its own MSDN developer center or anything so elaborate.
Another thing I'd like to request is XML documentation on the code. I really rely on this sort of thing while coding for quick information on parameters, methods, etc. It would be a really nice thing to have.
I'm really excited about this library. My jaw dropped a little seeing the build times decrease by nearly a factor of the number of processors in the system. So many of us developers have at least hyper-threaded processors, if not physical dual-core machines. The real application seems obvious there.
I am also really excited about the lockless way of sharing state by using that particular arbiter task...I think Interleave was the name. That sounds really sweet -- do you know how much we devs toil over using locks just to get the shared state right? Man, if I could move some of my code to use these interleaves rather than locks between methods, holy cow I could save so much time and probably improve performance as well.
George, this library looks great and the demos really help out. I'm wondering if there are any demos that use Windows Forms? As you know, UI code combined with concurrency gets hairy, since we can touch the UI only from a single thread. Is this something the CCR can still work with? I see a WinForms adapter bundled inside the Robotics IDE...maybe you could elaborate a little?
georgioc wrote:
staceyw wrote: Nice. I always look forward to George and more CCR. I am still trying to wrap my head around how this can help with the big async need - async sockets! Sounds like we still need s.BeginReceive() and handle locking on shared state object parameters (i.e. bytes received), and kick off another read if we did not read enough bytes, so we can build the whole packet to parse it. Then start again for the next client request, etc. Using an example like WebRequest or reading a whole file is kinda cheating because it does all that stuff for you behind the covers. How might async sockets work with the CCR in a general code flow?
What I would really like to see at some point is a way to read the int length (i.e. a 4-byte prepended length), then kick off another read to read len bytes async, and not be notified again until len bytes are read or an exception occurs. Then my delegate only needs to deal with whole packets, and I can parse, process, write a reply, and post another read.
BTW, the thing with WaitForEnter() is common to what we have had to do with threaded console apps already. So it's not new to the CCR, but I got your point. Thanks guys and great work!!
--
wjs
Hi Stacey, did you get a chance to play with the asynchronous web and file I/O examples Jeff has in the MSDN article? It's exactly what you want to do... As for atomically updating state, you can use our interleave pattern to protect concurrent access to state, without using locks. Just annotate some piece of code as exclusive, attach that method to a port, and then issue messages when you want to do atomic updates.
The robotics service tutorials talk about this as well
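[Editor's note: the interleave pattern georgioc describes can be sketched roughly as follows. This is a hedged sketch, not code from the thread; the UpdateState/QueryState message types and the _state field are made up for illustration, and dq is a DispatcherQueue as in the later snippets.]
Code:
// Sketch: protect shared state with an ExclusiveReceiverGroup instead of a lock.
// UpdateState and QueryState are hypothetical message types for this example.
Port<UpdateState> updatePort = new Port<UpdateState>();
Port<QueryState> queryPort = new Port<QueryState>();

Arbiter.Activate(dq, Arbiter.Interleave(
    new TeardownReceiverGroup(),
    new ExclusiveReceiverGroup(
        // Writer semantics: never runs concurrently with anything below.
        Arbiter.Receive<UpdateState>(true, updatePort,
            delegate(UpdateState u) { _state = u.NewValue; })),
    new ConcurrentReceiverGroup(
        // Reader semantics: may run concurrently with other reads.
        Arbiter.Receive<QueryState>(true, queryPort,
            delegate(QueryState q) { q.ResponsePort.Post(_state); }))));

// Any thread can then request an atomic update just by posting a message:
updatePort.Post(new UpdateState(42));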
So I rolled up my sleeves and dug in. This basically spins up a listener on another thread. The reads are done using pure async until a whole message is read, then the result is posted to the port. Then another read is started, etc. Also, client timeout is handled with a timer port. This seems to work well; however, I have not done major testing. Does this pattern seem reasonable, or would you refactor it differently?
Code:
// Act as both socket client and server. The server listens on another thread.
// Note: Make sure to turn your local firewall off or open the correct port.
private static void SocketReadDemo(DispatcherQueue dq)
{
    Console.WriteLine();
    Console.WriteLine("SocketReadDemo:");
    // Start server listening on another thread.
    Thread t = new Thread(SocketServer);
    t.IsBackground = true;
    t.Start(dq);
    Thread.Sleep(500); // Wait for server to spin up.
    // Connect to server and send a message.
    TcpClient c = new TcpClient();
    c.Connect(IPAddress.Loopback, 9000);
    for (int i = 0; i < 3; i++)
    {
        byte[] buf = Encoding.ASCII.GetBytes("Message" + i);
        byte[] len = BitConverter.GetBytes(buf.Length);
        c.GetStream().Write(len, 0, len.Length); // Prepend length bytes.
        c.GetStream().Write(buf, 0, buf.Length); // Write data.
    }
    // Close socket.
    Thread.Sleep(5000); // Force a timeout on the server side, so we can see the timeout exception.
    c.Close();
    HitEnter();
}
private static void SocketServer(object value)
{
    DispatcherQueue dq = (DispatcherQueue)value;
    TcpListener l = new TcpListener(9000);
    l.Start();
    while (true)
    {
        Console.WriteLine("Server waiting for client...");
        Socket s = l.AcceptSocket();
        Console.WriteLine("Client connected.");
        HandleClient(dq, s);
    }
}
private static void HandleClient(DispatcherQueue dq, Socket s)
{
    Port<byte[]> bytesPort = new Port<byte[]>();
    Port<Exception> failurePort = new Port<Exception>();
    Port<DateTime> timeoutPort = new Port<DateTime>();
    dq.EnqueueTimer(TimeSpan.FromMilliseconds(3000), timeoutPort);
    // Kick off async read of a whole client message.
    GetMessage(s, bytesPort, failurePort);
    // Create an Arbiter that will handle the client read result.
    Arbiter.Activate(dq,
        Arbiter.Choice(
            Arbiter.Receive(false, bytesPort,
                delegate(byte[] buf)
                {
                    // Do something with the client message.
                    Msg("Message read, bytes={0}, data: {1}", buf.Length, Encoding.ASCII.GetString(buf));
                    // Kick off another read.
                    Console.WriteLine("Get another client message.");
                    HandleClient(dq, s);
                }),
            // Upon timeout, stop all processing.
            Arbiter.Receive(false, timeoutPort,
                delegate(DateTime dt)
                {
                    Msg("Client request did not complete within timeout.");
                    if (s != null)
                        s.Close();
                }),
            Arbiter.Receive(false, failurePort,
                delegate(Exception e)
                {
                    Msg("Socket read failed, error={0}", e.Message);
                    if (s != null)
                    {
                        s.Shutdown(SocketShutdown.Both);
                        s.Close();
                    }
                })));
    // Return without blocking.
}
// This reads a single message from a client socket asynchronously (i.e. it is not a blocking call).
// Either the whole message will be posted to the port, or an exception will be posted.
// Note: Because we are using async reads, we don't have the option to time out here. However, timeout is
// handled by a DQ timer in the control block - nice! This saves having to do a kill loop on another thread.
public static void GetMessage(Socket s, Port<byte[]> portBytes, Port<Exception> portException)
{
    // Remember, with sockets you are only guaranteed to read 1 byte on each read (or get an error),
    // so you need to keep issuing reads until the buffer is full.
    if (s == null)
        throw new ArgumentNullException("s");
    int bytesRead = 0;   // Offset into buffer for the next read.
    int bytesToRead = 4; // Total bytes expected for the current buffer.
    bool gettingData = false;
    byte[] buffer = new byte[4];
    AsyncCallback cb = null;
    cb = delegate(IAsyncResult ar)
    {
        try
        {
            int read = s.EndReceive(ar);
            if (read == 0)
            {
                portException.Post(new Exception("Socket closed."));
                return;
            }
            Console.WriteLine("GetMessage read {0} bytes.", read);
            bytesRead += read;
            if (bytesRead == bytesToRead)
            {
                if (gettingData)
                {
                    // Send data buf to port.
                    portBytes.Post(buffer);
                }
                else
                {
                    // Got len, so now get the data.
                    bytesToRead = BitConverter.ToInt32(buffer, 0);
                    buffer = new byte[bytesToRead];
                    bytesRead = 0;
                    gettingData = true;
                    // Kick off first read of the actual message bytes.
                    s.BeginReceive(buffer, bytesRead, bytesToRead - bytesRead, SocketFlags.None, cb, null);
                }
            }
            else // Else, more bytes expected, so kick off another read at the current offset.
                s.BeginReceive(buffer, bytesRead, bytesToRead - bytesRead, SocketFlags.None, cb, null);
        }
        catch (Exception e)
        {
            portException.Post(e);
        }
    };
    try
    {
        // Kick off first read of the 4 length bytes.
        s.BeginReceive(buffer, bytesRead, bytesToRead - bytesRead, SocketFlags.None, cb, null);
    }
    catch (Exception e)
    {
        portException.Post(e);
    }
}
schrepfler wrote:Although I don't pretend that I understand this library, I somehow feel an event-like syntax would perhaps be more intuitive. For example, could we use += instead of Arbiter.Choice (or would that be counter-intuitive, or complicate the C# syntax)?
georgioc wrote:
Activate(Arbiter.Interleave(
    new TeardownReceiverGroup(
        Arbiter.Receive<Shutdown>(false, _mainPort, ShutdownHandler)),
    new ConcurrentReceiverGroup(
        Arbiter.Receive<ReadPacket>(true, _mainPort, ReadHandler),
        Arbiter.Receive<WritePacket>(true, _mainPort, WriteHandler)),
    new ExclusiveReceiverGroup()));
}

// make this an iterator if you want to do multiple async reads
// in sequence
private void ReadHandler(ReadPacket packet)
{
    // put your code that does the socket i/o here
}
}
schrepfler wrote:Thanks, it makes much more sense now. I'll definitely be checking your library out, and I hope it'll be moved to the BCL soon, just like Java decided to integrate the concurrent package into their base libs under java.util.concurrent. If you are aware of it, can you comment on the differences between your approach and theirs?
I must say that the two demos that have been shown on C9 were quite a thrill, but my objection is that the model you've managed to pull off is so radical that I'm having problems digesting it. When I look at a demo of java.util.concurrent it makes sense, as the higher-level concepts are the same ones tackled for years, while the CCR tackles stuff in a seemingly different way, so I can't do an apples-to-apples comparison (that said, it's also the most interesting use of iterators I've seen yet; .NET 2.0 rocks).
Could you, for instance, show some demo code that presents a traditional concept like the Future pattern, or some comparison between java.util.concurrent and the CCR (two columns, one the Java solution, the other CCR)?
Also, they introduced some new collection types with finer lock granularity; will the CCR or BCL try to tackle that as well in the future?
staceyw wrote:
georgioc wrote:
Activate(Arbiter.Interleave(
    new TeardownReceiverGroup(
        Arbiter.Receive<Shutdown>(false, _mainPort, ShutdownHandler)),
    new ConcurrentReceiverGroup(
        Arbiter.Receive<ReadPacket>(true, _mainPort, ReadHandler),
        Arbiter.Receive<WritePacket>(true, _mainPort, WriteHandler)),
    new ExclusiveReceiverGroup()));
}

// make this an iterator if you want to do multiple async reads
// in sequence
private void ReadHandler(ReadPacket packet)
{
    // put your code that does the socket i/o here
}
}
Thanks George. I like that idea. Making a socket just a Port abstraction is cool. Working on it now. A couple of questions:
1) I don't fully follow the Interleave arbiter above and why it was picked over a Choice arbiter.
2) I wonder if overlapped reads and writes will be a concern. I assume it will still be possible to get two reads quickly on a port, kick off two async reads, and then they get mixed-up data? Maybe writes have the same issue. Not sure.
3) What happens if you get a 2nd Shutdown message on a port? Does it just sit in the port? Can you close a port so the caller will get an exception when they try to Post?
4) With timers: say you get a timer event because of a timeout, so you close the socket, etc. But at ~the same time, the read actually completed, but the timer beat it to the queue. What happens to that Read post? What happens when you close a socket in the middle of a read? Where will that exception even be seen on the port?
Thanks again.
--
wjs
georgioc wrote:
staceyw wrote:
georgioc wrote:
Activate(Arbiter.Interleave(
    new TeardownReceiverGroup(
        Arbiter.Receive<Shutdown>(false, _mainPort, ShutdownHandler)),
    new ConcurrentReceiverGroup(
        Arbiter.Receive<ReadPacket>(true, _mainPort, ReadHandler),
        Arbiter.Receive<WritePacket>(true, _mainPort, WriteHandler)),
    new ExclusiveReceiverGroup()));
}

// make this an iterator if you want to do multiple async reads
// in sequence
private void ReadHandler(ReadPacket packet)
{
    // put your code that does the socket i/o here
}
}
Thanks George. I like that idea. Making a socket just a Port abstraction is cool. Working on it now. A couple of questions:
1) I don't fully follow the Interleave arbiter above and why it was picked over a Choice arbiter.
2) I wonder if overlapped reads and writes will be a concern. I assume it will still be possible to get two reads quickly on a port, kick off two async reads, and then they get mixed-up data? Maybe writes have the same issue. Not sure.
3) What happens if you get a 2nd Shutdown message on a port? Does it just sit in the port? Can you close a port so the caller will get an exception when they try to Post?
4) With timers: say you get a timer event because of a timeout, so you close the socket, etc. But at ~the same time, the read actually completed, but the timer beat it to the queue. What happens to that Read post? What happens when you close a socket in the middle of a read? Where will that exception even be seen on the port?
Thanks again.
--
wjs
2) Absolutely!!!!! Overlapped reads and writes are what we live for! As they complete, you post a response on the read/write packet you got, independently of all other requests. If you want to use interleave to protect you while requests are pending, you can use an iterator and yield until the request is done. But this is a bit of an advanced subject; maybe I'll get Jeff to write another article on iterators and interleave, etc.
...
staceyw wrote:
Thanks George. Maybe the answer is in that, but I still have a little confusion. Say a Read gets posted, so I start an async read. The first async read gets maybe 5 bytes, not the whole message yet. However, another Read gets posted and that async read loop starts as well. The second read starts reading the packet that should only be read by the first reader. So data gets garbled - error. What I think I need is a way to pop the second read only *after* the first read has completed fully and has posted a reply. You could do this via calling convention and say don't post another read until you get a reply, but then it is only convention, and the service should really protect against this. I could probably work with a flag or something, but I was hoping for a better idea using the CCR. Does this make sense?
georgioc wrote:
...
IEnumerator<ITask> ReadHandler(ReadRequest read)
{
    byte[] readBuffer = new byte[read.ReadSize];
    Port<int, Exception> readResult = new Port<int, Exception>();
    int bytesRead = 0;
    while (bytesRead < read.ReadSize)
    {
        // issue async read for N bytes. In the method below
        // use BeginRead, etc. and post to the private port when you
        // read bytes, even less than the total.
        DoAsyncRead(readResult, bytesRead, readBuffer);
        yield return Arbiter.Choice(readResult,
            delegate(int bytes) { bytesRead += bytes; },
            delegate(Exception ex)
            {
                bytesRead = 0;
                read.ResponsePort.Post(ex);
            });
        if (bytesRead == 0)
            yield break; // error occurred, we already posted the exception
    } // while loop. Look, we can do loops and async requests!!!
    // ok, io operation done.
    read.ResponsePort.Post(readBuffer);
}
...
staceyw wrote:
georgioc wrote:
...
IEnumerator<ITask> ReadHandler(ReadRequest read)
{
    byte[] readBuffer = new byte[read.ReadSize];
    Port<int, Exception> readResult = new Port<int, Exception>();
    int bytesRead = 0;
    while (bytesRead < read.ReadSize)
    {
        // issue async read for N bytes. In the method below
        // use BeginRead, etc. and post to the private port when you
        // read bytes, even less than the total.
        DoAsyncRead(readResult, bytesRead, readBuffer);
        yield return Arbiter.Choice(readResult,
            delegate(int bytes) { bytesRead += bytes; },
            delegate(Exception ex)
            {
                bytesRead = 0;
                read.ResponsePort.Post(ex);
            });
        if (bytesRead == 0)
            yield break; // error occurred, we already posted the exception
    } // while loop. Look, we can do loops and async requests!!!
    // ok, io operation done.
    read.ResponsePort.Post(readBuffer);
}
...
Wow. Thanks man. That is the shiz nit. I almost get what is happening, but not fully. Could you please walk me through it in terms of the yield loop: what thread is calling and consuming the ITask, what happens after the yield return, etc.? Thanks so much G. You have been so helpful here. Cheers!
--
wjs
schrepfler wrote:Would it be difficult to create a sequence diagram for this flow (for those of us that think visually)?
georgioc wrote:
schrepfler wrote: Would it be difficult to create a sequence diagram for this flow (for those of us that think visually)?
the lines of code are few, but unfamiliar, so I will repeat the original request Stacey had (which is a bit of an advanced topic):
"Create an asynchronous packet processor reading from a stream-based transport, where a single read is not guaranteed to return all the bytes for one packet"
"Make a read atomic until it's done reading an entire packet. Prevent other reads from accessing the transport (and possibly other writes)"
I can't easily attach a block diagram, but the 10 lines of CCR code in the post above show how you can use the familiar synchronous read style to read repeatedly from the transport until a packet's worth of bytes has been read.
At the same time, the CCR protects you from other reads, without the need for explicit locking or thread blocking.
staceyw wrote:
That would be a great addition to C9 - allow attachments for PPT, GIF, *.cs, projects, etc. Anyway, related question...
Is it possible to have atomic reads as one set and atomic writes as another set, so a read and a write could process in parallel, but only 1 read at a time and 1 write at a time? I have read somewhere that it is safe to process a read on one thread and a write on another. Thanks again!
-- wjs
georgioc wrote:
staceyw wrote:
That would be a great addition to C9 - allow attachments for PPT, GIF, *.cs, projects, etc. Anyway, related question...
Is it possible to have atomic reads as one set and atomic writes as another set, so a read and a write could process in parallel, but only 1 read at a time and 1 write at a time? I have read somewhere that it is safe to process a read on one thread and a write on another. Thanks again!
-- wjs
Yes, absolutely. Use two interleaves decoupled from each other. Put the ReadHandler in an ExclusiveReceiverGroup in one interleave, and the WriteHandler also in an exclusive group in another interleave.
Activate them independently, but still use the same shutdown port between them. From each shutdown handler, post a shutdown message so the second interleave tears down as well (although it is only guaranteed atomic with respect to its own exclusive group).
Another way to do this, of course, is with a one-time activation of the read/write handler, re-activating the receiver when you are done with the read/write.
g
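[Editor's note: a rough sketch of georgioc's two-interleave suggestion, assuming the Shutdown/ReadPacket/WritePacket message types and handlers from the earlier snippets; the port names and DispatcherQueue dq are made up for illustration.]
Code:
// Two independent interleaves sharing one shutdown port.
// Reads are serialized among themselves, writes among themselves,
// but a read and a write may run in parallel.
Port<Shutdown> shutdownPort = new Port<Shutdown>();

Arbiter.Activate(dq, Arbiter.Interleave(
    new TeardownReceiverGroup(
        Arbiter.Receive<Shutdown>(false, shutdownPort, ShutdownHandler)),
    new ExclusiveReceiverGroup(
        Arbiter.Receive<ReadPacket>(true, readPort, ReadHandler)),
    new ConcurrentReceiverGroup()));

Arbiter.Activate(dq, Arbiter.Interleave(
    new TeardownReceiverGroup(
        Arbiter.Receive<Shutdown>(false, shutdownPort, ShutdownHandler)),
    new ExclusiveReceiverGroup(
        Arbiter.Receive<WritePacket>(true, writePort, WriteHandler)),
    new ConcurrentReceiverGroup()));

// Posting one Shutdown tears down whichever interleave receives it first;
// per georgioc's note, ShutdownHandler can re-post the message so the
// other interleave tears down as well.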
staceyw wrote:
Thanks g! I wonder if it would make sense (eventually) to extend your CCR Interleave code to allow multiple ExclusiveReceiverGroups. So I could do something like this:
Arbiter.Activate(dq,
    Arbiter.Interleave(
        new TeardownReceiverGroup(
            Arbiter.Receive<Shutdown>(false, replyPort, ShutdownHandler)),
        new ExclusiveReceiverGroup(
            Arbiter.Receive<ReadPacket>(true, replyPort, ReadHandler)),
        new ExclusiveReceiverGroup(
            Arbiter.Receive<WritePacket>(true, replyPort, WriteHandler)),
        new ConcurrentReceiverGroup(
            Arbiter.Receive<Timeout>(true, replyPort, TimeoutHandler),
            Arbiter.Receive<Error>(true, replyPort, ErrorHandler))));
Not sure if this would be possible or not, but would seem to be handy for things like this while needing only one Arbiter active.
Also, I am wondering if it is better to have one reply port per socket (in this case), or one reply port that handles all sockets and then figure out which socket replied based on some data in the reply object. Just wondering about the perf and overhead difference between the two methods. Thanks again. I am starting to feel the Force with the CCR.
--wjs
georgioc wrote:
staceyw wrote:
Thanks g! I wonder if it would make sense (eventually) to extend your CCR Interleave code to allow multiple ExclusiveReceiverGroups. So I could do something like this:
Arbiter.Activate(dq,
    Arbiter.Interleave(
        new TeardownReceiverGroup(
            Arbiter.Receive<Shutdown>(false, replyPort, ShutdownHandler)),
        new ExclusiveReceiverGroup(
            Arbiter.Receive<ReadPacket>(true, replyPort, ReadHandler)),
        new ExclusiveReceiverGroup(
            Arbiter.Receive<WritePacket>(true, replyPort, WriteHandler)),
        new ConcurrentReceiverGroup(
            Arbiter.Receive<Timeout>(true, replyPort, TimeoutHandler),
            Arbiter.Receive<Error>(true, replyPort, ErrorHandler))));
Not sure if this would be possible or not, but would seem to be handy for things like this while needing only one Arbiter active.
Also, I am wondering if it is better to have one reply port per socket (in this case), or one reply port that handles all sockets and then figure out which socket replied based on some data in the reply object. Just wondering about the perf and overhead difference between the two methods. Thanks again. I am starting to feel the Force with the CCR.
--wjs
I would recommend one port per socket, and furthermore a unique port per read/write packet. Let the CCR do the dispatching for you; no need for double dispatch. The ports are fairly lightweight, so they should not be a concern in terms of cost, all other things considered.
We do this in our runtime and achieve good message throughput and a stable, small working set.
staceyw wrote:
Sorry for another question, but that just confused me again. If I create a new port for each read, for example, does that mean I need to Activate a new Arbiter for each new port as well? If so, are you saying creating a new Arbiter is not expensive either? Also, does this mean I will not be able to use an Interleave arbiter to gain the protection of atomic reads (i.e. only 1 read at a time will pop)? Thanks again George.
tartufella wrote:Hi George,
This library looks fantastic, if a little daunting to get your head around. You must be a very satisfied developer indeed!
That darned Richter just released his PowerThreading library with lots of async goodness that I didn't quite get how to use, and now there's the CCR!!
Anyway, I kind of get it, but I just wanted to check one thing regarding the persistent nature of queues. If I post a number of items to a port and then Activate it on a Receive handler without persistence, am I right in saying that only one item will be processed? I was thinking that perhaps it would process all items already in the port but just ignore the ones added after the Arbiter was started.
Many thanks; keep the example code coming if you don't mind, and full marks to Stacey and crew for picking this stuff up so quickly.
PS. I'd love to see a full working project of reading an arbitrary number of bytes from an async socket.
Many thanks
Drexthepimp wrote:George, I love your enthusiasm, and I think you are really onto something with the CCR. Here is my question though: how do we interact with databases, which are inherently black-box affairs? Consider this scenario:
A function does some calculation, then adds a result to the database using a transaction-based stored procedure. If we leave the stored procedure working away, and then later on we encounter a rollback, how do we respond to that rollback?
It would seem to me that we have progressed too far in our application. Either we wait to be sure, or we risk being so fast at the CCR stage that we force a transaction rollback/data-inconsistency database trigger, i.e. we run 30 threads against the DB and threads 1 and 27 cause a rollback, or are not consistent.
This is just my pet hate at the moment; I work daily with an app that does the following:
func addtoDB(params) { // pseudo code
    addToDB // do SQL, one assumes
    displayStatusDialogue
}
Now, to do multi-dimensional queries, they call the method 30 times, resulting in a dirty 30 dialogues flashing. Admittedly this is rubbish engineering, but it is what got me thinking about the problems of repeatedly hitting the database. Inherently this function assumes success, which is why it frequently crashes [I didn't write it, I just have to use it!].
The idea of atomic database calls seems great, but does it close out the ability to interact with the CCR? I.e. the CCR is not allowed to jump in and help the database.
Am I wrong here? I am making wild generalisations about databases and blind calls to functions, but I see a problem.
It would be interesting to get your feedback; it is great that you are engaging with Channel 9!
Drex
Drexthepimp wrote:So use it wisely, at messaging points; possibly prevent commands flowing to the DB that you can predict would cause errors. The ability to close down ports when failures occur is like a firewall to the database, I suppose.
I am playing around with this whole package. It is a very good idea. I suppose I now wonder two things:
1) How much of this idea was inspired by you working on complicated problems down in the kernel?
2) How much significance could this have implemented down in the kernel, or is it effectively already a tackled problem?
Cheers
Drex
staceyw wrote:Hi George. Is there an example of using InterleaveReceiverGroup?
tartufella wrote:Hi George,
I think I'm getting the whole CCR thing bit by bit.
What would nail it for me is if I could do the following.
I have a logging application which must send emails either when a certain number of entries have accumulated - or when a certain time period elapses. So if I have 5 log entries OR 5 minutes pass, then I need to send everything in the port.
This leads obviously to a Choice arbiter with a Timer and a MultipleItemReceive. However, since a Choice cannot be persisted, how could I keep this chain of events going after either event has occurred for the first time?
thanks
Marshall
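[Editor's note: one common answer to Marshall's question, not confirmed by George in this thread, is to re-arm the one-shot Choice from inside its own handlers. A hedged sketch; _dq, _logPort, LogEntry, SendEmail, and FlushWhateverIsInPort are made-up names for illustration.]
Code:
// Sketch: flush log entries on 5 items OR a 5-minute timer, then re-arm.
void ArmBatch()
{
    Port<DateTime> timerPort = new Port<DateTime>();
    _dq.EnqueueTimer(TimeSpan.FromMinutes(5), timerPort);

    Arbiter.Activate(_dq, Arbiter.Choice(
        Arbiter.MultipleItemReceive(false, _logPort, 5,
            delegate(LogEntry[] batch)
            {
                SendEmail(batch);   // hypothetical helper
                ArmBatch();         // re-arm for the next batch
            }),
        Arbiter.Receive(false, timerPort,
            delegate(DateTime t)
            {
                FlushWhateverIsInPort(); // hypothetical helper: drain _logPort
                ArmBatch();              // re-arm for the next window
            })));
}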
mml wrote:regarding interleaving... Hypothetically, why does the CCR have to wait for, say, your 1 million messages to be coordinated before being able to service, for example, 1 other important message you want to process (a message which is not part of the coordination of the 1 million messages but ends up having to wait until those 1 million messages get coordinated; or does it wait)?
I might have missed this in the interview, but does the CCR allow messages to get thrown into the mix before the 1 million messages get coordinated - messages that happen not to be part of that coordinated set, so to speak? Maybe what I'm asking is whether or not the scheduling algorithm accommodates this in the interleaving pattern, or does the CCR address this in some other way?
Nice idea by the way, I hope to see more of it. Also, IMHO I wish the threading model would get replaced with something that allowed for better determinacy.
fransv wrote:Hey Georgio,
Perhaps you'll call it abuse of the CCR 8-), but I wanted to use it as an asynchronous implementation of the observer pattern, where different interested observers can register themselves and all get notified when something happens. So my idea was to register different TaskReceiver objects (as observers) on a Port where items are posted, such that all TaskReceiver objects are executed (as concurrently as possible). The actual implementation of the CCR is that only one of them is.
A nice thing is that observers would only get notified if there are new items at multiple ports.
Perhaps an idea for future versions of the CCR: let the user choose whether only one TaskReceiver gets the item (as implemented today) or all of them (unless of course the predicate indicates that the item is of no interest to the specific TaskReceiver object).
I don't know if you ever looked at the CCR as a way to implement an asynchronous version of the observer pattern, and perhaps the impact of what I propose is too big.
Best regards,
Frans.
dukess wrote:Very cool. What is the possibility of getting the concurrent "build demo" sources? I know that I could definitely use that on my team.
fransv wrote:Hey Georgio,
Perhaps you'll call it abuse of the CCR, but I wanted to use it as an asynchronous implementation of the observer pattern, where different interested observers can register themselves, which all get notified when something happens. So my idea was to register different TaskReceiver objects (as observers) to a Port where items are posted such that all TaskReceiver objects are executed (as concurrent as possible). The actual implementation of the CCR is that only one of them is.
A nice thing is that possible observers only would get notified if there are new items at multiple ports.
Perhaps an idea for future versions of the CCR: let the user choose if only one TaskReceiver gets the item (as implemented today) or all (unless of course the predicate indicates that the item is of no interest to the specific TaskReceiver object).I don't know if you ever looked at the CCR as a way to implement an asynchronous version of the observer pattern and perhaps the impact of what I propose is too big.
Best regards,
Frans.
public static void UsageDemo()
{
    CcrEvent<int> myEvent = new CcrEvent<int>();
    Port<int> p1 = new Port<int>();
    Port<int> p2 = new Port<int>();
    Port<int> p3 = new Port<int>();
    myEvent.Subscribe(p1);
    myEvent.Subscribe(p2);
    myEvent.Subscribe(p3);
    int sleepTime = 100;
    Arbiter.Activate(new DispatcherQueue("q"),
        Arbiter.Receive(true, p1,
            delegate(int i)
            {
                Thread.Sleep(sleepTime);
                Console.WriteLine("TID:{0} p1:{1}", Thread.CurrentThread.ManagedThreadId, i);
            }),
        Arbiter.Receive(true, p2,
            delegate(int i)
            {
                Thread.Sleep(sleepTime);
                Console.WriteLine("TID:{0} p2:{1}", Thread.CurrentThread.ManagedThreadId, i);
            }),
        Arbiter.Receive(true, p3,
            delegate(int i)
            {
                Thread.Sleep(sleepTime);
                Console.WriteLine("TID:{0} p3:{1}", Thread.CurrentThread.ManagedThreadId, i);
            })
    );
    for (int i = 0; i < 10; i++)
    {
        myEvent.Publish(i);
    }
}
}
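For context, a minimal sketch of what the `CcrEvent<T>` class used above might look like. This is an assumption consistent with the `Subscribe`/`Publish` calls in the demo, not the poster's actual code: `Publish` multicasts by posting the item to every subscribed port, so each port's own receiver gets to run.

```csharp
// Hypothetical CcrEvent<T>: multicast by posting to every subscribed Port<T>.
using System.Collections.Generic;
using Microsoft.Ccr.Core;

public class CcrEvent<T>
{
    private readonly List<Port<T>> subscribers = new List<Port<T>>();
    private readonly object gate = new object();

    public void Subscribe(Port<T> port)
    {
        lock (gate) { subscribers.Add(port); }
    }

    public void Publish(T item)
    {
        // Snapshot under the lock, post outside it.
        Port<T>[] snapshot;
        lock (gate) { snapshot = subscribers.ToArray(); }
        foreach (Port<T> port in snapshot)
        {
            port.Post(item); // every observer port receives the item
        }
    }
}
```

This sidesteps the single-receiver dispatch Frans describes: the fan-out happens at publish time, before the CCR's arbitration sees the message.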
georgioc wrote:
mml wrote: Regarding interleaving... Hypothetically, why does the CCR have to wait for, say, your 1 million messages to be coordinated before being able to service, for example, this one other important message you want to process (which is not part of the coordination of the 1 million messages but ends up having to wait until those 1 million messages get coordinated; or does it wait)?
I might have missed this in the interview, but does the CCR allow messages to get thrown into the mix before the 1 million messages get coordinated, if they happen to not be part of that coordinated set, so to speak? Maybe what I'm asking is whether or not the scheduling algorithm accommodates this in the interleaving pattern, or does the CCR address this in some other way?
Nice idea, by the way; I hope to see more of it. Also, IMHO, I wish the threading model would get replaced with something that allowed for better determinacy.
Hi, the CCR doesn't have to wait. And it doesn't. I actually round-robin between dispatcher queues associated with the same dispatcher, and I will process a message from each queue. So even if you have 1 million messages coming from one port, associate it with its own dispatcher queue, and then associate your other, critical port that receives only one message once in a while with a different dispatcher queue!
This was a critical feature for real-time, priority and a lot of other applications, so the CCR was designed from the beginning to have a scheduling system that makes this work.
Jeff's examples show how to create dispatcher queues (one line); then, when you call Arbiter.Activate, pass the queue you want with the appropriate receive operation.
staceyw wrote:
I have a question regarding this. As the code below shows, port processing does not seem to be fair: the first port to get a receiver attached wins the attention of the dispatcher queue until it is drained. Is this a "fair" model? Thanks in advance.
public static void PortTest()
{
    Port<int> p1 = new Port<int>();
    Port<int> p2 = new Port<int>();
    for (int i = 0; i < 10; i++)
    {
        p1.Post(i);
        p2.Post(i);
    }
    Arbiter.Activate(new DispatcherQueue("q"),
        Arbiter.Receive(true, p1,
            delegate(int i)
            {
                Console.WriteLine("p1:{0}", i);
            }),
        Arbiter.Receive(true, p2,
            delegate(int i)
            {
                Console.WriteLine("p2:{0}", i);
            })
    );
    // Output:
    // p1:0
    // p1:1
    // p1:2
    // p1:3
    // p1:4
    // p1:5
    // p1:6
    // p1:7
    // p1:8
    // p1:9
    // p2:0
    // p2:1
    // p2:2
    // p2:3
    // p2:4
    // p2:5
    // p2:6
    // p2:7
    // p2:8
    // p2:9
}
georgioc wrote:
There are a couple of reasons why you see this; it is an artifact of this little experiment.
Notice how you posted ALL messages first, then activated. This means that the first time you attach a receiver on port 1, we have to generate a lot of tasks, since the messages are all already there.
Now, given that, notice that you used the same queue for both receivers.
The combination of both causes this, which is expected. And since no guarantees are given for ordering between ports, this is just one possible interleaving.
In my post above I mention that we round-robin among dispatcher queues. So if you try the same exact experiment, but activate the receivers on ports 1 and 2 independently, using two different dispatcher queues, you will see interleaving take place. Of course, between two different dispatchers, with one queue each for each receiver, the same thing happens: interleaving will occur.
Note that task dispatching is fair. Port processing is independent of any other port, but if two ports use the same queue, then the order the tasks were generated will determine the order they run.
Another way to see things interleave is to activate *first*, then start posting. We will then start running tasks in parallel with your for loop.
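The two-queue variant George describes can be sketched as follows. This rewrites staceyw's experiment under the assumption that one Dispatcher services its DispatcherQueues round-robin, as stated above; the names are illustrative only.

```csharp
// Sketch: same experiment, but one Dispatcher with two DispatcherQueues,
// so the scheduler round-robins between the two ports' tasks.
using System;
using Microsoft.Ccr.Core;

public static class TwoQueueDemo
{
    public static void Run()
    {
        Port<int> p1 = new Port<int>();
        Port<int> p2 = new Port<int>();
        for (int i = 0; i < 10; i++)
        {
            p1.Post(i);
            p2.Post(i);
        }

        // One dispatcher, two queues: tasks from p1 and p2 now interleave
        // instead of p1 draining completely first.
        Dispatcher dispatcher = new Dispatcher(0, "demo");
        DispatcherQueue q1 = new DispatcherQueue("q1", dispatcher);
        DispatcherQueue q2 = new DispatcherQueue("q2", dispatcher);

        Arbiter.Activate(q1, Arbiter.Receive(true, p1,
            delegate(int i) { Console.WriteLine("p1:{0}", i); }));
        Arbiter.Activate(q2, Arbiter.Receive(true, p2,
            delegate(int i) { Console.WriteLine("p2:{0}", i); }));
    }
}
```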
Hi George,
First of all I want to thank you for the great library – I consider it one of the best pieces in .NET. I just started to learn it and have a few questions.
1. I have one port and I want to dynamically add and remove receivers. I understand that I can add a receiver using the Arbiter, for example with Arbiter.Activate(dq, Arbiter.Receive(true, port, Handler<>)). But how can I remove this receiver? Why is there no method like Arbiter.Deactivate(...)?
2. In another scenario I have several ports and one receiver. How can I serialize calls to the delegate? Should I use Arbiter.Interleave (ExclusiveReceiverGroup), or can I just use lock(syncObject) in the receiver handler? Also, is there any way to dynamically add/remove receivers in a group?
Again, thanks for your work and your passion
staceyw wrote:
Thanks again, George. I saw the behavior with the separate queues. It does beg the question, however: would it make any sense to round-robin the ports in a single queue, or at least allow the option? As you said, you can get future-like behavior by posting to ports before activation. However, with the current implementation you need to remember and expect this behavior, instead of letting the system enforce fairness across ports, which might seem more intuitive. Would there be a cost to this? If it could be done cheaply, would it be useful? Just wondering, to help my understanding.
Also, you had mentioned priority queues a couple of times in the videos. I don't see how to set priority. I see thread priority can be set on a Dispatcher, but it sounded like you were saying queue priority could be set, so different queues could have different priorities in the same dispatcher.
Thanks
georgioc wrote:
First question (receiver deactivation):
Hi, there are three ways to deactivate a receiver:
1) Make the receiver one-time, and the CCR will remove it after it processes one message; it will do so automatically and atomically.
2) Use Arbiter.Choice to coordinate between receivers, and the CCR will again remove all receivers after the first receiver executes.
3) Use Interleave and the TeardownReceiverGroup; when a receiver in that group executes, all other receivers will be removed.
OK, there is actually one more way, but it's low level and you have to be careful when to use it. If you look at the port API itself, there are Register and Unregister methods for receivers, if you want to take matters into your own hands.
Second question: serializing calls.
I strongly recommend not using any calls to traditional thread-locking APIs with the CCR. Use interleave, join constructs, activate one-time receivers and then re-register them, etc., to achieve serialization. Interleave is the most intuitive and takes care of a lot of conditions.
There are actually a few ways to achieve serialization with a system like the CCR, depending on what you are trying to do, but again, interleave should allow you, with a couple of lines, to serialize all access to a piece of code, even across asynchronous continuations (if the exclusive receiver is an iterator).
Hope this helps.
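A minimal sketch of the interleave approach described above, assuming the Arbiter.Interleave/receiver-group API shown elsewhere in this thread. The port names, the queue, and the Shutdown type are placeholders, not part of the CCR:

```csharp
// Sketch: serialize handler execution with Interleave instead of lock().
// Exclusive receivers run one at a time; the teardown receiver removes
// everything once active handlers finish.
using Microsoft.Ccr.Core;

public sealed class Shutdown { }  // hypothetical teardown message type

public static class SerializationSketch
{
    public static void Activate(DispatcherQueue queue,
                                Port<int> statePort,
                                Port<Shutdown> shutdownPort)
    {
        Arbiter.Activate(queue, Arbiter.Interleave(
            new TeardownReceiverGroup(
                Arbiter.Receive<Shutdown>(false, shutdownPort,
                    delegate(Shutdown s) { /* final cleanup */ })),
            new ExclusiveReceiverGroup(
                Arbiter.Receive<int>(true, statePort,
                    delegate(int i) { /* shared state touched serially here */ })),
            new ConcurrentReceiverGroup()));
    }
}
```

Posting to `shutdownPort` plays the role of `Arbiter.Deactivate`: the arbitration tears itself down atomically rather than being removed from outside.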
Alexq wrote:
Thanks a lot! I had a feeling that CLR thread locking didn't sound right!
One more question:
How does the CCR work across app domains? Can I register a receiver which is a callback to a remoting object and will be executed in a different domain? Do you see any problems in this scenario?
sturla wrote:Hello
I'm working on a problem that seems to be perfect for the CCR. The task at hand is concurrency and coordination against a very picky search index. The index accepts several different operations, such as search, add, delete, optimize and so on. However, these operations have to be coordinated according to priority, concurrency rules and available resources.
Priority:
Search typically requires a fast response.
Concurrency rules:
There is a matrix defining which operations can execute at the same time. Search + Add is OK; Add + Delete is not.
Resources:
The resource level may change at any time, resulting in (for instance) a full stop of all non-search operations.
Obviously this can be done with some sort of object queue and some sort of traditional control unit that, I'm guessing, will have to do a lot of work (and will most likely have some issues). What I'm looking for is a way of implementing my queue fast, and as flexibly as possible. My first thought when I read about the CCR was that I could get a lot of what I want for free. Am I right?
I'm guessing that the right solution is to implement my own port which supports my set of index operations. What I'm not too sure about is how I can create a receiver that at any time knows which queue items to dispatch (according to the concurrency matrix and priority).
sturla wrote:Hello, and thanks for your answer.
This surely solves the concurrency issue; however, I'm failing to see how I can prioritize among the different tasks.
I've been playing around with TeardownReceiverGroup() to allow for "only" search message dispatching. It seems that the teardown happens after all items in the queue have been handled. For instance, if the queue contains 10 adds and 5 deletes when a new search message enters the queue, how can I set up my state machine so that the search message is dispatched before the other messages already in the queue?
orest wrote:Hi George,
I have basically two questions concerning CCR.
1. How can I cancel pending ReceiverTasks? Let's say that I have created some permanent Receivers and wired them up with some Port. Finally, I've posted some 1000 items to that port. I have the Cancel button on my form and now I want to cancel all pending tasks. Should I use another DispatcherQueue to preempt existing queued tasks? How can I do that?
2. I am playing with the WinFormsAdaptor and WinFormsServicePort. Inside my main method I wrote following:
static void Main()
{
    Application.EnableVisualStyles();
    Application.SetCompatibleTextRenderingDefault(false);
    Dispatcher dispatcher = new Dispatcher(0, "CCRDemo Threads");
    DispatcherQueue dispatcherQueue =
        new DispatcherQueue("CCRDemo DispatcherQueue", dispatcher);
    WinFormsServicePort port = WinFormsAdaptor.Create(dispatcherQueue);
    port.Post(new RunForm(new WinFormConstructor(delegate
    {
        return new MainForm(dispatcherQueue);
    })));
}
MainForm displays properly and everything looks nice until I press the close button on my form. The form closes, but the application hangs, i.e. the WinForms UI thread running the hidden form doesn't stop. What should I do to properly shut down the app?
BTW, I haven't slept properly lately because of the CCR. It is so fascinating! It took me a couple of hours to grasp the basics and another couple with Reflector to grasp what's really inside. One gets productive after, basically, a very short period. All my previous multithreading attempts were painful coordination between various Manual/AutoResetEvents and other multithreading primitives, various queues with posted data, etc. What I am trying to do now is come up with an example where I cannot use the CCR. I haven't succeeded yet! The CCR should definitely be a part of the next .NET version. Bravo!
shmarya wrote:Hi George!
CCR is great, I'm really enjoying it!
Two quick questions:
1. How can I get some code to execute when the last item is removed from a port/DispatcherQueue? I.e., I want to post a set of related work items, yield, and continue only when all items have been dealt with.
2. I have a set of related work items which I post into a persistent port; I then set up an Interleave arbiter so that if one fails, the entire arbitration is torn down. How can I ensure that the items that were in the port's queue at the time the teardown occurs are popped out? I.e., how can I make sure they won't be around the next time some receivers are attached to the port?
Thanks,
Shmarya
georgioc wrote:
1. I assume you know up front some predetermined count of messages; otherwise it's hard to tell which is last. You could use a timeout port, etc., if you want to stop after a break in receiving messages. But in general a message could arrive at any time, even after periods where nothing is going on. Assuming you have a counter or some way to count, you have a few options:
a) Use MultipleItemReceive and specify the number of items you expect. Then your handler will be executed when all N items are received.
b) Use a one-time receiver to run your handler as the first message arrives, increment a counter, and then, as long as count < Total, keep re-issuing the receiver, essentially throttling and waiting for the last message on your own.
c) Use a persistent receiver in an exclusive block in an interleave, use a counter as in b), but then issue a teardown message to a receiver in the TeardownGroup.
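Option (a) above can be sketched in a few lines. This assumes the MultipleItemReceive arbiter takes a count and hands the handler an array of all received items; `queue`, `responsePort` and `expectedCount` are placeholders:

```csharp
// Sketch: run a handler once, only after all N responses arrive on one port.
using System;
using Microsoft.Ccr.Core;

public static class CompletionSketch
{
    public static void WaitForAll(DispatcherQueue queue,
                                  Port<int> responsePort,
                                  int expectedCount)
    {
        // One-shot (persist = false): fires exactly once, when
        // expectedCount items have been received, then unregisters itself.
        Arbiter.Activate(queue,
            Arbiter.MultipleItemReceive(false, responsePort, expectedCount,
                delegate(int[] items)
                {
                    Console.WriteLine("all {0} work items completed", items.Length);
                }));
    }
}
```

Each concurrent work item would post its result to `responsePort` when done; the handler is the "continue when all items have been dealt with" step.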
shmarya wrote:
Hi Georgio...
I have a number of items which I want to execute 'concurrently'; in addition, I want to be able to run specific code only when all the items have completed. I suppose I could use a MultipleItemReceive on a 'Response' port, i.e. set up some persistent arbiter to run the work on the items concurrently, have each item post a response to the central Response port, and then have the initial calling method register an arbiter with a choice between a timeout and a MultipleItemReceive.
In this case, how can I unregister the original receivers when a timeout occurs? I suppose I can put it all inside a single interleave... but I am not sure that is the best possible solution.
One question regarding this: is the implementation of Port<> thread safe? I.e., is it possible to post to it from multiple threads?
Another (unrelated) question: how does the CCR function across AppDomain/process boundaries via remoting? Are there any caveats I should be aware of?
Thanks,
Shmarya
mgarski wrote:George,
I've become a big fan and evangelist of the CCR and have been experimenting with it in a few Windows services I maintain. I do have a few questions for you as to its use in my scenarios.
First of all, every Arbiter I use is persistent, and when the service receives a shutdown signal I want to cancel all pending tasks. I have found that by calling Dispose on the DispatcherQueue instances I can clear out the tasks that have not yet started while allowing those currently running to continue. Would you consider this approach a safe way of cancelling pending/queued tasks?
Secondly, all of the work performed by the threads in my application requires access to a shared resource, and I need to ensure that all of the work is complete before I close the resource and exit the application. I've experimented with the DispatcherQueue.Count and Dispatcher.PendingTasks properties, but have found that they are both decremented when a task begins to run, and the Dispatcher.ProcessedTasks property is also incremented at that time. I've taken the approach of having a property that reflects the current state of the object running on the CCR thread to ensure it is complete, such as:
while (!AllWorkComplete())
{
    Thread.Sleep(100);
}
CloseSharedResource();
AllWorkComplete iterates through the objects that are performing the work and returns false if any one is still running. This works in my application, but I was wondering why there is not a "RunningTasks" property on the Dispatcher similar to PendingTasks and ProcessedTasks.
The CCR has made my life so much easier and my code far cleaner and easier to maintain - especially using Interleave for concurrent and exclusive tasks.
Thanks,
Mike
georgioc wrote:
Glad to hear it's working well. On your questions:
1) Per service, assuming it has an interleave around its persistent receivers, I would recommend having a one-time receiver in a teardown group to do any cleanup. This guarantees teardown will happen after all receivers for that service have stopped executing, and no more will ever run after. What you use now is more appropriate for process-wide cleanup.
2) Here, too, interleave takes care of this, since it tracks internally exactly which receivers are running. Then, if you use the teardown group, when you decide to shut down, your shutdown will only run when all active tasks generated from within the interleave have stopped.
If it's not feasible to use interleave, or you want to make sure that everything is done across multiple interleaves or other arbiters all associated with one dispatcher, then your approach is fine.
We don't have a running-tasks value since it's almost always guaranteed to be out of sync, and it would force even more memory coherence between workers, slowing down performance. We try to keep the dispatcher workers independent of each other so they can run at maximum concurrency. Note that the max number of active tasks will always be equal to the thread workers, never more. So if the queue counts for the dispatcher queues are < worker count, that means things are about to idle.
g
mgarski wrote:
George -
Thanks for your answer, I am currently using an interleave and will add a TeardownReceiverGroup that will get an object posted to it on the service's shutdown signal.
Given that, I'm assuming that if I want to ensure that all of the messages on the dispatcher queue are processed, I would want to do something like:
while (dispatcherQueue.Count > 0)
{
    Thread.Sleep(100);
}
shutdownPort.Post(new Object());
I think this would safely let all of the queued-up tasks process and then post to my shutdown port, firing off the TeardownReceiverGroup to perform any final cleanup necessary. Correct?
Thanks again,
Mike
oddurmagnusson wrote:This stuff is really interesting. Would it be possible to use this to implement something similar to Stackless Python? (IronPython + CCR = Stackless Python on the CLR... now that would just make my day.)
/// <summary>
/// Specifies dispatcher queue task scheduling behavior
/// </summary>
public enum TaskExecutionPolicy
{
    /// <summary>
    /// Default behavior, all tasks are queued with no constraints
    /// </summary>
    Unconstrained = 0,
    /// <summary>
    /// Queue enforces maximum depth (specified at queue creation)
    /// and discards tasks enqueued after the limit is reached
    /// </summary>
    ConstrainQueueDepthDiscardTasks,
    /// <summary>
    /// Queue enforces maximum depth (specified at queue creation)
    /// but does not discard any tasks. It forces the thread posting any tasks after the limit is reached
    /// to sleep until the queue depth falls below the limit
    /// </summary>
    ConstrainQueueDepthThrottleExecution,
    /// <summary>
    /// Queue enforces the rate of task scheduling specified at queue creation
    /// and discards tasks enqueued when the current scheduling rate is above the specified rate
    /// </summary>
    ConstrainSchedulingRateDiscardTasks,
    /// <summary>
    /// Queue enforces the rate of task scheduling specified at queue creation
    /// and forces the thread posting tasks to sleep until the current rate of task scheduling falls below
    /// the specified average rate
    /// </summary>
    ConstrainSchedulingRateThrottleExecution
}
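A sketch of how these policies might be applied, assuming DispatcherQueue constructor overloads that take a policy plus a depth limit (int) or rate (double); the names and limits here are illustrative:

```csharp
// Sketch: creating throttled dispatcher queues with TaskExecutionPolicy.
using Microsoft.Ccr.Core;

public static class ThrottlingSketch
{
    public static void Create()
    {
        Dispatcher dispatcher = new Dispatcher(0, "throttled");

        // Discard newly enqueued tasks once 1000 tasks are already queued.
        DispatcherQueue boundedQueue = new DispatcherQueue(
            "bounded", dispatcher,
            TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks, 1000);

        // Make the posting thread sleep until the scheduling rate
        // drops below roughly 100 tasks/second.
        DispatcherQueue pacedQueue = new DispatcherQueue(
            "paced", dispatcher,
            TaskExecutionPolicy.ConstrainSchedulingRateThrottleExecution, 100.0);
    }
}
```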
Hi George,
My implementation of the CCR is going really well and it's great to see new versions coming out every month.
I have an issue that is niggling at me, though, and it's to do with the ProcessedTaskCount of a Dispatcher. I have a long-running Windows service that has one Dispatcher and multiple DispatcherQueues. The service is quite busy, so the ProcessedTaskCount goes up quite quickly. Given that ProcessedTaskCount is an Int32, it's not unreasonable that over time I will hit Int32.MaxValue and the CCR will then bring my AppDomain down. Does this value recycle before it reaches Int32.MaxValue?
Many thanks
Marshall
shmarya wrote:Hi George,
I have a question:
I am trying to build a method which may be either asynchronous or synchronous, using the CCR internally (to the method) to accomplish the asynchrony.
So basically I have a method which has two local ports, one for the input message and the other for the returned results.
Everything is fine until I want to block on the receiver for the result port in order to achieve 'synchronous' operation.
How can I make the Activate call block?
Is there a better method for doing this?
Thanks,
Shmarya
dfarino wrote:Hi George,
I have a comment about the UseBackgroundThreads static property on Dispatcher.
It looks like you can set this property, and its value is used during the creation of a Dispatcher instance. It feels to me like it should be a constructor parameter instead of a static property.
The reason I ask is that we have CCR code (running in the same AppDomain) that was written by several developers. It seems like having to do something like this introduces a race condition:
bool old = Dispatcher.UseBackgroundThreads;
Dispatcher.UseBackgroundThreads = true;
Dispatcher d = new Dispatcher();
Dispatcher.UseBackgroundThreads = old;
Am I missing the reason for this design decision?
Thanks,
Dan
enelson wrote:Hi George,
Dan is a coworker of mine and posted this after I commented about that.
We don't necessarily have a situation where we NEED the ability to have different settings, but we do have a situation where changing the value could affect code in different libraries, and I felt that was a Bad Thing (tm).
Basically, the CCR has caught on like wildfire around here, so we have multiple libraries in the same AppDomain using it simultaneously. I personally have a component-based service where the service itself has a Dispatcher for message distribution, and one of the components has its own for part of its message-processing tasks.
Since I happen to have written both the service and this component, it's not much of a concern. But other developers will be writing components for the service, and it feels programmatically rude to change that static value and have it affect their instantiations. It's not a huge deal to save the old value and change it back, but it is kind of ugly.
Thanks for the wonderful library! This is a situation that could only arise from us loving it so much.
George,
I am trying to create an alternate receiver that would send a message to all predicates and applicable handlers, versus stopping on a single predicate match.
There is a problem with my implementation: it never gets a second pass through the EvaluatorAggregateTask's Execute method after the first yield return. I also tried returning an alternate IEnumerator<> from a List<> and saw the same behavior.
I was able to get all of the EvaluatorTask<> instances enqueued by calling DispatcherQueue.Enqueue directly; I am not sure if this is correct. Do you know if the framework should be executing all of the tasks returned by the enumerator? Also, please mention anything I should look at doing differently.
Thanks.
Code snippets as follows:
public class TestReceiver<T>
: Receiver
{
public TestReceiver(bool persist, IPortReceive port,
params Tuple<Predicate<T>, Handler<T>>[] tuples)
: base(true, port, new EvaluatorAggregateTask<T>(tuples))
{
// no code
}
}
internal class EvaluatorAggregateTask<V>
: ITask
{
public IEnumerator<ITask> Execute()
{
foreach (Tuple<Predicate<V>, Handler<V>> tuple in this.tuples)
{
Predicate<V> predicate = (Predicate<V>)tuple.Item0;
Handler<V> handler = (Handler<V>)tuple.Item1;
yield return new EvaluatorTask<V>((PortElement<V>)this[0],
handler, predicate);
}
}
[Rest of ITask implementation here ... ]
}
internal class EvaluatorTask<V>
: ITask
{
public IEnumerator<ITask> Execute()
{
if (this.predicate(this.param.TypedItem))
{
this.handler(this.param.TypedItem);
}
return null;
}
[Rest of ITask implementation here ... ]
}
kroki wrote:George,
Can you refer me to a library with similar functionality as the CCR, though for unmanaged C++?
Thanks in advance,
Kroki
georgioc wrote:
dtspence wrote: George,
I am trying to create an alternate receiver that would send a message to all predicates and applicable handlers, versus stopping on a single Predicate match.
Hi, tasks never get their Execute method called more than once. Even persistent receivers will clone the task that runs the handler, and call it once for each message.
To achieve what you want, there is a different approach. Create the alternate receiver, just like you did, but override the receiver's Evaluate method: take the IPortElement.Value contents and essentially broadcast to any other ports or handlers you want. Make sure you return true from Evaluate, so the port does not keep the message in its queue.
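George's suggestion above might look roughly like the following sketch. This is not code from the thread: the Receiver constructor arguments, the Evaluate signature, and the IPortElement.Item access are assumptions recalled from the CCR API and may differ between CCR releases.

```csharp
// Hypothetical sketch of a broadcast receiver per George's suggestion.
// ASSUMPTIONS: Receiver exposes a virtual Evaluate(IPortElement, ref ITask)
// and a (persistent, port, task) constructor; both may vary by CCR version.
public class BroadcastReceiver<T> : Receiver
{
    private readonly Tuple<Predicate<T>, Handler<T>>[] tuples;

    public BroadcastReceiver(IPortReceive port,
        params Tuple<Predicate<T>, Handler<T>>[] tuples)
        : base(true /* persistent */, port, null)
    {
        this.tuples = tuples;
    }

    public override bool Evaluate(IPortElement messageNode, ref ITask deferredTask)
    {
        T item = (T)messageNode.Item;

        // Run every matching handler instead of stopping at the first
        // predicate that returns true.
        foreach (Tuple<Predicate<T>, Handler<T>> tuple in this.tuples)
        {
            if (tuple.Item0(item))
            {
                tuple.Item1(item);
            }
        }

        // Returning true tells the port the message was consumed,
        // so it is not kept in the port's queue.
        return true;
    }
}
```

Note that invoking the handlers inline in Evaluate runs them on the posting thread; assigning a task to deferredTask instead would let the dispatcher schedule them.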
dtspence wrote:George,
Thanks for the reply. Your suggestion worked great. I have another related question.
Could you explain a little what the runtime does with the IEnumerator<ITask> return type? The runtime seemed to execute the first task, but not the others.
Thanks.
Hi,
How does CCR compare to DataRush from Pervasive in the Java world (http://www.pervasivedatarush.com/technology)?
Are the two similar in concept?
Thanks.
Raghu/..
DonSch wrote:George,
I'm wondering if you could comment on the recommended use of ThreadCausalities in CCR (in my case non-DSS environment). I can see that Causalities will be transferred from the current thread to the post operation, and that they are conversely transferred from the ITask back to the executing thread, making the context available to delegates as called from Receivers, etc.
The obvious use is to provide an ExceptionPort that can take care of any unhandled exceptions that might have been thrown during events, etc. I have not really seen too many examples, nor is it clear what the appropriate patterns of use really are.
Possible example: I'm inside an iterator doing a number of yield return operations, etc, to Choices, etc. and may want that iterator to have a way to handle any unhandled exceptions during those operations, so I add a CausalityContext within that iterator.
Then perhaps one of the services that I do a yield return call to (it returning a Choice) decides that it needs a 'nested' causality, so it creates one for its exception handling (of unhandled exceptions in the dispatcher chain). Once that service posts back through the Choice, it appears that the original iterator has its restored CausalityContext in place again.
Any insight on the appropriate usage patterns would really be appreciated. Also the intent of the CoordinationPort?
The CCR appears to be an extremely useful library for a vast number of cases, and I really appreciate your time in filling in the gaps!
tnx again!
Don
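The causality pattern Don describes might be sketched as follows. This is purely illustrative, not an answer from the thread: the Causality constructor and Dispatcher.AddCausality call are recalled from the CCR API and may differ by release, and taskQueue is a placeholder for an existing DispatcherQueue.

```csharp
// Hypothetical sketch of scoping unhandled exceptions with a causality.
// ASSUMPTIONS: Causality(string, IPort) and Dispatcher.AddCausality exist
// in this form; "taskQueue" stands in for an existing DispatcherQueue.
Port<Exception> exceptionPort = new Port<Exception>();

// From this point on, exceptions escaping tasks scheduled in this causal
// chain should be posted to exceptionPort rather than being lost.
Dispatcher.AddCausality(new Causality("IteratorScope", exceptionPort));

Arbiter.Activate(taskQueue,
    Arbiter.Receive<Exception>(true, exceptionPort,
        delegate(Exception e)
        {
            Console.WriteLine("Unhandled exception in scope: " + e.Message);
        }));
```

A nested causality created by a downstream service would, as Don observes, apply only to work scheduled while it is active; the original context is restored when the service posts back through the Choice.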
Everyone is welcome to try my contribution to the community and play with my framework samples at http://plugins.codeplex.com
Sincerely yours,
hack2root