This post is about the CCR part of MRS, in particular I am going to do some simple examples about ports and a simple arbiter the Receiver.
CCR components use ports to communicate with each other. Just like a human being uses speech, verbal speech, or mimicry to communicate, so do CCR components use ports to communicate with each other. Ports are used to
- Send a message to a particular receiver.
- Receive a message from a particular sender.
In terms of CCR port, we have to define how the message, looks like that our port is going to receive. For this, we have to define the message's underlying data type or more precisely we have to create a port, which receives a message with some particular data type.
Port<string> stringPort = new Port<string>();
This creates a new port, which is able to send and receive message of type string. Now, that we have defined our port, how are do we send a message? Nothing simpler than that, the port offers an instance method Post. This method accepts a formal parameter item of type string. So the canonical "Hello World" example looks like this
stringPort.Post("Hello World");
This passes the item of type string with value "Hello World" to our stringPort, where it waits to be taken from the port for whatever one wants to do with it. To illustrate that the sent item is really "sitting" on the stringPort, we could simply do the following:
if (inputPort.ItemCount > 0)
{
string item = inputPort.Test() as string;
if (!string.IsNullOrEmpty(item))
{
Console.WriteLine("Item: " + item);
}
}
This shows two things. The first (1) being, that you may query the number of items waiting on some port by the ports field ItemCount and the second (2) being that you may test the port for the presence of items. If there is one ore more items on the port, then Test will return this item as an object and remove it from the queue of available items. Fortunately, the removal of an item is an atomic process, so that we won't going to race condition hell.
At times you may want to be able to send and receive more than just one item on some port. Therefore, the CCR offers a so called PortSet, which is just that a set of Ports. For example, to create a PortSet accepting a string and DateTime message, we would do the following:
PortSet<string, DateTime> messageDateTimePort = new
PortSet<string, DateTime>();
To avoid passing ports around in some program and taking individual items of some port, the CCR offers arbiters. Arbiters coordinate incoming messages passed between ports. In order to not manually call the Test method on the stringPort, we are going to declare a so called Receiver, which will be active whenever an item is waiting on a port matching the Receiver's signature. For example,
Receiver<string> stringReceiver = new
Receiver<string>(
true,
stringPort,
null,
new
IterativeTask<string>(MessageHandler));
this creates a Receiver, which accepts string items waiting on stringPort. The first parameter with value true sets persistence to true for this receiver. This means that the Receiver continues to receive string items even after it has received one string item. The null value is passed to the constraints formal parameter and the last parameter wires up a message handler, which actually takes care of string items incoming from stringPort. The MessageHandler function is nothing special (of course, you are free to add your own logic to it) and looks like this
private static
IEnumerator<ITask> MessageHandler(string message)
{
Console.WriteLine("Received Message: " + message);
yield break;
}
To make things complete, a small sample illustrates the previous things. The example creates a simple string port, which is used by a secondary thread to to send the current time to. This message is received by a Receiver arbiter and printed out to the console. The program terminates if the user presses any key. In this case we stop the secondary thread and simple exit the program.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Microsoft.Ccr.Core;
namespace CCRTest5
{
class Program
{
private static bool running = false;
static void Main(string[] args)
{
Console.WriteLine("CCRTest5 - send to a message port and use arbiter to receive");
Port<string> stringPort = new
Port<string>();
Console.WriteLine("Created Port<string>");
Thread messageThread = new Thread(new
ParameterizedThreadStart(AddItemFunc));
Receiver<string> stringReceiver = new
Receiver<string>(
true,
stringPort,
null,
new
IterativeTask<string>(MessageHandler));
Console.WriteLine("Created Receiver");
Console.WriteLine("Starting Arbiter");
Arbiter.Activate(new
DispatcherQueue(), stringReceiver);
Console.WriteLine("Starting AddItem Thread");
messageThread.Start(stringPort);
Console.WriteLine("Press key to exit");
Console.ReadLine();
//abort
running = false;
Thread.Sleep(500);
if (messageThread.ThreadState == ThreadState.Running)
{
try
{
messageThread.Abort();
}
catch { }
}
}
private static
IEnumerator<ITask> MessageHandler(string message)
{
Console.WriteLine("Received Message: " + message);
yield break;
}
private static void AddItemFunc(object port)
{
running = true;
Port<string> messagePort = port as
Port<string>;
while(running)
{
messagePort.Post(DateTime.Now.ToString());
Thread.Sleep(500);
}
}
}
}
More on arbiters and CCR in the next post.