Apache Camel - Split and aggregate - Old Exchange is always null(Apache Camel - 拆分和聚合 - 旧 Exchange 始终为空)
问题描述
我看到这个问题已经被问过很多次了,但没有一个帖子有帮助,也没有一个决定性的解决方案.我正在拆分一条消息,然后使用 Aggregator2 对其进行聚合.代码抛出异常,因为 oldExchange 始终为空.所以为了测试我设计了一个小代码.
I see that this question has been asked a number of times but none of post helped or had a conclusive solution. I am splitting a message and then aggregating it using Aggregator2. The code was throwing exception because oldExchange was always null. So to test I designed a small code.
我读了一个订单,xml文件,看起来像这样
I read an orders,xml file which looks like this
<Orders xmlns="http://some/schema/Order">
<Order>
<orderNum>1</orderNum>
</Order>
<Order>
<orderNum>2</orderNum>
</Order>
<Order>
<orderNum>3</orderNum>
</Order>
<Order>
<orderNum>5</orderNum>
</Order>
<Order>
<orderNum>6</orderNum>
</Order>
我的骆驼上下文看起来像这样
My camel Context Looks like this
<camel:route>
<camel:from uri="file:src/data/catask/test?noop=true"/>
<camel:log message="${body}"></camel:log>
<camel:split>
<camel:xpath>//te:Orders/*</camel:xpath>
<camel:to uri="direct:logQueries"/>
<camel:to uri="direct:aggegateQueries"/>
</camel:split>
</camel:route>
<camel:route>
<camel:from uri="direct:logQueries"/>
<camel:log message="After the call :
${body}"></camel:log>
</camel:route>
<camel:route>
<camel:from uri="direct:aggegateQueries"/>
<camel:aggregate strategyRef="aggrTask" completionInterval="8000" >
<camel:correlationExpression>
<camel:xpath>//te:Order</camel:xpath>
</camel:correlationExpression>
<camel:to uri="file:src/data/catask/output?fileName=output.xml"/>
</camel:aggregate>
</camel:route>
我的聚合策略类看起来像这样
My Aggregation Strategy class looks like this
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
System.out.println("Returning new exchange");
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(oldBody + "+" + newBody);
return oldExchange;
}
问题在于,当聚合结果保存在 output.xml 文件中时,它只包含从 Orders.xml 读取的最后一条记录.
The problem is that when the aggregated result is saved in output.xml file it contains only the last record it read from Orders.xml.
即
<Order xmlns="http://some/schema/Order">
<orderNum>6</orderNum>
</Order>
我进一步调查并发现这是因为在第一次调用之后 oldExchange 应该有一些值,但事实证明它始终为空.我认为因为它是从单个文件中读取所有内容并将其拆分,所以只有交换.
I looked into it further and found that this was happening because after the first call oldExchange should have some value but it turns out it is always null. I think that because it is reading everything from a single file and splitting it, there is only exchange.
>有什么建议吗?
更新 1: Per Claus 我只能使用 Splitter 来解决这个问题.我这样做了,并且能够成功加入所有消息.但是我仍在寻找一种使用 Aggregator2 的方法.这里我是如何只使用 Splitter 的.
UPDATE 1: Per Claus I can use Splitter only to solve this issue. I did that and was able to successfully join all the messages. However I am still looking for a way to use Aggregator2. Here how I did it using Splitter only.
camel:route>
<camel:from uri="file:src/data/catask/test?noop=true"/>
<camel:log message="${body}"></camel:log>
<camel:split strategyRef="aggrTask">
<camel:xpath>//te:Orders/*</camel:xpath>
<camel:to uri="direct:logQueries"/>
<
</camel:split>
</camel:route>
<camel:route>
<camel:from uri="direct:logQueries"/>
<camel:log message="After the call :
${body}"></camel:log>
</camel:route>
推荐答案
我想我知道如何使用 Aggregator 聚合消息了.我添加了一个名为 id 的 headerName 并将其用作我的相关 id.
I think I figured it out how to aggregate the messages using Aggregator. I added a headerName called id and use it as my correlation id.
<camel:route>
<camel:from uri="file:src/data/catask/test?noop=true"/>
<camel:log message="${body}"></camel:log>
<camel:split>
<camel:xpath>//te:Orders/*</camel:xpath>
<camel:to uri="direct:addHeaders"/>
<camel:to uri="direct:aggegateQueries"/>
</camel:split>
</camel:route>
<camel:route>
<camel:from uri="direct:addHeaders"/>
<camel:setHeader headerName="id">
<camel:constant>order</camel:constant>
</camel:setHeader>
</camel:route>
<camel:route>
<camel:from uri="direct:aggegateQueries"/>
<camel:aggregate strategyRef="aggrTask" completionInterval="8000" >
<camel:correlationExpression>
<simple>header.id</simple>
</camel:correlationExpression>
<camel:to uri="file:src/data/catask/output?fileName=output.xml"/>
<camel:log message="MERGED:: /n ${body}"></camel:log>
</camel:aggregate>
</camel:route>
这会汇总我的消息.但是我仍然不确定尽管使用了正确的 XPATH,为什么 Camel 认为它是不同类型的消息?
This aggregates my messages. However I am still not sure that despite using correct XPATH why does Camel thinks that it is different type of message?
从骆驼论坛复制克劳斯的解释:看起来你的相关表达式是一个新的组每个消息,例如每个 xpath 结果都是不同的.如果您想拆分和加入相同的消息,请查看此 eiphttp://camel.apache.org/composed-message-processor.html并查看仅使用拆分器的示例.这更容易做到."
COPYING CLAUS's explanation from camel forums: "Looks like its your correlation expression that is a new group for each message, eg each xpath result is different. If you want to split and join the same messages then see this eip http://camel.apache.org/composed-message-processor.html And see the example using only splitter. That is much easier to do. "
我使用 Xpath Evaluator 工具测试了 Xpath 表达式,还打印了相关表达式结果,我的所有带有//Order 的消息都是相同的.前-
I tested the Xpath expression using a Xpath Evaluator tool and also printed out the coorelation expression results and all my messages with //Order are same. Ex-
Group 1:
<Order>
<orderNum>1</orderNum>
</Order>
Group 2:
<Order>
<orderNum>2</orderNum>
</Order>
这篇关于Apache Camel - 拆分和聚合 - 旧 Exchange 始终为空的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Apache Camel - 拆分和聚合 - 旧 Exchange 始终为空
基础教程推荐
- “未找到匹配项"使用 matcher 的 group 方法时 2022-01-01
- 在 Libgdx 中处理屏幕的正确方法 2022-01-01
- 减少 JVM 暂停时间 >1 秒使用 UseConcMarkSweepGC 2022-01-01
- 降序排序:Java Map 2022-01-01
- Java Keytool 导入证书后出错,"keytool error: java.io.FileNotFoundException &拒绝访问" 2022-01-01
- FirebaseListAdapter 不推送聊天应用程序的单个项目 - Firebase-Ui 3.1 2022-01-01
- 如何使用 Java 创建 X509 证书? 2022-01-01
- Java:带有char数组的println给出乱码 2022-01-01
- 无法使用修饰符“public final"访问 java.util.Ha 2022-01-01
- 设置 bean 时出现 Nullpointerexception 2022-01-01