How do I implement my own advanced Producer/Consumer scenario?(如何实现我自己的高级生产者/消费者场景?)
问题描述
注意:
我对我的问题进行了彻底的修改.您可以通过更改历史记录查看原始问题.
NOTE:
i did a complete rework of my question. you can see the original question via the change-history.
我需要一个强大"的人.队列,提供以下功能:
i'm in the need of a "mighty" queue, which provides following functionalities:
- 我对一组对象有一定的范围.这意味着 A 组、B 组、...将有自己的队列
- 我正在组范围线程中填充队列线程 A(生产者)
- 我正在读取组范围线程中的队列线程 B(消费者)
- i have a certain scope for a group of objects. that means that Group A, Group B, ... will have their own queue
- i'm filling a queue in a group-scoped thread Thread A (Producer)
- i'm reading a queue in a group-scoped thread Thread B (Consumer)
所以我会有以下场景:
- 队列中没有任何项目(因为作业是用空的目标组"调用的):线程 B 应该退出循环
- 队列中当前没有项目,因为线程 A 正在处理要排队的项目:线程 B 应该等待
- 队列中有项目:线程 B 应该能够出列并处理项目
- 队列中没有项目,因为 线程 A 没有更多的项目要入队:线程 B 应该退出循环
- there is and will be no item in the queue (as the jobs were called with an empty "targetgroup"): Thread B should escape the loop
- there is currently no item in the queue, as Thread A is working on the item to enqueue: Thread B should wait
- there are items in the queue: Thread B should be able to dequeue and process the item
- there is no item in the queue, as Thread A has no more items to enqueue: Thread B should escape the loop
现在我想出了以下实现:
now i came up with following implementation:
public class MightyQueue<T>
where T : class
{
private readonly Queue<T> _queue = new Queue<T>();
private bool? _runable;
private volatile bool _completed;
public bool Runable
{
get
{
while (!this._runable.HasValue)
{
Thread.Sleep(100);
}
return this._runable ?? false;
}
set
{
this._runable = value;
}
}
public void Enqueue(T item)
{
if (item == null)
{
throw new ArgumentNullException("item");
}
this._queue.Enqueue(item);
}
public void CompleteAdding()
{
this._completed = true;
}
public bool TryDequeue(out T item)
{
if (!this.Runable)
{
item = null;
return false;
}
while (this._queue.Count == 0)
{
if (this._completed)
{
item = null;
return false;
}
Thread.Sleep(100);
}
item = this._queue.Dequeue();
return true;
}
}
然后会被使用
制片人
if (anythingToWorkOn)
{
myFooMightyQueueInstance.Runable = false;
}
else
{
myFooMightyQueueInstance.Runable = true;
while (condition)
{
myFooMightyQueueInstance.Enqueue(item);
}
myFooMightyQueueInstance.CompleteAdding();
}
消费者
if (!myFooMightyQueueInstance.Runable)
{
return;
}
T item;
while (myFooMightyQueueInstance.TryDequeue(out item))
{
//work with item
}
但我相信,这种方法是错误的,因为我在那里使用了一些 Thread.Sleep()
-stuff(不能有一些 waitHandle 或其他东西吗?)...我也不是关于算法本身......谁能帮帮我?
but i believe, that this approach is wrong, as i'm using some Thread.Sleep()
-stuff in there (can't be there some waitHandle or something else?)... i'm not about the algo itself either ...
can anyone please help me out?
推荐答案
你应该从一个通用的生产者-消费者队列开始并使用它.在队列中实现这一点并不是一个好主意,因为这会阻止您使用信号量来向线程发出信号(或者,您可以在队列中使用公共信号量,但这是一个真的坏主意).
You should begin with a generic Producer-Consumer queue and use that. Implementing this inside a Queue is not such a good idea, as this prevents you from using semaphores to signal threads (or, you could have public semaphores in your Queue, but that's a really bad idea).
一旦线程 A 将单个工作项入队,它必须发出信号量通知线程 B.当线程 B 完成处理所有项时,它应该发出信号量通知其他所有人它已完成.您的主线程应该等待第二个信号量知道一切都已完成.
As soon as the thread A has enqueued a single work item, it must signal a semaphore to notify thread B. When thread B has finished processing all items, it should signal a semaphore to notify everyone else that it has finished. Your main thread should be waiting for this second semaphore to know that everything is done.
首先,你有一个生产者和一个消费者:
First, you have a producer and a consumer:
public interface IProducer<T> : IStoppable
{
/// <summary>
/// Notifies clients when a new item is produced.
/// </summary>
event EventHandler<ProducedItemEventArgs<T>> ItemProduced;
}
public interface IConsumer<T> : IStoppable
{
/// <summary>
/// Performs processing of the specified item.
/// </summary>
/// <param name="item">The item.</param>
void ConsumeItem(T item);
}
public interface IStoppable
{
void Stop();
}
因此,在您的情况下,创建邮件的类需要触发 ItemProduced
事件,发送邮件的类需要实现 ConsumeItem
.
So, in your case, class creating the mail will need to fire the ItemProduced
event, and the class sending it will need to implement ConsumeItem
.
然后你将这两个实例传递给 Worker
的一个实例:
And then you pass these two instances to an instance of Worker
:
public class Worker<T>
{
private readonly Object _lock = new Object();
private readonly Queue<T> _queuedItems = new Queue<T>();
private readonly AutoResetEvent _itemReadyEvt = new AutoResetEvent(false);
private readonly IProducer<T> _producer;
private readonly IConsumer<T> _consumer;
private volatile bool _ending = false;
private Thread _workerThread;
public Worker(IProducer<T> producer, IConsumer<T> consumer)
{
_producer = producer;
_consumer = consumer;
}
public void Start(ThreadPriority priority)
{
_producer.ItemProduced += Producer_ItemProduced;
_ending = false;
// start a new thread
_workerThread = new Thread(new ThreadStart(WorkerLoop));
_workerThread.IsBackground = true;
_workerThread.Priority = priority;
_workerThread.Start();
}
public void Stop()
{
_producer.ItemProduced -= Producer_ItemProduced;
_ending = true;
// signal the consumer, in case it is idle
_itemReadyEvt.Set();
_workerThread.Join();
}
private void Producer_ItemProduced
(object sender, ProducedItemEventArgs<T> e)
{
lock (_lock) { _queuedItems.Enqueue(e.Item); }
// notify consumer thread
_itemReadyEvt.Set();
}
private void WorkerLoop()
{
while (!_ending)
{
_itemReadyEvt.WaitOne(-1, false);
T singleItem = default(T);
lock (_lock)
{
if (_queuedItems.Count > 0)
{
singleItem = _queuedItems.Dequeue();
}
}
while (singleItem != null)
{
try
{
_consumer.ConsumeItem(singleItem);
}
catch (Exception ex)
{
// handle exception, fire an event
// or something. Otherwise this
// worker thread will die and you
// will have no idea what happened
}
lock (_lock)
{
if (_queuedItems.Count > 0)
{
singleItem = _queuedItems.Dequeue();
}
}
}
}
} // WorkerLoop
} // Worker
这是大体思路,可能还需要一些额外的调整.
That's the general idea, there may be some additional tweaks needed.
要使用它,你需要让你的类实现这两个接口:
To use it, you need to have your classes implement these two interfaces:
IProducer<IMail> mailCreator = new MailCreator();
IConsumer<IMail> mailSender = new MailSender();
Worker<IMail> worker = new Worker<IMail>(mailCreator, mailSender);
worker.Start();
// produce an item - worker will add it to the
// queue and signal the background thread
mailCreator.CreateSomeMail();
// following line will block this (calling) thread
// until all items are consumed
worker.Stop();
这样做的好处在于:
- 您可以拥有任意数量的工人
- 多个工人可以接受来自同一生产者的物品
- 多个工作人员可以将项目分派给同一个消费者(尽管这意味着您需要考虑消费者是以线程安全的方式实现的)
这篇关于如何实现我自己的高级生产者/消费者场景?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:如何实现我自己的高级生产者/消费者场景?
基础教程推荐
- 有没有办法忽略 2GB 文件上传的 maxRequestLength 限制? 2022-01-01
- 将 XML 转换为通用列表 2022-01-01
- SSE 浮点算术是否可重现? 2022-01-01
- c# Math.Sqrt 实现 2022-01-01
- rabbitmq 的 REST API 2022-01-01
- 为什么Flurl.Http DownloadFileAsync/Http客户端GetAsync需要 2022-09-30
- 如何激活MC67中的红灯 2022-01-01
- MS Visual Studio .NET 的替代品 2022-01-01
- 如何在 IDE 中获取 Xamarin Studio C# 输出? 2022-01-01
- 将 Office 安装到 Windows 容器 (servercore:ltsc2019) 失败,错误代码为 17002 2022-01-01