You just need to set executorService and have it refer either to a thread pool to be shared, or to a thread pool profile used as a template for the settings. See http://camel.apache.org/threading-model.html
That is one way to configure it that is neither verbose nor error prone!

On Wed, Sep 18, 2013 at 5:21 PM, Claus Ibsen <claus.ib...@gmail.com> wrote:
> It works as designed.
>
> If you want aggregator to use concurrent threads for downstream then
> you need to configure that.
>
> On Wed, Sep 18, 2013 at 5:15 PM, Baris Acar <ba...@acar.org.uk> wrote:
>> Hi Claus,
>>
>> Thanks for your reply! I've tried using parallelProcessing and it comes with
>> a few drawbacks as I've mentioned already. We're going with it as a
>> workaround but I'm interested to know whether you consider the issue I've
>> reported to be a bug.
>>
>> Do you believe that it is intentional/expected that by default the
>> AggregateProcessor *holds a mutual exclusion lock* across all downstream
>> processing? It's really very unexpected to me, and the docs you
>> link to make no mention of acquiring a lock over other code unrelated to the
>> aggregation. As a user of a framework, one needs to know if a framework is
>> going to acquire a mutual exclusion lock over my code, in order to reason
>> about the parallelism.
>>
>> Importantly - are there any other processors which acquire a lock over all
>> downstream processing?
>>
>> Barış
>>
>>
>> On 18 Sep 2013, at 11:54, Claus Ibsen <claus.ib...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> See the parallelProcessing / executorService option on the aggregator
>>> http://camel.apache.org/aggregator2
>>>
>>> On Wed, Sep 18, 2013 at 2:49 AM, Baris Acar <ba...@acar.org.uk> wrote:
>>>> Hi,
>>>>
>>>> I'm seeing some surprising behaviour with my camel route, and was hoping
>>>> someone in this group could help, as my trawl through the docs and Camel In
>>>> Action book have not found the answers I'm looking for.
>>>> Apologies if this question has been clearly answered elsewhere :-/
>>>>
>>>> I have a route that looks a little like the following:
>>>>
>>>> from("seda:foo?concurrentConsumers=2")
>>>>     .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
>>>>     .log("Sending out ${body} after a short pause...")
>>>>     .delay(3000) // simulate a lengthy process
>>>>     .log("Sending out ${body} imminently!")
>>>>     .to(...) // other downstream processing
>>>>
>>>> Note that I'm using a SEDA with two *concurrent* consumers. I expected that
>>>> once a SEDA consumer thread has picked up a message that completes an
>>>> aggregation, that downstream processing will continue on that consumer
>>>> thread, whilst other such downstream processing for another 'completed
>>>> aggregation' message may be happening in parallel on the other SEDA
>>>> consumer thread.
>>>>
>>>> What I'm finding instead is that whilst all of the work downstream of
>>>> aggregate() does occur across the two consumer threads, it is serialised;
>>>> no two threads execute the processors at the same time. This becomes quite
>>>> noticeable if this downstream work is lengthy. I've uploaded a sample to
>>>> https://github.com/bacar/aggregator-lock, which you can run with mvn test
>>>> -Dtest=AggregateLock. It started from a sample from the CIA book.
>>>>
>>>> For example, you can see that whilst the second "Sending... after a short
>>>> pause" does occur on a separate thread (#2), it does not start until after
>>>> thread #1 has completed, despite the 3s delay():
>>>>
>>>> 2013-09-18 00:45:15,693 [el-1) thread #1 - Threads] INFO route1 - Sending
>>>> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] after a short pause...
>>>> 2013-09-18 00:45:18,695 [el-1) thread #1 - Threads] INFO route1 - Sending
>>>> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] imminently!
>>>> 2013-09-18 00:45:18,696 [el-1) thread #2 - Threads] INFO route1 - Sending
>>>> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] after a short pause...
>>>> 2013-09-18 00:45:21,698 [el-1) thread #2 - Threads] INFO route1 - Sending
>>>> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] imminently!
>>>>
>>>> Is this behaviour expected? I found it _very_ surprising. Did I miss
>>>> something in the docs that describes this behaviour? If the behaviour is
>>>> expected, I am happy to try adding some info to the documentation if
>>>> someone can explain the intent behind it.
>>>>
>>>> I'm not terribly familiar with the code, but I've had a dig around, and it
>>>> looks like the reason for this behaviour is due to the following code
>>>> inside the process() method of
>>>> org.apache.camel.processor.aggregate.AggregateProcessor:
>>>>
>>>> lock.lock();
>>>> try {
>>>>     doAggregation(key, copy);
>>>> } finally {
>>>>     lock.unlock();
>>>> }
>>>>
>>>> The doAggregation() method performs both the aggregation (i.e., adding the
>>>> new exchange to the repository, checking if the completion criteria have
>>>> been met etc) _and_, if complete, submits the aggregated message to the
>>>> ExecutorService for downstream processing. However, since the default
>>>> executorService is the SynchronousExecutorService, all downstream
>>>> processing occurs synchronously with submission, and consequently, _within_
>>>> the lock above.
>>>>
>>>> Whilst I can see obvious reasons that may make it necessary to perform the
>>>> actual aggregation inside a lock, I do find it quite surprising that the
>>>> downstream processing by default also occurs inside this lock. Are there
>>>> any other processors known to behave in this way, i.e., by taking a lock
>>>> around all downstream processing?
>>>>
>>>> I could potentially work around this issue by dispensing with the SEDA
>>>> concurrentConsumers and using aggregate().parallelProcessing() instead,
>>>> with a suitable executorService() specified, but this introduces a number
>>>> of complications, e.g.:
>>>> - if I repeatedly split() and re-aggregate() (by different criteria), then
>>>> _every time_ I aggregate I have to add
>>>> parallelProcessing()/executorService(); this is verbose and error prone.
>>>> - with repeated aggregates in a route, I need dedicated threads/pools per
>>>> aggregate(), which means way more threads than I really want/need.
>>>> - regardless, I don't get the predictable and simple behaviour I'd expected
>>>> of 'pick up job from SEDA, aggregate, synchronously process downstream
>>>> jobs'.
>>>>
>>>> Another possible workaround might be the optimistic locking, but I haven't
>>>> had the opportunity to study it yet. It seems unrelated - I think my
>>>> problem is with the very coarse granularity of the pessimistic lock, not
>>>> with whether it's optimistic. Plus, I don't really want my messages to ever
>>>> fail with a 'too many attempts to acquire the optimistic lock' exception,
>>>> and I might have quite high contention.
>>>>
>>>> Many thanks in advance for your help/comments!
>>>>
>>>> Baris.
>>>
>>>
>>>
>>> --
>>> Claus Ibsen
>>> -----------------
>>> Red Hat, Inc.
>>> Email: cib...@redhat.com
>>> Twitter: davsclaus
>>> Blog: http://davsclaus.com
>>> Author of Camel in Action: http://www.manning.com/ibsen
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cib...@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen

--
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cib...@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
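
For readers following the thread, the effect discussed above (a lock held around a synchronous hand-off, versus a hand-off to a thread pool) can be reproduced without Camel. The following is only a minimal sketch in plain Java: the class name AggregatorLockSketch and the method peakConcurrency are hypothetical, Runnable::run merely stands in for a synchronous executor, and none of this is Camel's actual AggregateProcessor code. It measures how many "downstream" tasks run at the same time when each caller submits its task while holding a shared lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; not taken from Camel.
public class AggregatorLockSketch {

    /**
     * Starts `callers` threads. Each one takes a shared lock, hands a slow
     * task to `downstream`, then releases the lock. Returns the highest
     * number of downstream tasks observed running at the same moment.
     */
    public static int peakConcurrency(Executor downstream, int callers, long workMillis)
            throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(callers);

        // Simulated lengthy downstream processing.
        Runnable downstreamWork = () -> {
            int now = running.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
                done.countDown();
            }
        };

        Thread[] consumers = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            consumers[i] = new Thread(() -> {
                lock.lock();
                try {
                    // With a synchronous executor the task runs right here,
                    // i.e. while the lock is still held.
                    downstream.execute(downstreamWork);
                } finally {
                    lock.unlock();
                }
            });
            consumers[i].start();
        }

        done.await();
        for (Thread t : consumers) {
            t.join();
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Synchronous hand-off: downstream work is serialised by the lock.
        Executor synchronous = Runnable::run;
        System.out.println("synchronous executor, peak = "
                + peakConcurrency(synchronous, 2, 500));

        // Hand-off to a pool: the lock is released as soon as the task is queued.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("thread pool, peak = "
                    + peakConcurrency(pool, 2, 500));
        } finally {
            pool.shutdown();
        }
    }
}
```

With the synchronous executor the peak is always 1, because each task runs inside the lock. With the two-thread pool the tasks would normally overlap, since the callers only hold the lock long enough to enqueue the work.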