Ok, thanks for the info Claus. Let's see how good my search skills are...
;-)

Best,
Christian

On Tue, Mar 6, 2012 at 3:05 PM, Claus Ibsen <[email protected]> wrote:

> On Tue, Mar 6, 2012 at 2:43 PM, Claus Ibsen <[email protected]> wrote:
> > It's not Camel. There is a JIRA ticket in AMQ about this.
> >
>
> Crap, you gotta be some sort of imbecile to use the ASF JIRA. Have
> been trying for 20 min to find the goddamn JIRA ticket about this.
> It was first reported at Camel and then moved to AMQ as an issue.
>
>
>
> >
> > On Tue, Mar 6, 2012 at 2:26 PM, Christian Müller
> > <[email protected]> wrote:
> >> We have this issue running Apache Camel 2.6.0. I verified that the
> >> issue still exists with the brand new 2.9.1 version.
> >> We use Java 1.6.0_29 and are able to reproduce it on MacOS and
> >> Windows 2000.
> >>
> >> Please have a look at the route definition below to follow the
> >> explanation.
> >> In route-2 we send the received messages into the queue "aggregate"
> >> and afterwards to an aggregator. The aggregator aggregates messages
> >> into batches of five. Once five messages are aggregated, the
> >> following processor reads all messages from the queue "aggregate",
> >> aggregates the content and sends it to a mock endpoint. This works
> >> fine as long as we don't use transacted routes. But because we don't
> >> want to lose messages (in reality we use a persistent aggregation
> >> repository), we switched to transacted routes. With this change, our
> >> processor (which uses the Camel consumerTemplate) only receives one
> >> message in the while loop. We checked the queue and there are four
> >> other messages which are not read. We tried multiple different
> >> combinations of connectionFactory settings, cacheLevelName settings,
> >> ... but without luck.
> >>
> >> Any idea what we did wrong or what Camel does wrong?
> >>
> >> The route:
> >> @Override
> >> public void configure() throws Exception {
> >>     from("direct:start").routeId("route-1")
> >>         .inOnly("activemq:queue:start");
> >>
> >>     from("activemqTx:queue:start").routeId("route-2")
> >>         .transacted("required")
> >>         .inOnly("activemqTx:queue:aggregate")
> >>         .aggregate(header("aggregationGroup"), new BodyInAggregatingStrategy()).completionSize(5)
> >>         .process(new Processor() {
> >>             public void process(Exchange ex) throws Exception {
> >>                 log.info("################################################");
> >>                 log.info("Consume from 'activemq:queue:aggregate' after aggregation completed");
> >>                 log.info("################################################");
> >>
> >>                 StringBuilder builder = new StringBuilder();
> >>                 Exchange exchange = null;
> >>
> >>                 while ((exchange = consumerTemplate.receiveNoWait("activemq:queue:aggregate")) != null) {
> >>                     builder.append(exchange.getIn().getBody(String.class));
> >>                     consumerTemplate.doneUoW(exchange);
> >>                 }
> >>
> >>                 log.info("################################################");
> >>                 log.info("no more messages to consume from queue 'activemq:queue:aggregate'");
> >>                 log.info("################################################");
> >>
> >>                 ex.getIn().setBody(builder.toString());
> >>             }
> >>         })
> >>         .inOnly("mock:end");
> >> }
> >>
> >> The test:
> >> public void test() throws InterruptedException {
> >>     end.expectedBodiesReceived("12345");
> >>
> >>     template.sendBodyAndHeader("direct:start", "1", "aggregationGroup", "A");
> >>     template.sendBodyAndHeader("direct:start", "2", "aggregationGroup", "A");
> >>     template.sendBodyAndHeader("direct:start", "3", "aggregationGroup", "A");
> >>     template.sendBodyAndHeader("direct:start", "4", "aggregationGroup", "A");
> >>     template.sendBodyAndHeader("direct:start", "5", "aggregationGroup", "A");
> >>
> >>     assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
> >> }
> >>
> >> Please find attached my Eclipse project which contains the unit test.
> >>
> >> Best,
> >> Christian
> >
> >
> >
> > --
> > Claus Ibsen
> > -----------------
> > FuseSource
> > Email: [email protected]
> > Web: http://fusesource.com
> > Twitter: davsclaus, fusenews
> > Blog: http://davsclaus.blogspot.com/
> > Author of Camel in Action: http://www.manning.com/ibsen/
>
>
>
>

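For anyone reading along without a broker at hand: the grouping behaviour described in the quoted message (bodies collected per "aggregationGroup" value and released in batches of five) can be sketched in plain Java roughly as follows. This is only an illustration under assumptions. SizeBatcher and its add method are hypothetical names, it is not how Camel implements aggregate(...).completionSize(5), and it does not touch JMS or transactions, so it says nothing about the root cause of the problem reported in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a size-based aggregator; not Camel's implementation.
class SizeBatcher {
    private final int completionSize;
    // Bodies collected so far, keyed by the group value (e.g. the "aggregationGroup" header).
    private final Map<String, List<String>> pending = new HashMap<>();

    SizeBatcher(int completionSize) {
        this.completionSize = completionSize;
    }

    // Adds a body to its group. Returns the concatenated bodies once the group
    // has reached completionSize, otherwise null.
    String add(String groupKey, String body) {
        List<String> group = pending.computeIfAbsent(groupKey, k -> new ArrayList<>());
        group.add(body);
        if (group.size() < completionSize) {
            return null;
        }
        // The batch is complete: forget the group and hand back the joined bodies.
        pending.remove(groupKey);
        return String.join("", group);
    }
}
```

Feeding the bodies "1" to "5" under group "A" into new SizeBatcher(5) yields null for the first four calls and the string "12345" on the fifth, which matches what the quoted test expects at the mock endpoint.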