You should have the <choice> inside the <aggregate> so all the logic
is executed as part of when the aggregator completes.
And btw the splitter has a property it sets on the last message when
its complete. You can transfer that as a header over JMS
<from uri="file:src/data?noop=true" />
<setHeader headerName="transactionGUID">
<simple>${file:name.noext}-${date:now:yyyyMMdd-HHmmssSSS}</simple>
</setHeader>
<split>
<tokenize token="organisation" xml="true" />
<setHeader headerName="lastMessage">
<property>Exchange.SPLIT_COMPLETE</property>
</setHeader>
<to uri="activemq:organisation.queue" />
</split>
</route>
On Tue, Oct 22, 2013 at 4:48 PM, Tom Fornoville <[email protected]> wrote:
> thanks Claus and Erwin for the answers, I'm starting to understand the
> patterns a little bit better.
>
> I am however still not sure how to handle the use-case with only 1
> aggregator.
> As Claus suggested I'm now sending an extra message after all the
> organisations are split.
>
> The routes now look something like this:
>
> <route>
> <from uri="file:src/data?noop=true" />
> <setHeader headerName="transactionGUID">
> <simple>${file:name.noext}-${date:now:yyyyMMdd-HHmmssSSS}</simple>
> </setHeader>
> <split>
> <tokenize token="organisation" xml="true" />
> <to uri="activemq:organisation.queue" />
> </split>
> <setBody>
> <simple>LAST_LINE</simple>
> </setBody>
> <to uri="activemq:organisation.queue" />
> </route>
>
> <route id="process-organisations">
> <from uri="activemq:organisation.queue" />
> <aggregate strategyRef="organisationAggregator" completionSize="50"
> completionTimeout="5000">
> <correlationExpression>
> <header>transactionGUID</header>
> </correlationExpression>
> <bean ref="organisationServiceActivator" method="updateOrganisations" />
> </aggregate>
> <choice>
> <when>
> <simple>${body} == 'LAST_LINE'</simple>
> <bean ref="transactionServiceActivator" method="updateTransaction" />
> </when>
> </choice>
> </route>
>
> The only problem now is that my
> transactionServiceActivator.updateTransaction is called before the last
> batch of organisations is updated when the organisations are not an exact
> multiple of 50.
> For example starting with a file that has 120 organisations I get the
> following:
> * update organisations 1-50
> * update organisations 51-100
> * update transaction
> * update organisations 101-120
>
> Tom Fornoville
> Senior Developer
> m: +32 478 65 86 51
> www.roots.be
>
>
> On Tue, Oct 22, 2013 at 3:01 PM, Erwin Etchart <[email protected]>wrote:
>
>> Just for help remember (is a mistake that i made) that every aggregate
>> needs its own repository(i loose a few hours with this).
>>
>> Regards.
>>
>>
>> 2013/10/22 Tom.Fornoville <[email protected]>
>>
>> > Hi everyone,
>> >
>> > I have the following use-case:
>> > * messages arrive on a queue, 1 message for each organisation be
>> > created/updated, messages that belong to the same transaction (= our own
>> > term, NOT a JPA transaction or something like that) all have the same
>> > transactionGUID
>> >
>> > * we need to group organisations together in batches of 50 to process
>> them,
>> > the last batch can be smaller
>> >
>> > * during processing extra entities can be added to the queue: updates for
>> > an
>> > enterprise can trigger updates for establishments under that enterprise
>> >
>> > * after processing by the bean we need to update the transaction entity
>> > with
>> > statistics (# updated, # created etc...)
>> >
>> > I was thinking of doing this with 2 aggregators like this:
>> >
>> > <route id="process-organisations">
>> > <from uri="activemq:organisation.update.queue" />
>> > <aggregate strategyRef="organisationAggregator"
>> completionSize="50"
>> > completionTimeout="5000">
>> > <correlationExpression>
>> > <header>transactionGUID</header>
>> > </correlationExpression>
>> > <bean ref="organisationServiceActivator"
>> > method="updateOrganisations" />
>> > <to uri="activemq:organisation.processed.queue" />
>> > </aggregate>
>> > </route>
>> >
>> > <route id="aggregate-organisation-results">
>> > <from uri="activemq:organisation.processed.queue" />
>> > <aggregate strategyRef="?">
>> > <correlationExpression>
>> > <header>?</header>
>> > </correlationExpression>
>> > <completionPredicate>
>> > <simple>?</simple>
>> > </completionPredicate>
>> > <bean ref="transactionServiceActivator"
>> > method="updateTransaction" />
>> > </aggregate>
>> > </route>
>> >
>> > The questions I have:
>> > * are 2 routes with aggregators the best way to handle this use case?
>> > * will the first aggregate keep working or will it stop after the first
>> > batch of 50?
>> > * what should be the correlation and completion paramters for the second
>> > aggregation?
>> >
>> > Thanks in advance,
>> > Tom
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> >
>> http://camel.465427.n5.nabble.com/Grouping-messages-for-batch-processing-tp5742015.html
>> > Sent from the Camel - Users mailing list archive at Nabble.com.
>> >
>>
--
Claus Ibsen
-----------------
Red Hat, Inc.
Email: [email protected]
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen