How Camel 2.11 batch aggregation works with separate route?(Camel 2.11 批量聚合如何与单独的路由一起使用?)
问题描述
首先有一个类似的未回答问题将路由加入单个聚合器
First there is a similar unanswered question Joining routes into single aggregator
我们有一些消费者路由(ftp、file、smb)从远程系统读取文件.简化了直接路由的测试,但与批处理消费者的行为相似:
We have some consumer routes (ftp, file, smb) reading files from remote systems. Simplified for test with direct route, but similar behavior with batch consumers:
from("direct:"+routeId).id(routeId)
.setProperty(AGGREGATION_PROPERTY, constant(routeId))
.log(String.format("Sending (${body}) to %s", "direct:start1"))
.to("direct:aggregate");
转换后,一次投票的所有结果将在单独的路由中按批次聚合:
After transformation all results from one poll are aggregated by batch in a separate route:
from("direct:aggregate")
.aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
.completionFromBatchConsumer()
.to("log:result", "mock:result");
如果每个消费者分开运行,一切正常.但如果多个消费者并行运行,聚合将拆分民意调查.例如,如果文件消费者轮询 500 条消息,并且第二条路由开始从 ftp 读取 6 个文件,则期望我们得到 2 个聚合,1 个来自文件的 500 条消息,1 个来自 ftp 的 6 个消息.
All works fine, if every consumer runs separated. But if multiple consumers runs in parallel, aggregation will split the polls. Example if file-consumer polls 500 messages and a second route starts to read 6 files from ftp the expections is that we get 2 aggregates 1 with 500 messages from file and 1 with 6 messages from ftp.
测试用例:
public void testAggregateByProperty() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);
assertMockEndpointsSatisfied();
}
结果是:A+A"、B"、A"、B"、A",而不是预期的A+A+A"、B+B"、A",Z".问题:
The result is: "A+A", "B", "A", "B", "A" and not the expected "A+A+A", "B+B", "A", "Z". Questions:
- 我们对聚合的假设是否错误?
- 我们如何才能实现预期的行为?
- 如果我们设置了completionTimeout,它接缝会从第一次交换发生超时 - 如果还有新的交换,则独立?
推荐答案
你几乎可以正常工作了.这是您需要的更改(稍后我会解释).
You almost have it working. Here is the change you need (and after I will explain).
from("direct:aggregate").id("aggregate")
.aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy())
.completionSize(property(Exchange.BATCH_SIZE))
.to("log:result", "mock:result")
结果将是:
Exchange received, body: A+A+A
Exchange received, body: B+B
Exchange received, body: A
注意:您不会收到 "Z"
的结果,因为批量大小为 7
.
Note: You won't receive a result for the "Z"
since the batch size is 7
.
解释一下 - 正如您所读到的,聚合器是一个多功能的骆驼组件,正确定义的关键是:
To explain - as you have read, the Aggregator is a versatile camel component and the key things to define correctly are:
- 聚合表达式
- 补全规则
现在,在您的情况下,您正在聚合一个属性 AGGREGATION_PROPERTY
,它将是 A
、B
或 Z
.此外,您正在指定批量大小.
Now in your case you are aggregating on a property AGGREGATION_PROPERTY
which will be A
, B
or Z
. In addition you are specifying a batch size.
但是,您没有在路线中表达 completionSize()
.相反,您使用的是 completionFromBatchConsumer
- 它做了一些不同的事情(代码声明它查找 Exchange#BATCH_COMPLETE
属性),因此结果很奇怪.
However you aren't expressing a completionSize()
in your route. Instead you were using completionFromBatchConsumer
- which does something different (the code states that it looks for a Exchange#BATCH_COMPLETE
property), thus the weird results.
无论如何,.completionSize(Exchange.BATCH_SIZE)
将使您的测试按需要运行.
Anyway, .completionSize(Exchange.BATCH_SIZE)
will make your test run as desired.
祝你好运.
这篇关于Camel 2.11 批量聚合如何与单独的路由一起使用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Camel 2.11 批量聚合如何与单独的路由一起使用?
基础教程推荐
- Java:带有char数组的println给出乱码 2022-01-01
- 减少 JVM 暂停时间 >1 秒使用 UseConcMarkSweepGC 2022-01-01
- 降序排序:Java Map 2022-01-01
- 在 Libgdx 中处理屏幕的正确方法 2022-01-01
- 无法使用修饰符“public final"访问 java.util.Ha 2022-01-01
- FirebaseListAdapter 不推送聊天应用程序的单个项目 - Firebase-Ui 3.1 2022-01-01
- Java Keytool 导入证书后出错,"keytool error: java.io.FileNotFoundException &拒绝访问" 2022-01-01
- “未找到匹配项"使用 matcher 的 group 方法时 2022-01-01
- 设置 bean 时出现 Nullpointerexception 2022-01-01
- 如何使用 Java 创建 X509 证书? 2022-01-01