Hi Claus,

On the way home I realized that I should put the updateTransaction call inside the aggregate and not after it... dumb copy/paste error.
Thanks for the tip about the property for the splitter.
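
For the archive, here is roughly what I plan to try. It is an untested sketch, not a verified fix: the <choice> moves inside the <aggregate>, and it tests a lastMessage header instead of the LAST_LINE body. As far as I know, Exchange.SPLIT_COMPLETE is the Java constant for the property name CamelSplitComplete, so the XML uses the literal name. The sketch assumes that my organisationAggregator copies the lastMessage header from the newest exchange onto the aggregated exchange, and that the last split message arrives in the last batch. The route id split-organisations is made up for illustration.

```xml
<!-- Producer side: copy the splitter's completion flag into a header
     so that it survives the trip over JMS. -->
<route id="split-organisations"> <!-- hypothetical id -->
  <from uri="file:src/data?noop=true" />
  <setHeader headerName="transactionGUID">
    <simple>${file:name.noext}-${date:now:yyyyMMdd-HHmmssSSS}</simple>
  </setHeader>
  <split>
    <tokenize token="organisation" xml="true" />
    <setHeader headerName="lastMessage">
      <!-- literal property name behind Exchange.SPLIT_COMPLETE -->
      <property>CamelSplitComplete</property>
    </setHeader>
    <to uri="activemq:organisation.queue" />
  </split>
</route>

<!-- Consumer side: everything inside <aggregate> runs only when a batch
     completes (by size or by timeout), so the transaction update can no
     longer overtake the final, smaller batch. -->
<route id="process-organisations">
  <from uri="activemq:organisation.queue" />
  <aggregate strategyRef="organisationAggregator" completionSize="50"
             completionTimeout="5000">
    <correlationExpression>
      <header>transactionGUID</header>
    </correlationExpression>
    <bean ref="organisationServiceActivator" method="updateOrganisations" />
    <choice>
      <when>
        <!-- assumes the aggregation strategy kept the newest lastMessage value -->
        <simple>${header.lastMessage} == true</simple>
        <bean ref="transactionServiceActivator" method="updateTransaction" />
      </when>
    </choice>
  </aggregate>
</route>
```

With the 120-organisation file, the expected order would then be: update 1-50, update 51-100, update 101-120 (completed by the timeout), then update transaction.
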
Learning Camel and EIP one pattern at a time ;-)

Tom Fornoville
Senior Developer
m: +32 478 65 86 51
www.roots.be

On Tue, Oct 22, 2013 at 6:05 PM, Claus Ibsen <[email protected]> wrote:

> You should have the <choice> inside the <aggregate> so all the logic
> is executed as part of when the aggregator completes.
>
> And btw the splitter has a property it sets on the last message when
> its complete. You can transfer that as a header over JMS
>
>     <from uri="file:src/data?noop=true" />
>     <setHeader headerName="transactionGUID">
>       <simple>${file:name.noext}-${date:now:yyyyMMdd-HHmmssSSS}</simple>
>     </setHeader>
>     <split>
>       <tokenize token="organisation" xml="true" />
>       <setHeader headerName="lastMessage">
>         <property>Exchange.SPLIT_COMPLETE</property>
>       </setHeader>
>       <to uri="activemq:organisation.queue" />
>     </split>
>
>   </route>
>
>
> On Tue, Oct 22, 2013 at 4:48 PM, Tom Fornoville <[email protected]>
> wrote:
> > thanks Claus and Erwin for the answers, I'm starting to understand the
> > patterns a little bit better.
> >
> > I am however still not sure how to handle the use-case with only 1
> > aggregator.
> > As Claus suggested I'm now sending an extra message after all the
> > organisations are split.
> >
> > The routes now look something like this:
> >
> >   <route>
> >     <from uri="file:src/data?noop=true" />
> >     <setHeader headerName="transactionGUID">
> >       <simple>${file:name.noext}-${date:now:yyyyMMdd-HHmmssSSS}</simple>
> >     </setHeader>
> >     <split>
> >       <tokenize token="organisation" xml="true" />
> >       <to uri="activemq:organisation.queue" />
> >     </split>
> >     <setBody>
> >       <simple>LAST_LINE</simple>
> >     </setBody>
> >     <to uri="activemq:organisation.queue" />
> >   </route>
> >
> >   <route id="process-organisations">
> >     <from uri="activemq:organisation.queue" />
> >     <aggregate strategyRef="organisationAggregator" completionSize="50"
> >                completionTimeout="5000">
> >       <correlationExpression>
> >         <header>transactionGUID</header>
> >       </correlationExpression>
> >       <bean ref="organisationServiceActivator" method="updateOrganisations" />
> >     </aggregate>
> >     <choice>
> >       <when>
> >         <simple>${body} == 'LAST_LINE'</simple>
> >         <bean ref="transactionServiceActivator" method="updateTransaction" />
> >       </when>
> >     </choice>
> >   </route>
> >
> > The only problem now is that my
> > transactionServiceActivator.updateTransaction is called before the last
> > batch of organisations is updated when the number of organisations is not
> > an exact multiple of 50.
> > For example, starting with a file that has 120 organisations I get the
> > following:
> > * update organisations 1-50
> > * update organisations 51-100
> > * update transaction
> > * update organisations 101-120
> >
> > Tom Fornoville
> > Senior Developer
> > m: +32 478 65 86 51
> > www.roots.be
> >
> >
> > On Tue, Oct 22, 2013 at 3:01 PM, Erwin Etchart <[email protected]> wrote:
> >
> >> Just to help: remember (it is a mistake that I made) that every aggregate
> >> needs its own repository (I lost a few hours with this).
> >>
> >> Regards.
> >>
> >>
> >> 2013/10/22 Tom.Fornoville <[email protected]>
> >>
> >> > Hi everyone,
> >> >
> >> > I have the following use-case:
> >> > * messages arrive on a queue, 1 message for each organisation to be
> >> > created/updated, messages that belong to the same transaction (= our own
> >> > term, NOT a JPA transaction or something like that) all have the same
> >> > transactionGUID
> >> >
> >> > * we need to group organisations together in batches of 50 to process
> >> > them, the last batch can be smaller
> >> >
> >> > * during processing extra entities can be added to the queue: updates for
> >> > an enterprise can trigger updates for establishments under that enterprise
> >> >
> >> > * after processing by the bean we need to update the transaction entity
> >> > with statistics (# updated, # created etc...)
> >> >
> >> > I was thinking of doing this with 2 aggregators like this:
> >> >
> >> >   <route id="process-organisations">
> >> >     <from uri="activemq:organisation.update.queue" />
> >> >     <aggregate strategyRef="organisationAggregator" completionSize="50"
> >> >                completionTimeout="5000">
> >> >       <correlationExpression>
> >> >         <header>transactionGUID</header>
> >> >       </correlationExpression>
> >> >       <bean ref="organisationServiceActivator" method="updateOrganisations" />
> >> >       <to uri="activemq:organisation.processed.queue" />
> >> >     </aggregate>
> >> >   </route>
> >> >
> >> >   <route id="aggregate-organisation-results">
> >> >     <from uri="activemq:organisation.processed.queue" />
> >> >     <aggregate strategyRef="?">
> >> >       <correlationExpression>
> >> >         <header>?</header>
> >> >       </correlationExpression>
> >> >       <completionPredicate>
> >> >         <simple>?</simple>
> >> >       </completionPredicate>
> >> >       <bean ref="transactionServiceActivator" method="updateTransaction" />
> >> >     </aggregate>
> >> >   </route>
> >> >
> >> > The questions I have:
> >> > * are 2 routes with aggregators the best way to handle this use case?
> >> > * will the first aggregate keep working or will it stop after the first
> >> > batch of 50?
> >> > * what should be the correlation and completion parameters for the second
> >> > aggregation?
> >> >
> >> > Thanks in advance,
> >> > Tom
> >> >
> >> > --
> >> > View this message in context:
> >> > http://camel.465427.n5.nabble.com/Grouping-messages-for-batch-processing-tp5742015.html
> >> > Sent from the Camel - Users mailing list archive at Nabble.com.
> >> >
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: [email protected]
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
