Hi, I'm seeing some surprising behaviour with my Camel route, and was hoping someone in this group could help, as my trawl through the docs and the Camel in Action book has not turned up the answers I'm looking for. Apologies if this question has been clearly answered elsewhere :-/
I have a route that looks a little like the following:

    from("seda:foo?concurrentConsumers=2")
        .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
        .log("Sending out ${body} after a short pause...")
        .delay(3000) // simulate a lengthy process
        .log("Sending out ${body} imminently!")
        .to(...); // other downstream processing

Note that I'm using a SEDA endpoint with two *concurrent* consumers. I expected that once a SEDA consumer thread has picked up a message that completes an aggregation, downstream processing would continue on that consumer thread, whilst similar downstream processing for another 'completed aggregation' message could be happening in parallel on the other SEDA consumer thread.

What I'm finding instead is that whilst all of the work downstream of aggregate() does occur across the two consumer threads, it is serialised; no two threads execute the processors at the same time. This becomes quite noticeable if the downstream work is lengthy.

I've uploaded a sample to https://github.com/bacar/aggregator-lock, which you can run with mvn test -Dtest=AggregateLock. It started from a sample in the Camel in Action book.

For example, you can see that whilst the second "Sending... after a short pause" does occur on a separate thread (#2), it does not start until after thread #1 has completed, despite the 3s delay():

    2013-09-18 00:45:15,693 [el-1) thread #1 - Threads] INFO route1 - Sending out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] after a short pause...
    2013-09-18 00:45:18,695 [el-1) thread #1 - Threads] INFO route1 - Sending out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] imminently!
    2013-09-18 00:45:18,696 [el-1) thread #2 - Threads] INFO route1 - Sending out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] after a short pause...
    2013-09-18 00:45:21,698 [el-1) thread #2 - Threads] INFO route1 - Sending out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] imminently!

Is this behaviour expected? I found it _very_ surprising. Did I miss something in the docs that describes this behaviour?
If the behaviour is expected, I am happy to try adding some info to the documentation if someone can explain the intent behind it.

I'm not terribly familiar with the code, but I've had a dig around, and it looks like the reason for this behaviour is the following code inside the process() method of org.apache.camel.processor.aggregate.AggregateProcessor:

    lock.lock();
    try {
        doAggregation(key, copy);
    } finally {
        lock.unlock();
    }

The doAggregation() method performs both the aggregation itself (i.e., adding the new exchange to the repository, checking whether the completion criteria have been met, etc.) _and_, if complete, submits the aggregated message to the ExecutorService for downstream processing. However, since the default executorService is the SynchronousExecutorService, all downstream processing occurs synchronously with submission, and consequently _within_ the lock above.

Whilst I can see obvious reasons that may make it necessary to perform the actual aggregation inside a lock, I do find it quite surprising that by default the downstream processing also occurs inside this lock. Are any other processors known to behave in this way, i.e., by taking a lock around all downstream processing?

I could potentially work around this issue by dispensing with the SEDA concurrentConsumers and using aggregate().parallelProcessing() instead, with a suitable executorService() specified, but this introduces a number of complications, e.g.:

- if I repeatedly split() and re-aggregate() (by different criteria), then _every time_ I aggregate I have to add parallelProcessing()/executorService(); this is verbose and error-prone.
- with repeated aggregates in a route, I need dedicated threads/pools per aggregate(), which means far more threads than I really want or need.
- regardless, I don't get the simple, predictable behaviour I expected: 'pick up job from SEDA, aggregate, synchronously process downstream jobs'.
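To convince myself that the serialisation really does follow from dispatching the downstream work on a caller-runs executor while the lock is held, I put together a small plain-JDK sketch (this is not Camel code; all the names are my own invention, and Runnable::run stands in for what I understand Camel's SynchronousExecutorService to do):

```java
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

public class LockedDispatchDemo {
    static final ReentrantLock lock = new ReentrantLock();
    // Caller-runs executor: execute() just runs the task on the calling thread,
    // mimicking (as I understand it) the default SynchronousExecutorService.
    static final Executor synchronousExecutor = Runnable::run;

    static int inDownstream = 0;           // guarded by the lock itself
    static volatile int maxObservedConcurrency = 0;

    static void aggregateAndDispatch() {
        lock.lock();
        try {
            // "doAggregation" would update the repository here, then submit the
            // completed aggregate; with a caller-runs executor the downstream
            // work executes right here, still holding the lock.
            synchronousExecutor.execute(() -> {
                inDownstream++;
                maxObservedConcurrency = Math.max(maxObservedConcurrency, inDownstream);
                try { Thread.sleep(50); } catch (InterruptedException e) { } // lengthy downstream work
                inDownstream--;
            });
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService consumers = Executors.newFixedThreadPool(2); // two "SEDA consumers"
        for (int i = 0; i < 4; i++) {
            consumers.submit(LockedDispatchDemo::aggregateAndDispatch);
        }
        consumers.shutdown();
        consumers.awaitTermination(10, TimeUnit.SECONDS);
        // prints "max concurrent downstream = 1"
        System.out.println("max concurrent downstream = " + maxObservedConcurrency);
    }
}
```

With the caller-runs executor the observed downstream concurrency never exceeds 1, whatever the consumer pool size, which matches the serialised logs above; swapping in a real thread pool for synchronousExecutor (which, if I've read it right, is what parallelProcessing()/executorService() does) lets it climb.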
Another possible workaround might be optimistic locking, but I haven't had the opportunity to study it yet. It seems unrelated, though: I think my problem is with the very coarse granularity of the pessimistic lock, not with whether the locking is optimistic. Besides, I don't really want my messages ever to fail with a 'too many attempts to acquire the optimistic lock' exception, and I might have quite high contention.

Many thanks in advance for your help/comments!

Baris.