Try this:
In your first processor, you are setting the body on the "out" message,
which drops the values already on the message, such as headers. Set the
body on the "in" message instead:
exchange.getIn().setBody(input);
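To see why writing to the "out" message loses headers, here is a minimal plain-Java model. It is not Camel code: MiniMessage, MiniExchange, and advance() are hypothetical stand-ins, and the sketch assumes the Camel 2.x behavior where getOut() lazily creates a fresh, empty message that then replaces the "in" message for the next step.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Camel message: a body plus headers.
class MiniMessage {
    Object body;
    final Map<String, Object> headers = new HashMap<>();
}

// Hypothetical stand-in for a Camel 2.x exchange.
class MiniExchange {
    private MiniMessage in = new MiniMessage();
    private MiniMessage out; // created lazily, like getOut()

    MiniMessage getIn() {
        return in;
    }

    MiniMessage getOut() {
        if (out == null) {
            out = new MiniMessage(); // fresh message, no headers copied
        }
        return out;
    }

    // Models the hand-off between two steps: "out" becomes the next "in".
    void advance() {
        if (out != null) {
            in = out;
            out = null;
        }
    }
}

class InVersusOutDemo {
    // Returns true if the correlation header is still there after one step.
    static boolean headerSurvives(boolean useOut) {
        MiniExchange ex = new MiniExchange();
        ex.getIn().headers.put("myCorrelationId", 42);
        ex.getIn().body = "original";
        if (useOut) {
            ex.getOut().body = "changed"; // body lands on a new, empty message
        } else {
            ex.getIn().body = "changed";  // body changes, headers stay put
        }
        ex.advance();
        return ex.getIn().headers.containsKey("myCorrelationId");
    }

    public static void main(String[] args) {
        System.out.println("header survives getOut(): " + headerSurvives(true));
        System.out.println("header survives getIn():  " + headerSurvives(false));
    }
}
```

The point of the model is only that the aggregation header has to still be on the message when it reaches 'direct:route2', which is why the processor should write to the "in" message.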
Next, change your first route to the following:
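from("direct:start")
    // Tag the message before the multicast so both copies share one id.
    // Note: random(1000) can repeat; ${exchangeId} is an alternative if ids must be unique.
    .setHeader("myCorrelationId", simple("${random(1000)}", Integer.class))
    .multicast()
        .to("direct:route1", "direct:route2")
    .end();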
Then, where you want to aggregate, change the route to:
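from("direct:route2")
    // Group exchanges that carry the same correlation id.
    .aggregate(header("myCorrelationId"), new JoinReplyAggregationStrategy())
    // Assumes JoinReplyAggregationStrategy keeps a count in an "aggregated"
    // header; Camel does not set that header itself. completionSize(2) is an alternative.
    .completionPredicate(header("aggregated").isEqualTo(2))
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            System.out.println("################");
            System.out.println(exchange.getIn().getBody(String.class));
        }
    });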
I didn't compile this, so make sure that there are the correct number of
parentheses, etc.
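If it helps to see the idea without Camel in the way, here is a small plain-Java sketch of "aggregate by correlation id, complete when two messages have arrived". PairAggregator and offer() are hypothetical names, not part of Camel, and joining the bodies with "+" is only a guess at what JoinReplyAggregationStrategy does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical plain-Java model of "aggregate by header, complete at 2".
class PairAggregator {
    // Messages waiting for their partner, keyed by correlation id.
    private final Map<Integer, List<String>> pending = new HashMap<>();

    // Returns the joined bodies once two messages share a correlation id,
    // otherwise an empty Optional while the pair is still incomplete.
    Optional<String> offer(int correlationId, String body) {
        List<String> group =
                pending.computeIfAbsent(correlationId, k -> new ArrayList<>());
        group.add(body);
        if (group.size() < 2) {
            return Optional.empty();
        }
        pending.remove(correlationId); // pair is complete, release it
        return Optional.of(String.join("+", group));
    }

    public static void main(String[] args) {
        PairAggregator agg = new PairAggregator();
        System.out.println(agg.offer(7, "message1")); // first of pair 7
        System.out.println(agg.offer(9, "other1"));   // unrelated pair
        System.out.println(agg.offer(7, "message2")); // completes pair 7
    }
}
```

Messages with different ids never mix, which is why each pair needs its own value in the correlation header.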
On Sat, Oct 29, 2016 at 7:59 AM, Steve973 <[email protected]> wrote:
> Yes. You need to set a header on message 1 and message 2 and use it as the
> aggregation id. The value of this header should be the same value for both
> messages of the pair, but different pairs should have unique ids.
>
> On Oct 29, 2016 4:07 AM, "meng" <[email protected]> wrote:
>
>> Hi Steve,
>>
>> Thanks for replying.
>> I made some changes, here is the code:
>>
>> from("direct:start")
>>     .multicast()
>>         .to("direct:route1", "direct:route2")
>>     .end();
>>
>> from("direct:route1")
>>     .multicast()
>>         .parallelProcessing()
>>         .aggregationStrategy(new JoinReplyAggregationStrategy())
>>         .to(TARGET1, TARGET2, TARGET3, TARGET4, TARGET5, TARGET6, TARGET7, TARGET8)
>>     .end()
>>     .process(new Processor() {
>>         @Override
>>         public void process(Exchange exchange) throws Exception {
>>             String input = exchange.getIn().getBody(String.class).replaceAll("}\\+\\{", ",");
>>             exchange.getOut().setBody(input);
>>             System.out.println("*****************"+input);
>>         }
>>     })
>>     .to("direct:route2");
>>
>> from("direct:route2")
>>     .process(new Processor() {
>>         @Override
>>         public void process(Exchange exchange) throws Exception {
>>             System.out.println("################");
>>             System.out.println(exchange.getIn().getBody(String.class));
>>         }
>>     });
>>
>> Let's say 'direct:start' broadcasts message1 to 'direct:route1' and
>> 'direct:route2'.
>> 'direct:route1' does some processing, gets a result message2, and then
>> sends it to 'direct:route2'.
>> So 'direct:route2' now receives two messages. I can print both message1
>> and message2, but I want to concatenate these two messages in
>> 'direct:route2' into one. Is there any way?
>>
>> Thanks,
>> Meng