Hello,

In my application I have a route that consumes a JMS queue, aggregates on a header, and then processes the result. The issue is that consumption of the queue stops until the processing is done.

The queue contains 25 million messages (600K unique messages based on the aggregation rule). Aggregation is configured to complete on timeout (30 min) with the UseLatestAggregationStrategy. Persistence is handled with LevelDB. Parallel processing is enabled.
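To make the aggregation rule concrete, here is a minimal plain-Java sketch of the effect of correlating on a header and keeping only the latest message per key. It is not Camel code: the class name `LatestByKey` and its methods are hypothetical, and it ignores the completion timeout and persistence entirely. It only shows why many incoming messages collapse into one exchange per unique key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Camel: models "correlate on a key, keep the latest body".
final class LatestByKey {
    private final Map<String, String> latest = new LinkedHashMap<>();

    // Called once per incoming message; a later body replaces an earlier one for the same key.
    void add(String correlationKey, String body) {
        latest.put(correlationKey, body);
    }

    // Snapshot of what would be handed to processing: exactly one body per key.
    Map<String, String> completed() {
        return new LinkedHashMap<>(latest);
    }

    public static void main(String[] args) {
        LatestByKey agg = new LatestByKey();
        // Two passes over the same five keys, so ten messages go in.
        for (int duplicate = 0; duplicate < 2; duplicate++) {
            for (int i = 0; i < 5; i++) {
                agg.add(String.valueOf(i), String.format("%d - %d", duplicate, i));
            }
        }
        // prints "5 groups, key 3 -> 1 - 3"
        System.out.println(agg.completed().size() + " groups, key 3 -> " + agg.completed().get("3"));
    }
}
```

With the real numbers, the same collapse would turn the 25 million queued messages into roughly 600K exchanges to process.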
What I see is that 2-3 million messages are consumed during the 30 min, and then consumption stops once the aggregator thread starts the processing. Consumption resumes only after all exchanges have been processed. In my case the processing takes days, so the queue is not consumed during that time.

I tried to reproduce this with a simpler unit test. Analysing the log, I find the same kind of issue:

13:15:00.852 [main] INFO route1 - Received message 0 - 0
13:15:03.861 [main] INFO route1 - Received message 0 - 174288
13:15:03.876 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process message 0 - 4
13:15:04.880 [main] INFO route1 - Received message 0 - 266292
13:15:04.987 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process message 0 - 10
13:15:06.159 [Camel (camel-1) thread #8 - Aggregator] INFO route1 - Process message 0 - 16858
13:15:06.159 [main] INFO route1 - Received message 0 - 266293   <----- 1s wait with last message 266292
13:15:07.178 [main] INFO route1 - Received message 0 - 368107
13:15:08.065 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process message 0 - 17882
13:15:23.241 [Camel (camel-1) thread #12 - Aggregator] INFO route1 - Process message 0 - 199001
13:15:23.241 [main] INFO route1 - Received message 0 - 368108   <----- 16s wait with last message 368107

Is this something that can be mitigated? Am I missing something in the route?
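Here is the unit test:

    // Imports assume Camel 2.x with camel-test (JUnit 4); adjust packages for other versions.
    import java.util.concurrent.TimeUnit;

    import org.apache.camel.RoutesBuilder;
    import org.apache.camel.builder.NotifyBuilder;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
    import org.apache.camel.test.junit4.CamelTestSupport;
    import org.junit.Test;

    public class AggregationTest extends CamelTestSupport {

        @Test
        public void testMock() throws Exception {
            // Send every key twice (400000 keys, 2 passes) so each group receives a duplicate.
            for (int duplicate = 0; duplicate < 2; duplicate++) {
                for (int i = 0; i < 400000; i++) {
                    template.sendBodyAndHeader("direct:start",
                            String.format("%d - %d", duplicate, i),
                            "JMSXGroupID", String.valueOf(i));
                }
            }

            // Block for up to 30 seconds while the route runs.
            NotifyBuilder notify = new NotifyBuilder(context)
                    .from("mock:result")
                    .create();
            boolean done = notify.matches(30, TimeUnit.SECONDS);
        }

        @Override
        protected RoutesBuilder createRouteBuilder() throws Exception {
            return new RouteBuilder() {
                @Override
                public void configure() {
                    from("direct:start")
                        .log("Received message ${body}")
                        // Group by JMSXGroupID and keep only the latest message per group.
                        .aggregate(header("JMSXGroupID"), new UseLatestAggregationStrategy())
                            .completionTimeout(3000)
                            .parallelProcessing()
                            .log("Process message ${body}")
                            .to("mock:result");
                }
            };
        }
    }

Thanks,
Sydney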