It's not Camel. There is a JIRA ticket in AMQ about this.

On Tue, Mar 6, 2012 at 2:26 PM, Christian Müller
<[email protected]> wrote:
> We see this issue when running Apache Camel 2.6.0. I verified that it
> still exists in the brand-new 2.9.1 version.
> We use Java 1.6.0_29 and are able to reproduce it on MacOS and Windows 2000.
>
> Please have a look at the route definition below to follow the explanation.
> In route-2 we send the received messages to the queue "aggregate" and
> afterwards to an aggregator. The aggregator aggregates messages into batches
> of five. Once five messages are aggregated, the following processor reads all
> messages from the queue "aggregate", concatenates their content and sends it
> to a mock endpoint. This works fine as long as we don't use transacted routes.
> But because we don't want to lose messages (in reality we use a persistent
> aggregation repository), we switched to transacted routes. With this change,
> our processor (which uses the Camel consumerTemplate) receives only one
> message in the while loop. We checked the queue and there are four other
> messages which are not read. We tried multiple different combinations of
> connectionFactory settings, cacheLevelName settings, ... but without luck.
>
> Any idea what we did wrong or what Camel does wrong?
>
> The route:
> @Override
> public void configure() throws Exception {
>     from("direct:start").routeId("route-1")
>         .inOnly("activemq:queue:start");
>
>     from("activemqTx:queue:start").routeId("route-2")
>         .transacted("required")
>         .inOnly("activemqTx:queue:aggregate")
>         .aggregate(header("aggregationGroup"), new BodyInAggregatingStrategy()).completionSize(5)
>         .process(new Processor() {
>             public void process(Exchange ex) throws Exception {
>                 log.info("################################################");
>                 log.info("Consume from 'activemq:queue:aggregate' after aggregation completed");
>                 log.info("################################################");
>
>                 StringBuilder builder = new StringBuilder();
>                 Exchange exchange = null;
>
>                 while ((exchange = consumerTemplate.receiveNoWait("activemq:queue:aggregate")) != null) {
>                     builder.append(exchange.getIn().getBody(String.class));
>                     consumerTemplate.doneUoW(exchange);
>                 }
>
>                 log.info("################################################");
>                 log.info("no more messages to consume from queue 'activemq:queue:aggregate'");
>                 log.info("################################################");
>
>                 ex.getIn().setBody(builder.toString());
>             }
>         })
>         .inOnly("mock:end");
> }
>
> The test:
> public void test() throws InterruptedException {
>     end.expectedBodiesReceived("12345");
>
>     template.sendBodyAndHeader("direct:start", "1", "aggregationGroup", "A");
>     template.sendBodyAndHeader("direct:start", "2", "aggregationGroup", "A");
>     template.sendBodyAndHeader("direct:start", "3", "aggregationGroup", "A");
>     template.sendBodyAndHeader("direct:start", "4", "aggregationGroup", "A");
>     template.sendBodyAndHeader("direct:start", "5", "aggregationGroup", "A");
>
>     assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
> }
>
> Please find attached my Eclipse project which contains the unit test.
>
> Best,
> Christian



-- 
Claus Ibsen
-----------------
FuseSource
Email: [email protected]
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/
