使用Reactive Extensions(Rx)延迟和重复数据删除
我想使用Reactive Extensions转换一些消息并在一小段延迟后转发它们。
消息看起来像这样:
class InMsg
{
int GroupId { get; set; }
int Delay { get; set; }
string Content { get; set; }
}
输出看起来像这样:
class OutMsg
{
int GroupId { get; set; }
string Content { get; set; }
OutMsg(InMsg in)
{
GroupId = in.GroupId;
Content = Transform(in.Content); // function omitted
}
}
有几个要求:
延迟的长度取决于消息的内容。
每条消息都有一个GroupId
如果较新的消息带有与等待传输的延迟消息相同的GroupId,则应丢弃第一个消息,并且仅在新的延迟周期之后发送第二个消息。
给定一个Observable< InMsg>和发送功能:
IObservable<InMsg> inMsgs = ...;
void Send(OutMsg o)
{
... // publishes transformed messages
}
我知道我可以使用Select来执行转换。
void SetUp()
{
inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
如何应用指定延迟的消息? (注意这可能/应该导致消息的无序传递。)
如何使用相同的GroupId对消息进行重复数据删除?
Rx能解决这个问题吗?
还有另一种解决方法吗?
没有找到相关结果
已邀请:
2 个回复
脖呐
制作
,
来延迟输出,并使用
确保更新的值替换其组中的先前值:
关于实现的说明:如果在发送消息后(即延迟之后)到达分组值,则会启动新的延迟
缮淳彼誊
以结合
结果,如下所示: 对我来说(在.NET Framework 4.6上使用.NET Rx 3.1.1),修复是在末尾添加
,如下所示: