C#阻塞FIFO队列可以泄漏消息吗?

我正在开发一个学术开源项目,现在我需要在C#中创建一个快速阻塞FIFO队列。我的第一个实现只是在读者的信号量中包含一个同步队列(带有动态扩展),然后我决定以下面的方式重新实现(理论上更快)
public class FastFifoQueue<T>
{
    private T[] _array;
    private int _head, _tail, _count;
    private readonly int _capacity;
    private readonly Semaphore _readSema, _writeSema;

    /// <summary>
    /// Initializes FastFifoQueue with the specified capacity
    /// </summary>
    /// <param name="size">Maximum number of elements to store</param>
    public FastFifoQueue(int size)
    {
        //Check if size is power of 2
        //Credit: http://stackoverflow.com/questions/600293/how-to-check-if-a-number-is-a-power-of-2
        if ((size & (size - 1)) != 0)
            throw new ArgumentOutOfRangeException("size", "Size must be a power of 2 for this queue to work");

        _capacity = size;
        _array = new T[size];
        _count = 0;
        _head = int.MinValue; //0 is the same!
        _tail = int.MinValue;

        _readSema = new Semaphore(0, _capacity);
        _writeSema = new Semaphore(_capacity, _capacity);
    }

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        Interlocked.Exchange(ref _array[index], item);
        Interlocked.Increment(ref _count);
        _readSema.Release();
    }

    public T Dequeue()
    {
        _readSema.WaitOne();
        int index = Interlocked.Increment(ref _tail);
        index %= _capacity;
        if (index < 0) index += _capacity;
        T ret = Interlocked.Exchange(ref _array[index], null);
        Interlocked.Decrement(ref _count);
        _writeSema.Release();

        return ret;
    }

    public int Count
    {
        get
        {
            return _count;
        }
    }
}
这是我们在教科书上找到的静态数组的经典FIFO队列实现。它被设计成以原子方式递增指针,并且因为当达到(capacity-1)时我无法使指针返回到零,所以我计算模数。理论上,使用Interlocked与执行增量之前的锁定相同,并且由于存在信号量,因此多个生产者/消费者可以进入队列,但是一次只能有一个能够修改队列指针。 首先,因为Interlocked.Increment首先递增,然后返回,我已经明白我只能使用后增量值并从数组中的位置1开始存储项目。这不是问题,当我达到一定值时,我会回到0 有什么问题呢? 您不会相信,在重负载下运行,有时队列会返回NULL值。我确定,重复一遍,我确定,没有方法将null排入队列。这绝对是正确的,因为我试图在Enqueue中进行空检查以确保,并且没有抛出任何错误。我用Visual Studio创建了一个测试用例(顺便说一下,我使用像maaaaaaaany人这样的双核CPU)
    private int _errors;

    [TestMethod()]
    public void ConcurrencyTest()
    {
        const int size = 3; //Perform more tests changing it
        _errors = 0;
        IFifoQueue<object> queue = new FastFifoQueue<object>(2048);
        Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
        Thread[] producers = new Thread[size], consumers = new Thread[size];

        for (int i = 0; i < size; i++)
        {
            producers[i] = new Thread(LoopProducer) { Priority = ThreadPriority.BelowNormal };
            consumers[i] = new Thread(LoopConsumer) { Priority = ThreadPriority.BelowNormal };
            producers[i].Start(queue);
            consumers[i].Start(queue);
        }

        Thread.Sleep(new TimeSpan(0, 0, 1, 0));

        for (int i = 0; i < size; i++)
        {
            producers[i].Abort();
            consumers[i].Abort();
        }

        Assert.AreEqual(0, _errors);
    }

    private void LoopProducer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                try
                {
                    q.Enqueue(new object());
                }
                catch
                { }

            }
        }
        catch (ThreadAbortException)
        { }
    }

    private void LoopConsumer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                object item = q.Dequeue();
                if (item == null) Interlocked.Increment(ref _errors);
            }
        }
        catch (ThreadAbortException)
        { }

    }
一旦消费者线程获得null,就会计算错误。 当使用1个生产者和1个消费者进行测试时,它会成功。当对2个生产者和2个或更多消费者进行测试时,会发生灾难:甚至检测到2000个泄漏。我发现问题可能出在Enqueue方法中。通过设计契约,生产者只能写入一个空的单元格(null),但是通过一些诊断修改我的代码我发现有时生产者试图在非空单元格上写入,然后被“好”所占据“数据。
    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        T leak = Interlocked.Exchange(ref _array[index], item);

        //Diagnostic code
        if (leak != null)
        {
            throw new InvalidOperationException("Too bad...");
        }
        Interlocked.Increment(ref _count);

        _readSema.Release();
    }
