Hi, I now design three route to process my input message and want to
concatenate them using my aggregation strategy.Here is my design:
payload ->route1:"direct:start", do multicast to route 2("direct:a") and
route 3("direct:b")
in route 2("direct:a"), do multicast to 8 servers, and use
JoinReplyAggregationStrategy to concatenate the result, lets call the result
as route2result and sent this result to route 3("direct:b")
in route 3("direct:b"), now have two inputs, the original payload from route
1 and the result from route2.
I also want to use the same JoinReplyAggregationStrategy to concatenate
these two parts, but it give me an exception and the output is only the
original payload from route1("direct:start").
Here is the exception:
Caused by: java.lang.NullPointerException: null
at
mengt.camel.JoinReplyAggregationStrategy.aggregate(JoinReplyAggregationStrategy.java:19)
~[main/:na]
at
org.apache.camel.processor.aggregate.AggregateProcessor.onAggregation(AggregateProcessor.java:629)
~[camel-core-2.16.1.jar:2.16.1]
at
org.apache.camel.processor.aggregate.AggregateProcessor.doAggregation(AggregateProcessor.java:447)
~[camel-core-2.16.1.jar:2.16.1]
... 29 common frames omitted
Here is the code of how to implement my route and aggregation Strategy:
public void configure() throws Exception{
from("direct:start")
.marshal()
.string(UTF_8)
.multicast()
.parallelProcessing()
.to("direct:route1")
.to("direct:route2")
;
from("direct:route1")
.multicast()
.parallelProcessing()
.aggregationStrategy(new JoinReplyAggregationStrategy())
.to(TARGET1, TARGET2, TARGET3, TARGET4, TARGET5, TARGET6,
TARGET7, TARGET8)
//.to("log:?level=INFO&showBody=true")
.timeout(100000)
.end()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception
{
String input =
exchange.getIn().getBody(String.class).replaceAll("}\\+\\{", ",");
exchange.getOut().setBody(input);
System.out.println(input);
}
})
.to("stream:out")
// .to("direct:b")
.to("direct:route2");
from("direct:route2")
.aggregate(constant(true), new JoinReplyAggregationStrategy())
.completionTimeout(500L)
.to("stream:out")
public class JoinReplyAggregationStrategy implements AggregationStrategy{
public Exchange aggregate(Exchange exchangeOld, Exchange exchangeNew){
//String body1 = null;
//static Logger logger = Logger.getLogger(exchangeOld.getIn());
if(exchangeOld == null){
return exchangeNew;
}else {
String body1 = exchangeOld.getIn().getBody(String.class);
String body2 = exchangeNew.getIn().getBody(String.class);
String status = "Http Status " +
exchangeOld.getIn().getHeader("CamelHttpResponseCode").toString();
System.out.println(status);
String merged = (body1 == null) ? body2 : body1 + "+" +body2;
exchangeOld.getIn().setBody(merged);
//System.out.println(merged);
return exchangeOld;
}
}
}
Can I use the aggregation Strategy to concatenate the input from other two
routes directly like this? Or my understanding of aggregation is wrong?
Thanks,
Meng
--
View this message in context:
http://camel.465427.n5.nabble.com/Problem-Concatenate-the-different-input-from-different-route-using-aggreagationStrategy-tp5789368.html
Sent from the Camel - Users mailing list archive at Nabble.com.