返回首页

{A}
{S0}简介
本文介绍了使用并发与协调运行时(CCR)实施的管道和过滤器的设计模式。这个想法从采访{A2}。
声明:请注意,我不是一个并发的专家,这是我第一次尝试与CCR。背景
管道和过滤器的设计模式是用来描述一个消息源对象创建,改造过滤器对象,和接收器对象所消耗的管道。经典的多线程编程是命令式编程,开发分配线程(每个对象的线程,每封邮件的线程,异步线程池计划)。例如,一个必要的实施将使用Queuelt; GT;管道,由ReaderWriter锁锁的访问,作为一个标准的异步操作的线程池执行对象处理。
另一方面,声明式编程定义quot; whatquot;做,但如何做到这一点。例如,LINQ允许过滤器使用的条件,而不必担心对特定的顺序遍历数组的数组。同样,并发与协调运行时(CCR),允许开发人员申报quot; whatquot应该发生在每个对象,而不必担心有关线程,锁,和诸如此类的东西(好,几乎)。
让我们下潜到一些基本的CCR declerations:

using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))

{

   DispatcherQueue dq = new DispatcherQueue("Pipeline DispatcherQueue", dispatcher);

}

调度是对象拥有一个线程池。第一个参数是要使用的线程数,其中0表示设置的线程数量最大(2,NumberOfProcessors)* ThreadPerCPU。调度程序队列中认为可以立即执行,并等待一个线程变为可用的代表名单。欲了解更多信息,请参阅文章A3}。
这里是一个片段,注册监听三个端口的三种方法。一个港口举行的消息,并认为在处理这些消息感兴趣的方法。{C}
读到这样的代码:每次inputPort.Port0lt; correctpinmessagegt;收到一个CorrectPinMessage对象,调用HandleSuccessMessage。每次inputPort.Port1lt; wrongpinmessagegt;收到一个WrongPinMessage对象,调用HandleFailureMessage。每次_timeoutPortlt; datetimegt;收到一个DateTime对象,调用HandleTimeout的方法。
Arbiter.Receive第一个参数(坚持= TRUE)表明,该方法继续处理的第一条消息后,接收邮件。
Arbiter.Interleave声明定义的锁定机制,CCR使用。 HandleSuccessMessage和HandleFailureMessage定义并发组(在本质上类似于ReaderWriterLock.AcquireReaderLock())。
并行处理程序的实现是线程安全的(使用互锁递增取得):
void HandleFailureMessage(WrongPinMessage wrongPinMessage)

{

    if (Interlocked.Increment(ref _wrongPinCount) == 1)

    {

       _firstWrongPinMessage = wrongPinMessage;

       _dispatcherQueue.EnqueueTimer(_pollingPeriod, _timeoutPort);

    }

}



void HandleSuccessMessage(CorrectPinMessage correctPinMessage)

{

    Interlocked.Increment(ref _correctPinCount);

}

请注意,当第一WrongPinMessage收到,启动计时器。在几毫秒后,_timeoutPort接收触发HandleTimeout方法调用DateTime对象。自HandleTimeout是独家组定义(在本质上类似于ReaderWriterLock.AcquireWriterLock())
,处理程序执行是不需要线程安全的。它访问,而无需使用任何锁或互锁调用_wrongPinCount成员:
void HandleTimeout(DateTime time)

{

   CardStatus newStatus = _currentCardStatus;



   if (_correctPinCount > 0)

   {

       newStatus = CardStatus.Ok;

   }

   else if (_wrongPinCount > 3)

   {

        newStatus = CardStatus.Stolen;

   }

   else if (_wrongPinCount > 0)

   {

        newStatus = CardStatus.Warning;

   }

   ...

}

