Hello,

In my application I have a route that consumes a JMS queue, then aggregate on a 
header and finally process. The issue is that the consumption of the queue is 
stopped until the processing is done. The queue contains 25 millions messages 
(600K unique messages based on aggregation rule), aggregation is configured to 
complete on timeout (30 min) with the UseLatestAggregationStrategy. Persistence 
is handled with LevelDB. Paralllel processing is enabled.

What I see is the consumption of 2-3 millions messages during the 30 min, and 
then the consumption stops after the aggregator thread starts the processing. 
The consumption resumes after all exchanges have been processed. In my case the 
processing takes days, so no consumption of the queue.

I try to reproduce with a simpler unit test. By analysing the log I find the 
same kind of issue:

13:15:00.852 [main] INFO route1 - Received message 0 - 0

13:15:03.861 [main] INFO route1 - Received message 0 - 174288
13:15:03.876 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process 
message 0 - 4

13:15:04.880 [main] INFO route1 - Received message 0 - 266292
13:15:04.987 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process 
message 0 - 10

13:15:06.159 [Camel (camel-1) thread #8 - Aggregator] INFO route1 - Process 
message 0 - 16858
13:15:06.159 [main] INFO route1 - Received message 0 - 266293  <----- 1s wait 
with last message 266292

13:15:07.178 [main] INFO route1 - Received message 0 - 368107

13:15:08.065 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process 
message 0 - 17882

13:15:23.241 [Camel (camel-1) thread #12 - Aggregator] INFO route1 - Process 
message 0 - 199001
13:15:23.241 [main] INFO route1 - Received message 0 - 368108  <----- 16s wait 
with last message 368107

Is this something that can be mitigated? Am I missing something in the route?

public class AggregationTest extends CamelTestSupport {

    @Test
    public void testMock() throws Exception {
        for (int duplicate = 0; duplicate < 2; duplicate++) {
            for (int i = 0; i < 400000; i++) {
                template.sendBodyAndHeader("direct:start",
                        String.format("%d - %d", duplicate, i),
                        "JMSXGroupID", String.valueOf(i));
            }
        }
        NotifyBuilder notify = new NotifyBuilder(context)
                .from("mock:result")
                .create();

        boolean done = notify.matches(30, TimeUnit.SECONDS);
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() {
                from("direct:start")
                        .log("Received message ${body}")
                        .aggregate(header("JMSXGroupID"), new 
UseLatestAggregationStrategy())
                        .completionTimeout(3000)
                        .parallelProcessing()
                        .log("Process message ${body}")
                        .to("mock:result");
            }
        };
    }
}

Thanks
Sydney

Reply via email to