Camel 2.11 批量聚合如何与单独的路由一起使用?

How Camel 2.11 batch aggregation works with separate route?(Camel 2.11 批量聚合如何与单独的路由一起使用?)

本文介绍了Camel 2.11 批量聚合如何与单独的路由一起使用?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!



First there is a similar unanswered question Joining routes into single aggregator


We have some consumer routes (ftp, file, smb) reading files from remote systems. Simplified for test with direct route, but similar behavior with batch consumers:

 .setProperty(AGGREGATION_PROPERTY, constant(routeId))
 .log(String.format("Sending (${body}) to %s", "direct:start1"))


After transformation all results from one poll are aggregated by batch in a separate route:

  .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
  .to("log:result", "mock:result");

如果每个消费者分开运行,一切正常.但如果多个消费者并行运行,聚合将拆分民意调查.例如,如果文件消费者轮询 500 条消息,并且第二条路由开始从 ftp 读取 6 个文件,则期望我们得到 2 个聚合,1 个来自文件的 500 条消息,1 个来自 ftp 的 6 个消息.

All works fine, if every consumer runs separated. But if multiple consumers runs in parallel, aggregation will split the polls. Example if file-consumer polls 500 messages and a second route starts to read 6 files from ftp the expections is that we get 2 aggregates 1 with 500 messages from file and 1 with 6 messages from ftp.


public void testAggregateByProperty() throws Exception {
    MockEndpoint result =  getMockEndpoint("mock:result");

    result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");

    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
    template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);



The result is: "A+A", "B", "A", "B", "A" and not the expected "A+A+A", "B+B", "A", "Z". Questions:

  1. 我们对聚合的假设是否错误?
  2. 我们如何才能实现预期的行为?
  3. 如果我们设置了completionTimeout,它接缝会从第一次交换发生超时 - 如果还有新的交换,则独立?



You almost have it working. Here is the change you need (and after I will explain).

    .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy())
    .to("log:result", "mock:result")


Exchange received, body: A+A+A
Exchange received, body: B+B
Exchange received, body: A

注意:您不会收到 "Z" 的结果,因为批量大小为 7.

Note: You won't receive a result for the "Z" since the batch size is 7.

解释一下 - 正如您所读到的,聚合器是一个多功能的骆驼组件,正确定义的关键是:

To explain - as you have read, the Aggregator is a versatile camel component and the key things to define correctly are:

  • 聚合表达式
  • 补全规则

现在,在您的情况下,您正在聚合一个属性 AGGREGATION_PROPERTY,它将是 ABZ.此外,您正在指定批量大小.

Now in your case you are aggregating on a property AGGREGATION_PROPERTY which will be A, B or Z. In addition you are specifying a batch size.

但是,您没有在路线中表达 completionSize().相反,您使用的是 completionFromBatchConsumer - 它做了一些不同的事情(代码声明它查找 Exchange#BATCH_COMPLETE 属性),因此结果很奇怪.

However you aren't expressing a completionSize() in your route. Instead you were using completionFromBatchConsumer - which does something different (the code states that it looks for a Exchange#BATCH_COMPLETE property), thus the weird results.

无论如何,.completionSize(Exchange.BATCH_SIZE) 将使您的测试按需要运行.

Anyway, .completionSize(Exchange.BATCH_SIZE) will make your test run as desired.


这篇关于Camel 2.11 批量聚合如何与单独的路由一起使用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:Camel 2.11 批量聚合如何与单独的路由一起使用?
