Good morning Claus, :o) I had no luck so far. I also tested this route with the latest ActiveMQ 5.5-SNAPSHOT and 5.6-SNAPSHOT versions and it still failed.
Best, Christian On Wed, Mar 7, 2012 at 9:06 AM, Claus Ibsen <[email protected]> wrote: > Ah a typo, its of course you -> I. > > Christian did you have luck finding the ticket(s)? > > I remember there was an attachment with a sample unit test that > demonstrated the issue. > And it was reported a fair long time ago, like 1-2 years. > I think it was working in AMQ 5.3 but failed when upgrading to 5.4 or > it was 5.4 -> 5.5. > > > > On Tue, Mar 6, 2012 at 3:05 PM, Claus Ibsen <[email protected]> wrote: > > On Tue, Mar 6, 2012 at 2:43 PM, Claus Ibsen <[email protected]> > wrote: > >> Its not Camel. There is a JIRA ticket in AMQ about this. > >> > > > > Crap you gotta be some sort of imbecile to use the ASF JIRA. Have > > been trying for 20 min to find the good damn JIRA ticket about this. > > It was first reported at Camel and then moved to AMQ as an issue. > > > > > > > >> > >> On Tue, Mar 6, 2012 at 2:26 PM, Christian Müller > >> <[email protected]> wrote: > >>> We have this issue by running Apache Camel 2.6.0. I verified that this > issue > >>> still exists with the brand new 2.9.1 version. > >>> We use Java 1.6.0_29 and are able to reproduce it on MacOS and Windows > 2000. > >>> > >>> Please have a look at the route definition below to follow the > explanation. > >>> In route-2 we send the received messages into the queue "aggregate" and > >>> afterwards to an aggregator. The aggregator aggregate messages into > batches > >>> of fife. If fife messages are aggregated, the following processor read > all > >>> messages from queue "aggregate", aggregate the content and send it to > a mock > >>> endpoint. This works fine as long we don't use transacted routes. But > >>> because we don't want to lose messages (in reality we use a pesistent > >>> aggregation repository), we switched to transacted routes. With this > change, > >>> our processor (which use the Camel consumerTemplate) only receives one > >>> message in the while loop. We checked the queue and there are four > other > >>> messages which are not read. We tried multiple different combinations > of > >>> connectionFactory settings, cacheLevelName settings, ... but without > luck. > >>> > >>> Any idea what we did wrong or what Camel does wrong? > >>> > >>> The route: > >>> @Override > >>> public void configure() throws Exception { > >>> from("direct:start").routeId("route-1") > >>> .inOnly("activemq:queue:start"); > >>> > >>> from("activemqTx:queue:start").routeId("route-2") > >>> .transacted("required") > >>> .inOnly("activemqTx:queue:aggregate") > >>> .aggregate(header("aggregationGroup"), new > >>> BodyInAggregatingStrategy()).completionSize(5) > >>> .process(new Processor() { > >>> public void process(Exchange ex) throws Exception { > >>> > >>> log.info("################################################"); > >>> log.info("Consume from 'activemq:queue:aggregate' > after > >>> aggregation completed"); > >>> > >>> log.info("################################################"); > >>> > >>> StringBuilder builder = new StringBuilder(); > >>> Exchange exchange = null; > >>> > >>> while ((exchange = > >>> consumerTemplate.receiveNoWait("activemq:queue:aggregate")) != null) { > >>> > builder.append(exchange.getIn().getBody(String.class)); > >>> consumerTemplate.doneUoW(exchange); > >>> } > >>> > >>> > >>> log.info("################################################"); > >>> log.info("no more messages to consume from queue > >>> 'activemq:queue:aggregate'"); > >>> > >>> log.info("################################################"); > >>> > >>> ex.getIn().setBody(builder.toString()); > >>> } > >>> }) > >>> .inOnly("mock:end"); > >>> } > >>> > >>> The test: > >>> public void test() throws InterruptedException { > >>> end.expectedBodiesReceived("12345"); > >>> > >>> template.sendBodyAndHeader("direct:start", "1", "aggregationGroup", > >>> "A"); > >>> template.sendBodyAndHeader("direct:start", "2", "aggregationGroup", > >>> "A"); > >>> template.sendBodyAndHeader("direct:start", "3", "aggregationGroup", > >>> "A"); > >>> template.sendBodyAndHeader("direct:start", "4", "aggregationGroup", > >>> "A"); > >>> template.sendBodyAndHeader("direct:start", "5", "aggregationGroup", > >>> "A"); > >>> > >>> assertMockEndpointsSatisfied(5, TimeUnit.SECONDS); > >>> } > >>> > >>> Please find attached my Eclipse project which contains the unit test. > >>> > >>> Best, > >>> Christian > >> > >> > >> > >> -- > >> Claus Ibsen > >> ----------------- > >> FuseSource > >> Email: [email protected] > >> Web: http://fusesource.com > >> Twitter: davsclaus, fusenews > >> Blog: http://davsclaus.blogspot.com/ > >> Author of Camel in Action: http://www.manning.com/ibsen/ > > > > > > > > -- > > Claus Ibsen > > ----------------- > > FuseSource > > Email: [email protected] > > Web: http://fusesource.com > > Twitter: davsclaus, fusenews > > Blog: http://davsclaus.blogspot.com/ > > Author of Camel in Action: http://www.manning.com/ibsen/ > > > > -- > Claus Ibsen > ----------------- > FuseSource > Email: [email protected] > Web: http://fusesource.com > Twitter: davsclaus, fusenews > Blog: http://davsclaus.blogspot.com/ > Author of Camel in Action: http://www.manning.com/ibsen/ >
