Batching on duration or threshold using TPL Dataflow(使用TPL数据流对持续时间或阈值进行批处理)
问题描述
我已经使用TPL数据流实现了生产者..消费者模式。用例是代码从Kafka总线读取消息。为提高效率,我们需要在进入数据库时批量处理消息。
TPL数据流中是否有办法在达到大小或持续时间阈值时保留消息并触发?
例如,一旦消息从队列中拉出,当前实现将发布该消息。
postedSuccessfully = targetBuffer.Post(msg.Value);
推荐答案
已经可以通过System.Reactive特别是Buffer运算符按计数和持续时间进行缓冲。缓冲区收集传入事件,直到达到所需计数或其时间跨度过期。
数据流数据块旨在与System.Reactive配合使用。使用DataflowBlock.AsObservable()和AsObserver()扩展方法将can be converted块传递给可观测对象和观察者。这使得构建缓冲挡路变得非常容易:
public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
var inBlock = new BufferBlock<TIn>();
var outBlock = new BufferBlock<IList<TIn>>();
var outObserver=outBlock.AsObserver();
inBlock.AsObservable()
.Buffer(timeSpan, count)
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(outObserver);
return DataflowBlock.Encapsulate(inBlock, outBlock);
}
此方法使用两个缓冲块来缓冲输入和输出。Buffer()
在批处理已满或时间跨度过期时,从输入挡路(可观察对象)读取并写入输出挡路(观察者)。
默认情况下,Rx在当前线程上工作。通过调用ObserveOn(TaskPoolScheduler.Default)
,我们告诉它在任务池线程上处理数据。
示例
此代码创建5个项目或1秒的挡路缓冲区。它从发布7个项目开始,等待1.1秒,然后发布另外7个项目。每批都与线程ID一起写入控制台:
static async Task Main(string[] args)
{
//Build the pipeline
var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);
var options = new DataflowLinkOptions { PropagateCompletion = true };
var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
bufferBlock.LinkTo(printBlock, options);
//Start the messages
Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");
for (int i=0;i<7;i++)
{
bufferBlock.Post(i.ToString());
}
await Task.Delay(1100);
for (int i=7; i < 14; i++)
{
bufferBlock.Post(i.ToString());
}
bufferBlock.Complete();
Console.WriteLine($"Finishing");
await bufferBlock.Completion;
Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
Console.ReadKey();
}
static void printOut(IEnumerable<string> items)
{
var line = String.Join(",", items);
Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}
输出为:
Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6
这篇关于使用TPL数据流对持续时间或阈值进行批处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:使用TPL数据流对持续时间或阈值进行批处理
基础教程推荐
- 如何激活MC67中的红灯 2022-01-01
- 有没有办法忽略 2GB 文件上传的 maxRequestLength 限制? 2022-01-01
- 如何在 IDE 中获取 Xamarin Studio C# 输出? 2022-01-01
- rabbitmq 的 REST API 2022-01-01
- SSE 浮点算术是否可重现? 2022-01-01
- 为什么Flurl.Http DownloadFileAsync/Http客户端GetAsync需要 2022-09-30
- 将 XML 转换为通用列表 2022-01-01
- c# Math.Sqrt 实现 2022-01-01
- MS Visual Studio .NET 的替代品 2022-01-01
- 将 Office 安装到 Windows 容器 (servercore:ltsc2019) 失败,错误代码为 17002 2022-01-01