Producer/consumer pattern with a fixed-size FIFO queue
Problem description
I need to implement the producer/consumer pattern around a fixed-size FIFO queue. I think a wrapper class around a ConcurrentQueue might work for this but I'm not completely sure (and I've never worked with a ConcurrentQueue before). The twist in this is that the queue needs to only hold a fixed number of items (strings, in my case). My application will have one producer task/thread and one consumer task/thread. When my consumer task runs, it needs to dequeue all of the items that exist in the queue at that moment in time and process them.
For what it's worth, processing of the queued items by my consumer is nothing more than uploading them via SOAP to a web app that isn't 100% reliable. If the connection can't be established or the SOAP call fails, I'm supposed to discard those items and go back to the queue for more. Because of the overhead of SOAP, I was trying to maximize the number of items from the queue that I could send in one SOAP call.
At times, my producer may add items faster than my consumer is able to remove and process them. If the queue is already full and my producer needs to add another item, I need to enqueue the new item but then dequeue the oldest item so that the size of the queue remains fixed. Basically, I need to keep the most recently produced items in the queue at all times (even if it means some items don't get consumed because my consumer is currently processing previous items).
With regard to the producer keeping the number of items in the queue fixed, I found one potential idea from this question:
Fixed size queue which automatically dequeues old values upon new enques
I'm currently using a wrapper class (based on that answer) around a ConcurrentQueue with an Enqueue() method like this:
public class FixedSizeQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizeQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        // add item to the queue
        queue.Enqueue(obj);

        lock (this) // lock queue so that queue.Count is reliable
        {
            while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
            {
                T objOut;
                queue.TryDequeue(out objOut);
            }
        }
    }
}
I create an instance of this class with a size limit on the queue like this:
FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit
I start up my producer task and it begins filling the queue. The code in my Enqueue() method seems to be working properly with regard to removing the oldest item from the queue when adding an item causes the queue count to exceed the max size. Now I need my consumer task to dequeue items and process them but here's where my brain gets confused. What's the best way to implement a Dequeue method for my consumer that will take a snapshot of the queue at a moment in time and dequeue all items for processing (the producer may still be adding items to the queue during this process)?
Answer
Simply stated, ConcurrentQueue has a ToArray() method that returns a moment-in-time "snapshot" of all the items currently in the queue. If you want your consumer to be given a block of things to work on, you can lock the same object the enqueueing method locks, call ToArray(), and then spin through a while (!queue.IsEmpty) queue.TryDequeue(out trash) loop to clear the queue before returning the array you extracted. Note that the lock only prevents you from clearing items that ToArray() did not capture if the producer's queue.Enqueue() call is made inside that same lock; in the Enqueue() method above it currently happens before the lock is taken.
This would be your GetAll() method:
public T[] GetAll()
{
    lock (syncObj) // so that we don't clear items we didn't get with ToArray()
    {
        var result = queue.ToArray();

        // empty the queue now that its contents have been copied
        T trash;
        while (!queue.IsEmpty) queue.TryDequeue(out trash);

        return result;
    }
}
Since you have to clear out the queue, you could simply combine the two operations; create an array of the proper size (using queue.Count), then while the queue is not empty, Dequeue an item and put it in the array, before returning.
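The combined approach described above might look roughly like the following. This is only a sketch under a few assumptions: the class name DrainableQueue<T> and the method name DrainAll() are made up for illustration, every enqueue is assumed to go through the same private lock, and a List<T> is used as the buffer instead of a pre-sized array so that a failed TryDequeue() cannot leave empty slots.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical wrapper, not the asker's class: a size-capped queue whose
// consumer removes everything currently queued in a single call.
public class DrainableQueue<T>
{
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    private readonly object syncObj = new object(); // private lock target
    private readonly int size;

    public DrainableQueue(int size)
    {
        this.size = size;
    }

    public void Enqueue(T item)
    {
        lock (syncObj)
        {
            queue.Enqueue(item);

            // Evict the oldest items until the cap is respected again.
            T discarded;
            while (queue.Count > size) queue.TryDequeue(out discarded);
        }
    }

    public T[] DrainAll()
    {
        lock (syncObj)
        {
            // Dequeue until empty, collecting items oldest-first.
            var items = new List<T>(queue.Count);
            T item;
            while (queue.TryDequeue(out item)) items.Add(item);
            return items.ToArray();
        }
    }
}
```

Because both methods take the same lock, nothing can be enqueued between copying and clearing, and the returned array is in FIFO order (oldest item first).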
Now, that's the answer to the specific question. I must now in good conscience put on my CodeReview.SE hat and point out a few things:
NEVER use lock(this). You never know what other objects may be using your object as a locking focus, and thus would be blocked when the object locks itself from the inside. The best practice is to lock a privately scoped object instance, usually one created just to be locked:
private readonly object syncObj = new object();
Since you're locking critical sections of your wrapper anyway, I would use an ordinary List<T> instead of a concurrent collection. Access is faster and it's more easily cleared out, so you'll be able to do what you're doing much more simply than ConcurrentQueue allows. To enqueue, lock the sync object, Insert() at index zero, then remove any items from index Size to the end of the list using RemoveRange(). To dequeue, lock the same sync object, call myList.ToArray() (List<T>'s own method, which copies the current contents much as ConcurrentQueue's does) and then call myList.Clear() before returning the array. Couldn't be simpler:
using System.Collections.Generic;

public class FixedSizeQueue<T>
{
    private readonly List<T> queue = new List<T>();
    private readonly object syncObj = new object();

    public int Size { get; private set; }

    public FixedSizeQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        lock (syncObj)
        {
            // newest item goes to the front; the oldest items sit at the end
            queue.Insert(0, obj);

            // trim everything beyond the size limit (the oldest items)
            if (queue.Count > Size)
                queue.RemoveRange(Size, queue.Count - Size);
        }
    }

    public T[] Dequeue()
    {
        lock (syncObj)
        {
            // index 0 is the newest item, so the array is newest-first
            var result = queue.ToArray();
            queue.Clear();
            return result;
        }
    }
}
You seem to understand that you are throwing enqueued items away using this model. That's usually not a good thing, but I'm willing to give you the benefit of the doubt. However, I will say there is a lossless way to achieve this, using a BlockingCollection<T>. A BlockingCollection<T> wraps any IProducerConsumerCollection<T> implementation, such as ConcurrentQueue<T>, ConcurrentStack<T>, or ConcurrentBag<T>, and allows you to specify a maximum capacity for the queue. The collection will then block any thread attempting to dequeue from an empty queue, or any thread attempting to add to a full queue, until items have been added or removed such that there is something to get or room to insert. This is the best way to implement a producer-consumer queue with a maximum size, or one that would otherwise require "polling" to see if there's something for the consumer to work on. If you go this route, the only items discarded are the ones the consumer itself has to throw away; the consumer sees every item the producer puts in and makes its own decision about each.
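As a rough illustration of the BlockingCollection route, here is a minimal sketch. The class name BoundedQueueDemo, the Run() method, its parameters, and the "message N" strings are all hypothetical; in the asker's scenario each batch would be handed to the SOAP upload rather than collected in a list. It assumes one producer and one consumer, as in the question.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo: a bounded, lossless producer/consumer queue
// where the consumer pulls items out in batches.
public static class BoundedQueueDemo
{
    public static List<List<string>> Run(int itemCount, int capacity, int maxBatchSize)
    {
        var batches = new List<List<string>>();

        using (var queue = new BlockingCollection<string>(new ConcurrentQueue<string>(), capacity))
        {
            var producer = Task.Run(() =>
            {
                for (int i = 0; i < itemCount; i++)
                    queue.Add("message " + i); // blocks while the queue is full

                queue.CompleteAdding(); // lets the consumer loop end once drained
            });

            // Blocks until an item is available; ends after CompleteAdding()
            // has been called and the queue is empty.
            foreach (string first in queue.GetConsumingEnumerable())
            {
                var batch = new List<string> { first };

                // Grab whatever else is already queued, without blocking.
                string next;
                while (batch.Count < maxBatchSize && queue.TryTake(out next))
                    batch.Add(next);

                batches.Add(batch); // stand-in for "upload this batch"
            }

            producer.Wait();
        }

        return batches;
    }
}
```

With this design the producer slows down instead of overwriting old items, so whether it fits depends on whether the producer can tolerate being blocked while the consumer is busy.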