{A}
{S0}简介
本文介绍了使用并发与协调运行时(CCR)实施的管道和过滤器的设计模式。这个想法从采访{A2}。
声明:请注意,我不是一个并发的专家,这是我第一次尝试与CCR。背景
管道和过滤器的设计模式是用来描述一个消息源对象创建,改造过滤器对象,和接收器对象所消耗的管道。经典的多线程编程是命令式编程,开发分配线程(每个对象的线程,每封邮件的线程,异步线程池计划)。例如,一个必要的实施将使用Queuelt; GT;管道,由ReaderWriter锁锁的访问,作为一个标准的异步操作的线程池执行对象处理。
另一方面,声明式编程定义quot; whatquot;做,但如何做到这一点。例如,LINQ允许过滤器使用的条件,而不必担心对特定的顺序遍历数组的数组。同样,并发与协调运行时(CCR),允许开发人员申报quot; whatquot应该发生在每个对象,而不必担心有关线程,锁,和诸如此类的东西(好,几乎)。
让我们下潜到一些基本的CCR declerations:using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
{
DispatcherQueue dq = new DispatcherQueue("Pipeline DispatcherQueue", dispatcher);
}
调度是对象拥有一个线程池。第一个参数是要使用的线程数,其中0表示设置的线程数量最大(2,NumberOfProcessors)* ThreadPerCPU。调度程序队列中认为可以立即执行,并等待一个线程变为可用的代表名单。欲了解更多信息,请参阅文章A3}。
这里是一个片段,注册监听三个端口的三种方法。一个港口举行的消息,并认为在处理这些消息感兴趣的方法。{C}
读到这样的代码:每次inputPort.Port0lt; correctpinmessagegt;收到一个CorrectPinMessage对象,调用HandleSuccessMessage。每次inputPort.Port1lt; wrongpinmessagegt;收到一个WrongPinMessage对象,调用HandleFailureMessage。每次_timeoutPortlt; datetimegt;收到一个DateTime对象,调用HandleTimeout的方法。
Arbiter.Receive第一个参数(坚持= TRUE)表明,该方法继续处理的第一条消息后,接收邮件。
Arbiter.Interleave声明定义的锁定机制,CCR使用。 HandleSuccessMessage和HandleFailureMessage定义并发组(在本质上类似于ReaderWriterLock.AcquireReaderLock())。
并行处理程序的实现是线程安全的(使用互锁递增取得):void HandleFailureMessage(WrongPinMessage wrongPinMessage)
{
if (Interlocked.Increment(ref _wrongPinCount) == 1)
{
_firstWrongPinMessage = wrongPinMessage;
_dispatcherQueue.EnqueueTimer(_pollingPeriod, _timeoutPort);
}
}
void HandleSuccessMessage(CorrectPinMessage correctPinMessage)
{
Interlocked.Increment(ref _correctPinCount);
}
请注意,当第一WrongPinMessage收到,启动计时器。在几毫秒后,_timeoutPort接收触发HandleTimeout方法调用DateTime对象。自HandleTimeout是独家组定义(在本质上类似于ReaderWriterLock.AcquireWriterLock())
,处理程序执行是不需要线程安全的。它访问,而无需使用任何锁或互锁调用_wrongPinCount成员:void HandleTimeout(DateTime time)
{
CardStatus newStatus = _currentCardStatus;
if (_correctPinCount > 0)
{
newStatus = CardStatus.Ok;
}
else if (_wrongPinCount > 3)
{
newStatus = CardStatus.Stolen;
}
else if (_wrongPinCount > 0)
{
newStatus = CardStatus.Warning;
}
...
}
HandleTimeout方法quot; assumesquot; x毫秒已通过以来被称为第一WrongPinMessage。它使用的那段时间改变的状态机的内部状态中收到的邮件的总数。
CCR的基本概念的更多信息,视频{A4}。使用代码
在这篇文章中的代码定义抽象的源的管道(有一个输出端口 - 绿色)过滤器(输入端口和输出端口 - 蓝色)和一个接收器(只有输入端口 - 黄色)。
{S1}
演示项目介绍工人,有一个秘密的PIN(个人识别号码)的卡。 CorrectPinMessage描述了一个输入正确的PIN码,同样,WrongPinMessage代表一个不正确的PIN号码的工人的事件。 ";工人的模拟sourcequot;对象创建新邮件时,状态机整合计数的类似事件。 OpenAlert的消息或CloseAlert消息在控制台窗口显示报警片的每个状态变化的结果。
每个管道对象实现IMessageSink或IMessageFilter或IMessageSink。接口初始化方法用于调用的CCR Arbiter.Activate()方法。 Start方法用于初始化后,所有的对象都已开始产生消息。interface IMessageSink<tinputport> where TInputPort : IPort
{
TInputPort InputPortSet { get; }
void Initialize(DispatcherQueue dispatcherQueue, TInputPort inputPortSet);
}
interface IMessageSource<toutputport> where TOutputPort: IPort
{
TOutputPort OutputPortSet { get; }
void Initialize(DispatcherQueue dispatcherQueue, TOutputPort outputPortSet);
void Start();
}
interface IMessageFilter<tinputport,toutputport>
{
TInputPort InputPort { get; }
TOutputPort OutputPort { get; }
void Initialize(DispatcherQueue dispatcherQueue,
TInputPort inputPort, TOutputPort outputPort);
}
扩展方法是用来提供这些对象串行和并行连接。串行连接== GT;过滤器== GT;下沉的消息流连接的天然来源。并行连接,允许N源的邮件投递到同一个输出端口。static public IMessageSource<toutputport> ConnectTo<tinputport,>(
this IMessageSource<tinputport> source,
IMessageFilter<tinputport,> filter)
where TInputPort : IPort,new()
where TOutputPort : IPort
{ ... }
public static IMessageSource<tinputport> ConnectInParallel<tinputport>(
this IMessageSource<tinputport> source1,
IMessageSource<tinputport> source2)
where TInputPort : IPort
{ ... }
当源连接到一个接收器,创建一个完整的管道。
每名工人的国家机interface IMessagePipeLine
{
void Start(DispatcherQueue dispatcherQueue);
}
public static IMessagePipeLine ConnectTo<tport>(
this IMessageSource<tport> source,
IMessageSink<tport> sink)
where TPort : IPort,new()
{ ... }
,为了节省每名工人一个状态机,我们需要解复到新的状态机的消息。
{S2}
声明:下面的实现是不是最佳的。我很乐意听取您的意见。
第一个过滤器转换成KeyValuePairlt类型T的消息;键,TGT;消息。关键是名工人,键入PIN。class WorkerKeyValueFilter :
IMessageFilter<PortSet<CorrectPinMessage>,
Port<KeyValuePair<string,CorrectPinMessage>>>
{
void HandleMessage<T>(T message) where T : WorkCardSwipeMessageBase
{
OutputPort.Post(new KeyValuePair<string,T>(message.Name, message));
}
}
第二个过滤器使用的关键职位的消息的专用端口。
历史10月10日 - 2008年:最初版本。class DemuxMessageFilter<TMessageFilter, TInputPort,TOutputPort,TKey>
: IMessageFilter<TInputPort,TOutputPort>
where TMessageFilter : IMessageFilter<TInputPort,TOutputPort>, new()
where TInputPort : IPortSet,new()
where TOutputPort : IPort
{
void HandleMessage(KeyValuePair<TKey,object> message)
{
var messageFilter = GetMessageFilter(message.Key);
messageFilter.InputPort.PostUnknownType(message.Value);
}
TMessageFilter GetMessageFilter(TKey key)
{
TMessageFilter filter;
if (!_messageFilters.TryGetValue(key, out filter))
{
filter = new TMessageFilter();
filter.Initialize(_dispatcherQueue, new TInputPort(), OutputPort);
_messageFilters.Add(key, filter);
}
return filter;
}
}