It's not needed if you are just doing the equivalent of to(), and you don't have a state store. I'm realising the Javadoc for commit() needs updating, it doesn't explain much at the moment.
Eno > On 9 Feb 2017, at 16:22, Adrian McCague <adrian.mcca...@zopa.com> wrote: > > Thanks Eno that makes sense. > > If then this is an implementation of Transformer which is in a DSL topology > with DSL sinks ie `to()`, is the commit surplus to requirement? I suspect it > will do no harm at the very least. > > Thanks > Adrian > > -----Original Message----- > From: Eno Thereska [mailto:eno.there...@gmail.com] > Sent: 09 February 2017 16:14 > To: users@kafka.apache.org > Subject: Re: ProcessorContext commit question > > Hi Adrian, > > It's also done in the DSL, but at a different point, in Task.commit(), since > the flow is slightly different. Yes, once data is stored in stores, the > offsets should be committed, so in case of a crash the same offsets are not > processed again. > > Thanks > Eno > > > >> On 9 Feb 2017, at 16:06, Adrian McCague <adrian.mcca...@zopa.com> wrote: >> >> Hi all, >> >> In processor and transformer implementations, what are the use cases for >> calling `context.commit()`? Examples imply it should be called when state >> store modifications are complete, Streams DSL implementations do not fall in >> line with the examples, ie KStreamAggregate. >> >> Thanks >> Adrian >