C#阻塞FIFO队列可以泄漏消息吗?
我正在开发一个学术开源项目,现在我需要在C#中创建一个快速阻塞FIFO队列。我的第一个实现只是在读者的信号量中包含一个同步队列(带有动态扩展),然后我决定以下面的方式重新实现(理论上更快)
public class FastFifoQueue<T>
{
private T[] _array;
private int _head, _tail, _count;
private readonly int _capacity;
private readonly Semaphore _readSema, _writeSema;
/// <summary>
/// Initializes FastFifoQueue with the specified capacity
/// </summary>
/// <param name="size">Maximum number of elements to store</param>
public FastFifoQueue(int size)
{
//Check if size is power of 2
//Credit: http://stackoverflow.com/questions/600293/how-to-check-if-a-number-is-a-power-of-2
if ((size & (size - 1)) != 0)
throw new ArgumentOutOfRangeException("size", "Size must be a power of 2 for this queue to work");
_capacity = size;
_array = new T[size];
_count = 0;
_head = int.MinValue; //0 is the same!
_tail = int.MinValue;
_readSema = new Semaphore(0, _capacity);
_writeSema = new Semaphore(_capacity, _capacity);
}
public void Enqueue(T item)
{
_writeSema.WaitOne();
int index = Interlocked.Increment(ref _head);
index %= _capacity;
if (index < 0) index += _capacity;
//_array[index] = item;
Interlocked.Exchange(ref _array[index], item);
Interlocked.Increment(ref _count);
_readSema.Release();
}
public T Dequeue()
{
_readSema.WaitOne();
int index = Interlocked.Increment(ref _tail);
index %= _capacity;
if (index < 0) index += _capacity;
T ret = Interlocked.Exchange(ref _array[index], null);
Interlocked.Decrement(ref _count);
_writeSema.Release();
return ret;
}
public int Count
{
get
{
return _count;
}
}
}
这是我们在教科书上找到的静态数组的经典FIFO队列实现。它被设计成以原子方式递增指针,并且因为当达到(capacity-1)时我无法使指针返回到零,所以我计算模数。理论上,使用Interlocked与执行增量之前的锁定相同,并且由于存在信号量,因此多个生产者/消费者可以进入队列,但是一次只能有一个能够修改队列指针。
首先,因为Interlocked.Increment首先递增,然后返回,我已经明白我只能使用后增量值并从数组中的位置1开始存储项目。这不是问题,当我达到一定值时,我会回到0
有什么问题呢?
您不会相信,在重负载下运行,有时队列会返回NULL值。我确定,重复一遍,我确定,没有方法将null排入队列。这绝对是正确的,因为我试图在Enqueue中进行空检查以确保,并且没有抛出任何错误。我用Visual Studio创建了一个测试用例(顺便说一下,我使用像maaaaaaaany人这样的双核CPU)
private int _errors;
[TestMethod()]
public void ConcurrencyTest()
{
const int size = 3; //Perform more tests changing it
_errors = 0;
IFifoQueue<object> queue = new FastFifoQueue<object>(2048);
Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
Thread[] producers = new Thread[size], consumers = new Thread[size];
for (int i = 0; i < size; i++)
{
producers[i] = new Thread(LoopProducer) { Priority = ThreadPriority.BelowNormal };
consumers[i] = new Thread(LoopConsumer) { Priority = ThreadPriority.BelowNormal };
producers[i].Start(queue);
consumers[i].Start(queue);
}
Thread.Sleep(new TimeSpan(0, 0, 1, 0));
for (int i = 0; i < size; i++)
{
producers[i].Abort();
consumers[i].Abort();
}
Assert.AreEqual(0, _errors);
}
private void LoopProducer(object queue)
{
try
{
IFifoQueue<object> q = (IFifoQueue<object>)queue;
while (true)
{
try
{
q.Enqueue(new object());
}
catch
{ }
}
}
catch (ThreadAbortException)
{ }
}
private void LoopConsumer(object queue)
{
try
{
IFifoQueue<object> q = (IFifoQueue<object>)queue;
while (true)
{
object item = q.Dequeue();
if (item == null) Interlocked.Increment(ref _errors);
}
}
catch (ThreadAbortException)
{ }
}
一旦消费者线程获得null,就会计算错误。
当使用1个生产者和1个消费者进行测试时,它会成功。当对2个生产者和2个或更多消费者进行测试时,会发生灾难:甚至检测到2000个泄漏。我发现问题可能出在Enqueue方法中。通过设计契约,生产者只能写入一个空的单元格(null),但是通过一些诊断修改我的代码我发现有时生产者试图在非空单元格上写入,然后被“好”所占据“数据。
public void Enqueue(T item)
{
_writeSema.WaitOne();
int index = Interlocked.Increment(ref _head);
index %= _capacity;
if (index < 0) index += _capacity;
//_array[index] = item;
T leak = Interlocked.Exchange(ref _array[index], item);
//Diagnostic code
if (leak != null)
{
throw new InvalidOperationException("Too bad...");
}
Interlocked.Increment(ref _count);
_readSema.Release();
}
然后经常发生“太糟糕”的异常。但是从并发写入引发冲突太奇怪了,因为增量是原子的,而编写器的信号量只允许与自由数组单元一样多的编写器。
有人可以帮我吗?如果您与我分享您的技能和经验,我将非常感激。
谢谢。
没有找到相关结果
已邀请:
3 个回复
仿普
和
有效地序列化你的所有
和
操作;但这并不是这里发生的事情。请记住,
类用于限制访问资源的线程数,而不是确保特定的事件顺序。每个
和
之间发生的事情不能保证在与
和
自身相同的“线程顺序”中发生。 用文字解释这很棘手,所以让我试着提供一个视觉插图。 假设您的队列容量为8,看起来像这样(让
代表
,
代表一个对象): [x x x x x x x x] 所以
被调用了8次并且队列已满。因此,你的
信号量将阻止
,你的
信号量将立即返回
。 现在让我们假设在6个不同的线程上或多或少同时调用
。我们称之为T1,T2和T3。 在继续之前,让我为你的
实施申请一些标签,以供参考:
好的,所以T1,T2和T3都已经过了A点。那么为了简单起见,我们假设它们各自按顺序到达B行,这样T1的
为0,T2的
为1,T3有
of2。 到现在为止还挺好。但是这里有问题:无法保证从这里开始,T1,T2和T3将以任何指定的顺序到达D行。假设T3实际上超过了T1和T2,超过了C线(从而将
设置为
)并一直到达D线。 在此之后,将发出
信号,这意味着您的队列中有一个可用的插槽可以写入,对吗?但你的队列现在看起来像这样! [x x 0 x x x x x] 因此,如果另一个线程同时调用
,它实际上会超过
,增加
,得到
为0,即使插槽0不为空。这样做的结果是插槽0中的项目实际上可以被覆盖,在T1之前(还记得吗?)读取它。 要了解
值的来源,您只需要可视化我刚才描述的过程的反向。也就是说,假设您的队列如下所示: [0 0 0 0 0 0 0 0] 三个线程,T1,T2和T3,几乎同时调用
。 T3最后增加
但是插入其项目(在
)并首先调用
,导致发出信号
但是队列看起来像: [0 0 x 0 0 0 0 0] 因此,如果另一个线程同时调用了
(在T1和T2完成它们的事情之前),它将超过
,增加
,得到
为0,即使插槽0为空。 所以这是你的问题。至于解决方案,我目前没有任何建议。给我一些时间考虑一下......(我现在正在发布这个答案,因为它在我的脑海中很新鲜,我觉得它可能对你有帮助。)
豹芜澈
出列队将改为这样的......
这是基于丹涛的出色分析。因为索引是以原子方式获得的,所以(假设没有线程在排队或出队方法中死亡或终止),保证读者最终填充其单元格,或者保证作者最终释放其单元格(null)。
筷啸够对铅
出列:
为什么Thread.Sleep(0)?当我们发现无法检索/存储元素时,为什么要立即再次检查?我需要强制上下文切换以允许其他线程读/写。显然,将要安排的下一个线程可能是另一个无法操作的线程,但至少我们强制它。资料来源:http://progfeatures.blogspot.com/2009/05/how-to-force-thread-to-perform-context.html 我还测试了之前测试用例的代码,以获得我的声明的证据: 不睡觉(0)
睡觉(0)
我知道这不是一个“伟大的”发现,我将不会为这些数字赢得图灵奖。性能增量不是很明显,但大于零。强制上下文切换允许执行更多RW操作(=更高的吞吐量)。 需要明确的是:在我的测试中,我只是评估队列的性能,而不是模拟生产者/消费者的问题,所以不要在乎测试结束后一分钟内是否还有队列中的元素。但我只是证明我的方法有效,多亏你们所有人。 代码可用开源为MS-RL:http://logbus-ng.svn.sourceforge.net/viewvc/logbus-ng/trunk/logbus-core/It.Unina.Dis.Logbus/Utils/FastFifoQueue.cs ?revision = 461&安培;图=标记