Hi,

I'm seeing some surprising behaviour with my Camel route, and was hoping
someone in this group could help, as my trawl through the docs and the
Camel in Action book has not turned up the answers I'm looking for.
Apologies if this question has been clearly answered elsewhere :-/

I have a route that looks a little like the following:

    from("seda:foo?concurrentConsumers=2")
      .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
        .log("Sending out ${body} after a short pause...")
        .delay(3000) // simulate a lengthy process
        .log("Sending out ${body} imminently!")
        .to(...) // other downstream processing

Note that I'm using a SEDA endpoint with two *concurrent* consumers. I
expected that once a SEDA consumer thread picks up a message that completes
an aggregation, downstream processing continues on that consumer thread,
whilst similar downstream processing for another 'completed aggregation'
message may happen in parallel on the other SEDA consumer thread.

What I'm finding instead is that whilst all of the work downstream of
aggregate() does occur across the two consumer threads, it is serialised;
no two threads execute the processors at the same time. This becomes quite
noticeable if this downstream work is lengthy. I've uploaded a sample to
https://github.com/bacar/aggregator-lock, which you can run with mvn test
-Dtest=AggregateLock. It is based on a sample from the Camel in Action book.

For example, you can see that whilst the second "Sending... after a short
pause" log does occur on a separate thread (#2), it does not start until
after thread #1 has completed, despite the 3s delay():

2013-09-18 00:45:15,693 [el-1) thread #1 - Threads] INFO  route1 - Sending
out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] after a short pause...
2013-09-18 00:45:18,695 [el-1) thread #1 - Threads] INFO  route1 - Sending
out aggregated [1:0, 1:1, 1:2, 1:3, 1:4] imminently!
2013-09-18 00:45:18,696 [el-1) thread #2 - Threads] INFO  route1 - Sending
out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] after a short pause...
2013-09-18 00:45:21,698 [el-1) thread #2 - Threads] INFO  route1 - Sending
out aggregated [0:0, 0:1, 0:2, 0:3, 0:4] imminently!

Is this behaviour expected? I found it _very_ surprising. Did I miss
something in the docs that describes this behaviour? If the behaviour is
expected, I am happy to try adding some info to the documentation if
someone can explain the intent behind it.

I'm not terribly familiar with the code, but I've had a dig around, and it
looks like the reason for this behaviour is due to the following code
inside the process() method of
org.apache.camel.processor.aggregate.AggregateProcessor:

            lock.lock();
            try {
                doAggregation(key, copy);
            } finally {
                lock.unlock();
            }

The doAggregation() method performs both the aggregation (i.e., adding the
new exchange to the repository, checking whether the completion criteria
have been met, etc.) _and_, if complete, submits the aggregated message to
the ExecutorService for downstream processing. However, since the default
executorService is the SynchronousExecutorService, all downstream
processing occurs synchronously with submission, and consequently _within_
the lock above.
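To illustrate the effect outside of Camel, here is a minimal sketch in
plain java.util.concurrent (my own toy example, not Camel code); the
`direct` executor below stands in for the SynchronousExecutorService, in
that execute() runs the task on the calling thread:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;

public class SyncExecutorDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        // caller-runs executor, analogous to a synchronous ExecutorService
        Executor direct = Runnable::run;

        boolean[] heldDuringTask = new boolean[1];
        lock.lock();
        try {
            // "submitting" downstream work actually runs it right here,
            // on this thread, while the lock is still held
            direct.execute(() -> heldDuringTask[0] = lock.isHeldByCurrentThread());
        } finally {
            lock.unlock();
        }
        System.out.println("downstream ran inside lock: " + heldDuringTask[0]);
    }
}
```

So any "downstream" work submitted this way is serialised by whoever holds
the lock, which matches the behaviour I'm seeing.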

Whilst I can see obvious reasons that may make it necessary to perform the
actual aggregation inside a lock, I do find it quite surprising that the
downstream processing by default also occurs inside this lock. Are there
any other processors known to behave in this way, i.e., by taking a lock
around all downstream processing?

I could potentially work around this issue by dispensing with the SEDA
concurrentConsumers and using aggregate().parallelProcessing() instead,
with a suitable executorService() specified, but this introduces a number
of complications, e.g.:
- if I repeatedly split() and re-aggregate() (by different criteria), then
_every time_ I aggregate I have to add
parallelProcessing()/executorService(); this is verbose and error prone.
- with repeated aggregates in a route, I need dedicated threads/pools per
aggregate(), which means way more threads than I really want/need.
- regardless, I don't get the predictable and simple behaviour I expected:
'pick up job from SEDA, aggregate, synchronously process downstream
jobs'.
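For reference, the workaround I have in mind looks roughly like the
following (a sketch only; `myExecutor` is a hypothetical thread pool I
would have to create and manage per aggregate):

    from("seda:foo")
      .aggregate(header("myId"), myAggregationStrategy).completionSize(5)
        .parallelProcessing()
        .executorService(myExecutor)
        .log("Sending out ${body} after a short pause...")
        .delay(3000)
        .to(...) // downstream processing now runs on myExecutor's
                 // threads, outside the aggregator's lock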

Another possible workaround might be optimistic locking, but I haven't had
the opportunity to study it yet. It seems unrelated - I think my problem is
with the very coarse granularity of the pessimistic lock, not with whether
it's optimistic. Plus, I don't really want my messages to ever fail with a
'too many attempts to acquire the optimistic lock' exception, and I might
have quite high contention.

Many thanks in advance for your help/comments!

Baris.
