等待基于任务的队列

awaitable Task based queue(等待基于任务的队列)

本文介绍了等待基于任务的队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想知道是否存在 ConcurrentQueue,类似于 BlockingCollection 从集合中获取不会阻塞,但是相反,它是异步的,并且会导致异步等待,直到将项目放入队列中.

I'm wondering if there exists an implementation/wrapper for ConcurrentQueue, similar to BlockingCollection where taking from the collection does not block, but is instead asynchronous and will cause an async await until an item is placed in the queue.

我已经提出了自己的实现,但它似乎没有按预期执行.我想知道我是否正在重新发明已经存在的东西.

I've come up with my own implementation, but it does not seem to be performing as expected. I'm wondering if I'm reinventing something that already exists.

这是我的实现:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}

推荐答案

不知道有没有无锁解决方案,不过你可以看看新的数据流库,异步 CTP.一个简单的 BufferBlock<T> 就足够了,例如:

I don't know of a lock-free solution, but you can take a look at the new Dataflow library, part of the Async CTP. A simple BufferBlock<T> should suffice, e.g.:

BufferBlock<int> buffer = new BufferBlock<int>();

生产和消费最容易通过数据流块类型的扩展方法完成.

Production and consumption are most easily done via extension methods on the dataflow block types.

制作很简单:

buffer.Post(13);

并且消费是异步就绪的:

and consumption is async-ready:

int item = await buffer.ReceiveAsync();

如果可能,我建议您使用 Dataflow;使这样的缓冲区既有效又正确比最初看起来要困难得多.

I do recommend you use Dataflow if possible; making such a buffer both efficient and correct is more difficult than it first appears.

这篇关于等待基于任务的队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:等待基于任务的队列

基础教程推荐