HandleTimeout方法quot; assumesquot; x毫秒已通过以来被称为第一WrongPinMessage。它使用的那段时间改变的状态机的内部状态中收到的邮件的总数。
CCR的基本概念的更多信息,视频{A4}。使用代码
在这篇文章中的代码定义抽象的源的管道(有一个输出端口 - 绿色)过滤器(输入端口和输出端口 - 蓝色)和一个接收器(只有输入端口 - 黄色)。
{S1}
演示项目介绍工人,有一个秘密的PIN(个人识别号码)的卡。 CorrectPinMessage描述了一个输入正确的PIN码,同样,WrongPinMessage代表一个不正确的PIN号码的工人的事件。 ";工人的模拟sourcequot;对象创建新邮件时,状态机整合计数的类似事件。 OpenAlert的消息或CloseAlert消息在控制台窗口显示报警片的每个状态变化的结果。
每个管道对象实现IMessageSink或IMessageFilter或IMessageSink。接口初始化方法用于调用的CCR Arbiter.Activate()方法。 Start方法用于初始化后,所有的对象都已开始产生消息。
interface IMessageSink<tinputport> where TInputPort : IPort

{

   TInputPort InputPortSet { get; }

   void Initialize(DispatcherQueue dispatcherQueue, TInputPort inputPortSet);

}



interface IMessageSource<toutputport> where TOutputPort: IPort

{

    TOutputPort OutputPortSet { get; }

    void Initialize(DispatcherQueue dispatcherQueue, TOutputPort outputPortSet);

    void Start();

}



interface IMessageFilter<tinputport,toutputport>

{

    TInputPort InputPort { get; }

    TOutputPort OutputPort { get; }

    void Initialize(DispatcherQueue dispatcherQueue, 

         TInputPort inputPort, TOutputPort outputPort);

}

扩展方法是用来提供这些对象串行和并行连接。串行连接== GT;过滤器== GT;下沉的消息流连接的天然来源。并行连接,允许N源的邮件投递到同一个输出端口。
static public IMessageSource<toutputport> ConnectTo<tinputport,>(

            this IMessageSource<tinputport> source,

            IMessageFilter<tinputport,> filter)

            where TInputPort : IPort,new()

            where TOutputPort : IPort

{ ... }



public static IMessageSource<tinputport> ConnectInParallel<tinputport>(

            this IMessageSource<tinputport> source1, 

            IMessageSource<tinputport> source2)

            where TInputPort : IPort

{ ... }

当源连接到一个接收器,创建一个完整的管道。
interface IMessagePipeLine

{

     void Start(DispatcherQueue dispatcherQueue);

}

    

public static IMessagePipeLine ConnectTo<tport>(

            this IMessageSource<tport> source, 

            IMessageSink<tport> sink)

            where TPort : IPort,new()

{ ... }
每名工人的国家机
,为了节省每名工人一个状态机,我们需要解复到新的状态机的消息。
{S2}
声明:下面的实现是不是最佳的。我很乐意听取您的意见。
第一个过滤器转换成KeyValuePairlt类型T的消息;键,TGT;消息。关键是名工人,键入PIN。
class WorkerKeyValueFilter : 

   IMessageFilter<PortSet<CorrectPinMessage>, 

   Port<KeyValuePair<string,CorrectPinMessage>>>

{

   void HandleMessage<T>(T message) where T : WorkCardSwipeMessageBase

   {

      OutputPort.Post(new KeyValuePair<string,T>(message.Name, message));

   }

}

第二个过滤器使用的关键职位的消息的专用端口。
class DemuxMessageFilter<TMessageFilter, TInputPort,TOutputPort,TKey> 

        : IMessageFilter<TInputPort,TOutputPort>

        where TMessageFilter : IMessageFilter<TInputPort,TOutputPort>, new() 

        where TInputPort : IPortSet,new() 

        where TOutputPort : IPort

{



   void HandleMessage(KeyValuePair<TKey,object> message) 

   { 

      var messageFilter = GetMessageFilter(message.Key); 

      messageFilter.InputPort.PostUnknownType(message.Value); 

   } 



   TMessageFilter GetMessageFilter(TKey key) 

   { 

      TMessageFilter filter; 

      if (!_messageFilters.TryGetValue(key, out filter)) 

      { 

         filter = new TMessageFilter(); 

         filter.Initialize(_dispatcherQueue, new TInputPort(), OutputPort); 

         _messageFilters.Add(key, filter); 

      } 

      return filter; 

   }

}
历史10月10日 - 2008年:最初版本。

回答

评论会员:i 时间:2