It works as designed.

If you want aggregator to use concurrent threads for downstream then
you need to configure that.

On Wed, Sep 18, 2013 at 5:15 PM, Baris Acar <ba...@acar.org.uk> wrote:
> Hi Claus,
>
> Thanks for your reply! I've tried using parallelProcessing and it comes with 
> a few drawbacks as I've mentioned already. We're going with it as a 
> workaround but I'm interested to know whether you consider the issue I've 
> reported to be a bug.
>
> Do you believe that it is intentional/expected that by default the 
> AggregateProcessor *holds a mutual exclusion lock* across all downstream 
> processing, by default? It's really very unexpected to me, and the docs you 
> link to make no mention of acquiring a lock over other code unrelated to the 
> aggregation. As a user of a framework, one needs to know if a framework is 
> going to acquire a mutual exclusion lock over my code, in order to reason 
> about the parallelism.
>
> Importantly - are there any other processors which acquire a lock over all 
> downstream processing?
>
> Barış
>
>
> On 18 Sep 2013, at 11:54, Claus Ibsen <claus.ib...@gmail.com> wrote:
>
>> Hi
>>
>> See the parallelProcessing / executorService option on the aggregator
>> http://camel.apache.org/aggregator2
>>
>> On Wed, Sep 18, 2013 at 2:49 AM, Baris Acar <ba...@acar.org.uk> wrote:
>>> Hi,
>>>
>>> I'm seeing some surprising behaviour with my camel route, and was hoping
>>> someone in this group could help, as my trawl through the docs and Camel In
>>> Action book have not found the answers I'm looking for. Apologies if this
>>> question has been clearly answered elsewhere :-/
>>>
>>> I have a route that looks a little like the following:
>>>
>>>    from("seda:foo?concurrentConsumers=2")
>>>      .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
>>>        .log("Sending out ${body} after a short pause...")
>>>        .delay(3000) // simulate a lengthy process
>>>        .log("Sending out ${body} imminently!")
>>>        .to(...) // other downstream processing
>>>
>>> Note that I'm using a SEDA with two *concurrent* consumers. I expected that
>>> once a SEDA consumer thread has picked up a message that completes an
>>> aggregation, that downstream processing will continue on that consumer
>>> thread, whilst other such downstream processing for another 'completed
>>> aggregation' message may be happening in parallel on the other SEDA
>>> consumer thread.
>>>
>>> What I'm finding instead is that whilst all of the work downstream of
>>> aggregate() does occur across the two consumer threads, it is serialised;
>>> no two threads execute the processors at the same time. This becomes quite
>>> noticeable if this downstream work is lengthy. I've uploaded a sample to
>>> https://github.com/bacar/aggregator-lock, which you can run with mvn test
>>> -Dtest=AggregateLock. It started from a sample from the CIA book.
>>>
>>> For example, you can see the whilst the second "Sending... after a short
>>> pause" does occur on a separate thread (#2), it does not start until after
>>> thread #1 has completed, despite the 3s delay():
>>>
>>> 2013-09-18 00:45:15,693 [el-1) thread #1 - Threads] INFO  route1 - Sending
>>> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] after a short pause...
>>> 2013-09-18 00:45:18,695 [el-1) thread #1 - Threads] INFO  route1 - Sending
>>> out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] imminently!
>>> 2013-09-18 00:45:18,696 [el-1) thread #2 - Threads] INFO  route1 - Sending
>>> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] after a short pause...
>>> 2013-09-18 00:45:21,698 [el-1) thread #2 - Threads] INFO  route1 - Sending
>>> out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] imminently!
>>>
>>> Is this behaviour expected? I found it _very_ surprising. Did I miss
>>> something in the docs that describes this behaviour? If the behaviour is
>>> expected, I am happy to try adding some info to the documentation if
>>> someone can explain the intent behind it.
>>>
>>> I'm not terribly familiar with the code, but I've had a dig around, and it
>>> looks like the reason for this behaviour is due to the following code
>>> inside the process() method of
>>> org.apache.camel.processor.aggregate.AggregateProcessor:
>>>
>>>            lock.lock();
>>>            try {
>>>                doAggregation(key, copy);
>>>            } finally {
>>>                lock.unlock();
>>>            }
>>>
>>> The doAggregation() method performs both the aggregation (i.e., adding the
>>> new exchange to the repository, checking if the completion criteria have
>>> been met etc) _and_, if complete, submits the aggregated message to the
>>> ExecutorService for downstream processing. However, since the default
>>> executorService is the SynchronousExecutorService, all downstream
>>> processing occurs synchronously with submission, and consequently, _within_
>>> the lock above.
>>>
>>> Whilst I can see obvious reasons that may make it necessary to perform the
>>> actual aggregation inside a lock, I do find it quite surprising that the
>>> downstream processing by default also occurs inside this lock. Are there
>>> any other processors known to behave in this way, i.e., by taking a lock
>>> around all downstream processing?
>>>
>>> I could potentially work around this issue by dispensing with the SEDA
>>> concurrentConsumers and using aggregate().parallelProcessing() instead,
>>> with a suitable executorService() specified, but this introduces a number
>>> of complications, e.g.:
>>> - if I repeatedly split() and re-aggregate() (by different criteria), then
>>> _every time_ I aggregate I have to add
>>> parallelProcessing()/executorService(); this is verbose and error prone.
>>> - with repeated aggregates in a route, I need dedicated threads/pools per
>>> aggregate(), which means way more threads than I really want/need.
>>> - regardless, I don't get the predictable and simple behaviour I expected
>>> of 'pick up job from SEDA, aggregate, synchronously process downstream
>>> jobs' that I'd expected.
>>>
>>> Another possible workaround might be the optimistic locking, but I haven't
>>> had the opportunity to study it yet. It seems unrelated - I think my
>>> problem is with the very coarse granularity of the pessimistic lock, not
>>> with whether it's optimistic. Plus, I don't really want my messages to ever
>>> fail with a 'too many attempts to acquire the optimistic lock' exception,
>>> and I might have quite high contention).
>>>
>>> Many thanks in advance for your help/comments!
>>>
>>> Baris.
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> Red Hat, Inc.
>> Email: cib...@redhat.com
>> Twitter: davsclaus
>> Blog: http://davsclaus.com
>> Author of Camel in Action: http://www.manning.com/ibsen



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cib...@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Reply via email to