By experimentation I managed to get the JMS component to do what I needed. I
couldn't get the SJMS one to work, as it's a bit too set in its ways once it
starts. The tricks were to:
1. Add disableReplyTo=true to stop the InOut exchange replying too early.
2. Create a custom splitter to return the exchange list held in the headers.
3. Use an onPrepare to lift the exchange up one level. For some reason I had
to do this; I'm not sure why.
4. Use the lazy reply logic as described on the JMS component docs page.
Code below:
    from("jms:topic:TOPIC.NAME?disableReplyTo=true")
        // Collect everything that arrives within the 2 second window
        .aggregate(constant(true))
            .completionTimeout(2000L)
            .groupExchanges()
        .process(new GroupProcessor())
        .split().method(MyRoutes.class)
            .onPrepare(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    // The split body is itself an Exchange; copy its
                    // headers and body onto the outer exchange
                    Exchange realExchange =
                        exchange.getIn().getBody(Exchange.class);
                    exchange.getIn().getHeaders().clear();
                    exchange.getIn().getHeaders().putAll(
                        realExchange.getIn().getHeaders());
                    exchange.getIn().setBody(realExchange.getIn().getBody());
                }
            })
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    // Lazy reply: send to the original JMSReplyTo by hand
                    Destination replyDestination =
                        exchange.getIn().getHeader("JMSReplyTo", Destination.class);
                    exchange.getIn().setHeader("JMSDestination", replyDestination);
                    exchange.getIn().removeHeader("JMSReplyTo");
                    JmsComponent component =
                        exchange.getContext().getComponent("jms", JmsComponent.class);
                    JmsEndpoint endpoint =
                        JmsEndpoint.newInstance(replyDestination, component);
                    ProducerTemplate producerTemplate =
                        exchange.getContext().createProducerTemplate();
                    try {
                        producerTemplate.sendBodyAndHeaders(endpoint,
                            exchange.getIn().getBody(), exchange.getIn().getHeaders());
                    } finally {
                        // Stop the per-exchange template so it does not leak
                        producerTemplate.stop();
                    }
                }
            });

    public class MyRoutes {
        @Handler
        public static List<Exchange> splitMe(
                @Header(value = Exchange.GROUPED_EXCHANGE) List<Exchange> body) {
            return body;
        }
    }

Is there a simpler way? And would these tricks work with the jetty
component? I really like the SJMS component, but it doesn't have the
flexibility.
Also, in the above, if there is an error then the messages are lost, so I
need to add a persistent store into the route as well.
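
To make the intended flow easier to follow, here is a Camel-free sketch of
the pattern: hold each request without replying, then, once the batch
completes, answer every requester on its own reply destination. It is only a
model under stated assumptions. LazyReplyBatcher, Request and Reply are
made-up names, the reply destinations are plain strings standing in for
JMSReplyTo, and nothing here is Camel or JMS API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Camel-free model of "aggregate first, reply lazily". */
public class LazyReplyBatcher {

    /** Stand-in for an incoming InOut message: body plus its reply destination. */
    public static final class Request {
        public final String replyTo;
        public final String body;

        public Request(String replyTo, String body) {
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    /** Stand-in for the message sent back to one requester. */
    public static final class Reply {
        public final String destination;
        public final String body;

        public Reply(String destination, String body) {
            this.destination = destination;
            this.body = body;
        }
    }

    private final List<Request> pending = new ArrayList<>();

    /** Hold the request without replying (the role of disableReplyTo plus aggregate). */
    public void accept(Request request) {
        pending.add(request);
    }

    /** On completion, walk the batch and build one reply per original requester. */
    public List<Reply> flush() {
        int batchSize = pending.size();
        List<Reply> replies = new ArrayList<>();
        for (Request request : pending) {
            // Each reply is addressed to the destination that requester asked for.
            String body = request.body + " (batch of " + batchSize + ")";
            replies.add(new Reply(request.replyTo, body));
        }
        pending.clear();
        return replies;
    }

    public static void main(String[] args) {
        LazyReplyBatcher batcher = new LazyReplyBatcher();
        batcher.accept(new Request("temp-queue-A", "first"));
        batcher.accept(new Request("temp-queue-B", "second"));
        for (Reply reply : batcher.flush()) {
            System.out.println(reply.destination + " <- " + reply.body);
        }
    }
}
```

In the real route the completion timeout plays the part of the explicit
flush() call, and the pending list lives only in memory, which is why a
failure loses the batch.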
Nigel
--
View this message in context:
http://camel.465427.n5.nabble.com/How-to-do-aggregation-of-multiple-InOut-exchanges-from-different-producers-tp5741621p5741988.html
Sent from the Camel - Users mailing list archive at Nabble.com.