It's not Camel. There is a JIRA ticket in AMQ about this.
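
To make the expectation in the quoted test concrete, here is a minimal plain-Java sketch of what the processor is meant to do: five messages go onto a queue, and once the fifth arrives a non-blocking loop drains the queue and concatenates the bodies. It is only an in-memory model. The class DrainSketch and its methods are hypothetical names, Queue.poll() merely stands in for consumerTemplate.receiveNoWait(...), and no JMS broker, transaction, or prefetch is involved, so it shows the intended result rather than the failure.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical in-memory model of the drain loop; not Camel or ActiveMQ code.
final class DrainSketch {

    // Stand-in for the while loop around receiveNoWait(): poll() returns
    // null as soon as the queue is empty, which ends the loop.
    static String drain(Queue<String> queue) {
        StringBuilder builder = new StringBuilder();
        String body;
        while ((body = queue.poll()) != null) {
            builder.append(body);
        }
        return builder.toString();
    }

    // Enqueue each body; once completionSize messages have arrived,
    // drain the queue, mimicking completionSize(5) on the aggregator.
    static String sendAll(String[] bodies, int completionSize) {
        Queue<String> aggregateQueue = new ConcurrentLinkedQueue<>();
        String result = null;
        int seen = 0;
        for (String body : bodies) {
            aggregateQueue.add(body);
            seen++;
            if (seen == completionSize) {
                result = drain(aggregateQueue);
                seen = 0;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints 12345
        System.out.println(sendAll(new String[] {"1", "2", "3", "4", "5"}, 5));
    }
}
```

With a transacted ActiveMQ endpoint the real loop stops after one message, which is the difference reported below.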
On Tue, Mar 6, 2012 at 2:26 PM, Christian Müller <[email protected]> wrote:
> We have this issue running Apache Camel 2.6.0. I verified that the issue
> still exists with the brand new 2.9.1 version.
> We use Java 1.6.0_29 and are able to reproduce it on MacOS and Windows 2000.
>
> Please have a look at the route definition below to follow the explanation.
> In route-2 we send the received messages into the queue "aggregate" and
> afterwards to an aggregator. The aggregator aggregates messages into batches
> of five. Once five messages are aggregated, the following processor reads all
> messages from the queue "aggregate", aggregates the content and sends it to a
> mock endpoint. This works fine as long as we don't use transacted routes. But
> because we don't want to lose messages (in reality we use a persistent
> aggregation repository), we switched to transacted routes. With this change,
> our processor (which uses the Camel consumerTemplate) only receives one
> message in the while loop. We checked the queue and there are four other
> messages which are not read. We tried multiple different combinations of
> connectionFactory settings, cacheLevelName settings, ... but without luck.
>
> Any idea what we did wrong or what Camel does wrong?
>
> The route:
> @Override
> public void configure() throws Exception {
>     from("direct:start").routeId("route-1")
>         .inOnly("activemq:queue:start");
>
>     from("activemqTx:queue:start").routeId("route-2")
>         .transacted("required")
>         .inOnly("activemqTx:queue:aggregate")
>         .aggregate(header("aggregationGroup"), new BodyInAggregatingStrategy()).completionSize(5)
>         .process(new Processor() {
>             public void process(Exchange ex) throws Exception {
>                 log.info("################################################");
>                 log.info("Consume from 'activemq:queue:aggregate' after aggregation completed");
>                 log.info("################################################");
>
>                 StringBuilder builder = new StringBuilder();
>                 Exchange exchange = null;
>
>                 while ((exchange = consumerTemplate.receiveNoWait("activemq:queue:aggregate")) != null) {
>                     builder.append(exchange.getIn().getBody(String.class));
>                     consumerTemplate.doneUoW(exchange);
>                 }
>
>                 log.info("################################################");
>                 log.info("no more messages to consume from queue 'activemq:queue:aggregate'");
>                 log.info("################################################");
>
>                 ex.getIn().setBody(builder.toString());
>             }
>         })
>         .inOnly("mock:end");
> }
>
> The test:
> public void test() throws InterruptedException {
>     end.expectedBodiesReceived("12345");
>
>     template.sendBodyAndHeader("direct:start", "1", "aggregationGroup", "A");
>     template.sendBodyAndHeader("direct:start", "2", "aggregationGroup", "A");
>     template.sendBodyAndHeader("direct:start", "3", "aggregationGroup", "A");
>     template.sendBodyAndHeader("direct:start", "4", "aggregationGroup", "A");
>     template.sendBodyAndHeader("direct:start", "5", "aggregationGroup", "A");
>
>     assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
> }
>
> Please find attached my Eclipse project which contains the unit test.
>
> Best,
> Christian

--
Claus Ibsen
-----------------
FuseSource
Email: [email protected]
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/
