Thanks Eno, that makes sense. So if this is an implementation of Transformer used in a DSL topology with DSL sinks, i.e. `to()`, is the commit surplus to requirements? I suspect it will do no harm, at the very least.

Thanks
Adrian

-----Original Message-----
From: Eno Thereska [mailto:eno.there...@gmail.com]
Sent: 09 February 2017 16:14
To: users@kafka.apache.org
Subject: Re: ProcessorContext commit question

Hi Adrian,

It's also done in the DSL, but at a different point, in Task.commit(), since the flow is slightly different. Yes, once data is stored in stores, the offsets should be committed, so in case of a crash the same offsets are not processed again.

Thanks
Eno

> On 9 Feb 2017, at 16:06, Adrian McCague <adrian.mcca...@zopa.com> wrote:
>
> Hi all,
>
> In processor and transformer implementations, what are the use cases for
> calling `context.commit()`? Examples imply it should be called when state
> store modifications are complete, Streams DSL implementations do not fall in
> line with the examples, ie KStreamAggregate.
>
> Thanks
> Adrian