然后经常发生“太糟糕”的异常。但是从并发写入引发冲突太奇怪了,因为增量是原子的,而编写器的信号量只允许与自由数组单元一样多的编写器。 有人可以帮我吗?如果您与我分享您的技能和经验,我将非常感激。 谢谢。     
已邀请:
我必须说,这让我觉得这是一个非常聪明的主意,在我开始意识到这里的错误(我认为)之前,我想了一会儿。所以,一方面,我们想出这样一个聪明的设计!但是,与此同时,对你展示“Kernighan定律”感到羞耻:   调试的难度是首先编写代码的两倍。因此,如果您尽可能巧妙地编写代码,那么根据定义,您不够聪明,无法对其进行调试。 问题基本上是这样的:你假设
WaitOne
Release
有效地序列化你的所有
Enqueue
Dequeue
操作;但这并不是这里发生的事情。请记住,
Semaphore
类用于限制访问资源的线程数,而不是确保特定的事件顺序。每个
WaitOne
Release
之间发生的事情不能保证在与
WaitOne
Release
自身相同的“线程顺序”中发生。 用文字解释这很棘手,所以让我试着提供一个视觉插图。 假设您的队列容量为8,看起来像这样(让
0
代表
null
x
代表一个对象): [x x x x x x x x] 所以
Enqueue
被调用了8次并且队列已满。因此,你的
_writeSema
信号量将阻止
WaitOne
,你的
_readSema
信号量将立即返回
WaitOne
。 现在让我们假设在6个不同的线程上或多或少同时调用
Dequeue
。我们称之为T1,T2和T3。 在继续之前,让我为你的
Dequeue
实施申请一些标签,以供参考:
public T Dequeue()
{
    _readSema.WaitOne();                                   // A
    int index = Interlocked.Increment(ref _tail);          // B
    index %= _capacity;
    if (index < 0) index += _capacity;
    T ret = Interlocked.Exchange(ref _array[index], null); // C
    Interlocked.Decrement(ref _count);
    _writeSema.Release();                                  // D

    return ret;
}
好的,所以T1,T2和T3都已经过了A点。那么为了简单起见,我们假设它们各自按顺序到达B行,这样T1的
index
为0,T2的
index
为1,T3有
index
of2。 到现在为止还挺好。但是这里有问题:无法保证从这里开始,T1,T2和T3将以任何指定的顺序到达D行。假设T3实际上超过了T1和T2,超过了C线(从而将
_array[2]
设置为
null
)并一直到达D线。 在此之后,将发出
_writeSema
信号,这意味着您的队列中有一个可用的插槽可以写入,对吗?但你的队列现在看起来像这样! [x x 0 x x x x x] 因此,如果另一个线程同时调用
Enqueue
,它实际上会超过
_writeSema.WaitOne
,增加
_head
,得到
index
为0,即使插槽0不为空。这样做的结果是插槽0中的项目实际上可以被覆盖,在T1之前(还记得吗?)读取它。 要了解
null
值的来源,您只需要可视化我刚才描述的过程的反向。也就是说,假设您的队列如下所示: [0 0 0 0 0 0 0 0] 三个线程,T1,T2和T3,几乎同时调用
Enqueue
。 T3最后增加
_head
但是插入其项目(在
_array[2]
)并首先调用
_readSema.Release
,导致发出信号
_readSema
但是队列看起来像: [0 0 x 0 0 0 0 0] 因此,如果另一个线程同时调用了
Dequeue
(在T1和T2完成它们的事情之前),它将超过
_readSema.WaitOne
,增加
_tail
,得到
index
为0,即使插槽0为空。 所以这是你的问题。至于解决方案,我目前没有任何建议。给我一些时间考虑一下......(我现在正在发布这个答案,因为它在我的脑海中很新鲜,我觉得它可能对你有帮助。)     
(我投票给丹涛的+1有答案) 入队将改为这样的......
while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
    ;
出列队将改为这样的......
while( (ret = Interlocked.Exchange(ref _array[index], null)) == null)
    ;
这是基于丹涛的出色分析。因为索引是以原子方式获得的,所以(假设没有线程在排队或出队方法中死亡或终止),保证读者最终填充其单元格,或者保证作者最终释放其单元格(null)。     
谢谢Dan Tao和Les, 我非常感谢你的帮助。丹,你打开了我的想法:在关键部分内有多少生产者/消费者并不重要,重要的是锁是按顺序释放的。莱斯,你找到了问题的解决方案。 现在是时候终于用我们所做的最终代码回答我自己的问题了,感谢你们两位的帮助。嗯,它并不多,但它是Les的代码的一点点增强 排队:
while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
            Thread.Sleep(0);
出列:
while ((ret = Interlocked.Exchange(ref _array[index], null)) == null)
            Thread.Sleep(0);
为什么Thread.Sleep(0)?当我们发现无法检索/存储元素时,为什么要立即再次检查?我需要强制上下文切换以允许其他线程读/写。显然,将要安排的下一个线程可能是另一个无法操作的线程,但至少我们强制它。资料来源:http://progfeatures.blogspot.com/2009/05/how-to-force-thread-to-perform-context.html 我还测试了之前测试用例的代码,以获得我的声明的证据: 不睡觉(0)
Read 6164150 elements
Wrote 6322541 elements
Read 5885192 elements
Wrote 5785144 elements
Wrote 6439924 elements
Read 6497471 elements
睡觉(0)
Wrote 7135907 elements
Read 6361996 elements
Wrote 6761158 elements
Read 6203202 elements
Wrote 5257581 elements
Read 6587568 elements
我知道这不是一个“伟大的”发现,我将不会为这些数字赢得图灵奖。性能增量不是很明显,但大于零。强制上下文切换允许执行更多RW操作(=更高的吞吐量)。 需要明确的是:在我的测试中,我只是评估队列的性能,而不是模拟生产者/消费者的问题,所以不要在乎测试结束后一分钟内是否还有队列中的元素。但我只是证明我的方法有效,多亏你们所有人。 代码可用开源为MS-RL:http://logbus-ng.svn.sourceforge.net/viewvc/logbus-ng/trunk/logbus-core/It.Unina.Dis.Logbus/Utils/FastFifoQueue.cs ?revision = 461&安培;图=标记     

要回复问题请先登录注册