Hi

See the parallelProcessing / executorService option on the aggregator:
http://camel.apache.org/aggregator2
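For instance, the route from the question below could be adjusted along these lines (a sketch against the Camel 2.x Java DSL; myAggregationStrategy is the poster's placeholder, and the exact chaining may need adapting to your Camel version):

```java
from("seda:foo?concurrentConsumers=2")
    .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
        // hand completed aggregations to a thread pool instead of the
        // default synchronous executor, so downstream processing runs
        // outside the aggregator's internal lock
        .parallelProcessing()
        // or supply your own pool: .executorService(myThreadPool)
    .log("Sending out ${body} after a short pause...")
    .delay(3000)
    .log("Sending out ${body} imminently!")
    .to(...); // other downstream processing
```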
On Wed, Sep 18, 2013 at 2:49 AM, Baris Acar <ba...@acar.org.uk> wrote:
> Hi,
>
> I'm seeing some surprising behaviour with my camel route, and was hoping
> someone in this group could help, as my trawl through the docs and Camel
> In Action book have not found the answers I'm looking for. Apologies if
> this question has been clearly answered elsewhere :-/
>
> I have a route that looks a little like the following:
>
> from("seda:foo?concurrentConsumers=2")
>     .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
>     .log("Sending out ${body} after a short pause...")
>     .delay(3000) // simulate a lengthy process
>     .log("Sending out ${body} imminently!")
>     .to(...) // other downstream processing
>
> Note that I'm using a SEDA with two *concurrent* consumers. I expected
> that once a SEDA consumer thread has picked up a message that completes
> an aggregation, downstream processing will continue on that consumer
> thread, whilst other such downstream processing for another 'completed
> aggregation' message may be happening in parallel on the other SEDA
> consumer thread.
>
> What I'm finding instead is that whilst all of the work downstream of
> aggregate() does occur across the two consumer threads, it is serialised;
> no two threads execute the processors at the same time. This becomes
> quite noticeable if this downstream work is lengthy. I've uploaded a
> sample to https://github.com/bacar/aggregator-lock, which you can run
> with mvn test -Dtest=AggregateLock. It started from a sample from the
> CIA book.
>
> For example, you can see that whilst the second "Sending... after a short
> pause" does occur on a separate thread (#2), it does not start until
> after thread #1 has completed, despite the 3s delay():
>
> 2013-09-18 00:45:15,693 [el-1) thread #1 - Threads] INFO route1 -
> Sending out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] after a short pause...
> 2013-09-18 00:45:18,695 [el-1) thread #1 - Threads] INFO route1 -
> Sending out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] imminently!
> 2013-09-18 00:45:18,696 [el-1) thread #2 - Threads] INFO route1 -
> Sending out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] after a short pause...
> 2013-09-18 00:45:21,698 [el-1) thread #2 - Threads] INFO route1 -
> Sending out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] imminently!
>
> Is this behaviour expected? I found it _very_ surprising. Did I miss
> something in the docs that describes this behaviour? If the behaviour is
> expected, I am happy to try adding some info to the documentation if
> someone can explain the intent behind it.
>
> I'm not terribly familiar with the code, but I've had a dig around, and
> it looks like the reason for this behaviour is the following code inside
> the process() method of
> org.apache.camel.processor.aggregate.AggregateProcessor:
>
> lock.lock();
> try {
>     doAggregation(key, copy);
> } finally {
>     lock.unlock();
> }
>
> The doAggregation() method performs both the aggregation (i.e., adding
> the new exchange to the repository, checking whether the completion
> criteria have been met, etc.) _and_, if complete, submits the aggregated
> message to the ExecutorService for downstream processing. However, since
> the default executorService is the SynchronousExecutorService, all
> downstream processing occurs synchronously with submission, and
> consequently _within_ the lock above.
>
> Whilst I can see obvious reasons that may make it necessary to perform
> the actual aggregation inside a lock, I do find it quite surprising that
> the downstream processing by default also occurs inside this lock. Are
> there any other processors known to behave in this way, i.e., by taking
> a lock around all downstream processing?
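The effect of that default can be demonstrated without Camel at all. Here is a minimal stdlib sketch (the class and method names are mine, not Camel's) of the pattern described above: "downstream" work is submitted from inside a lock, first via a synchronous executor and then via a real thread pool:

```java
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

public class LockScopeDemo {
    static final ReentrantLock lock = new ReentrantLock();

    // Mirrors the pattern in AggregateProcessor.process(): take the lock,
    // "aggregate", and on completion submit downstream work to an executor.
    // Returns whether the downstream task saw the submitter's lock held
    // on its own thread.
    static boolean downstreamRunsInsideLock(Executor executor) throws Exception {
        CompletableFuture<Boolean> heldDuringDownstream = new CompletableFuture<>();
        lock.lock();
        try {
            // "doAggregation": completion criteria met, submit downstream work
            executor.execute(() ->
                heldDuringDownstream.complete(lock.isHeldByCurrentThread()));
        } finally {
            lock.unlock();
        }
        return heldDuringDownstream.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        // A synchronous executor runs the task on the submitting thread,
        // i.e. still inside the lock -> prints true
        Executor synchronous = Runnable::run;
        System.out.println("synchronous: inside lock = "
            + downstreamRunsInsideLock(synchronous));

        // A real pool runs the task on another thread, outside the lock
        // -> prints false
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println("pool: inside lock = "
            + downstreamRunsInsideLock(pool));
        pool.shutdown();
    }
}
```

With the synchronous executor the downstream task executes while the lock is held, so concurrent completions serialise behind it; with a pool, submission returns immediately and the lock is released before (or regardless of when) the downstream work runs.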
>
> I could potentially work around this issue by dispensing with the SEDA
> concurrentConsumers and using aggregate().parallelProcessing() instead,
> with a suitable executorService() specified, but this introduces a number
> of complications, e.g.:
>
> - If I repeatedly split() and re-aggregate() (by different criteria),
>   then _every time_ I aggregate I have to add
>   parallelProcessing()/executorService(); this is verbose and error-prone.
> - With repeated aggregates in a route, I need dedicated threads/pools per
>   aggregate(), which means way more threads than I really want/need.
> - Regardless, I don't get the predictable and simple behaviour I'd
>   expected of 'pick up job from SEDA, aggregate, synchronously process
>   downstream jobs'.
>
> Another possible workaround might be optimistic locking, but I haven't
> had the opportunity to study it yet. It seems unrelated - I think my
> problem is with the very coarse granularity of the pessimistic lock, not
> with whether it's optimistic. Plus, I don't really want my messages to
> ever fail with a 'too many attempts to acquire the optimistic lock'
> exception (and I might have quite high contention).
>
> Many thanks in advance for your help/comments!
>
> Baris.

--
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cib...@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
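On the "dedicated pool per aggregate()" complication raised above: the DSL also accepts an existing ExecutorService, so a single shared pool can, I believe, be passed to each aggregate rather than letting parallelProcessing() create one per EIP. A sketch (sharedPool and the endpoint names are illustrative, not from the thread):

```java
// one shared pool, created once at route-builder setup
ExecutorService sharedPool = Executors.newFixedThreadPool(10);

from("seda:foo?concurrentConsumers=2")
    .aggregate(header("myId"), myAggregationStrategy)
        .completionSize(5)
        .parallelProcessing()
        .executorService(sharedPool)   // reuse the same pool...
    .split(body())
    .aggregate(header("otherId"), myAggregationStrategy)
        .completionSize(5)
        .parallelProcessing()
        .executorService(sharedPool)   // ...for every aggregate()
    .to("mock:out");
```

This keeps the thread count bounded across repeated aggregates, at the cost of still having to repeat the option at each aggregate() as the poster notes.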