It's not needed if you are just doing the equivalent of to() and you don't 
have a state store.
I'm realising the Javadoc for commit() needs updating; it doesn't explain much 
at the moment.
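
For illustration only, here is a minimal sketch of the pattern being discussed: 
update a store, then request a commit. It deliberately does not use the Kafka 
Streams classes. `Context`, `CountingTransformer` and `commitEvery` are 
hypothetical stand-ins invented for this example, and a plain HashMap plays 
the role of the state store.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitSketch {

    // Hypothetical stand-in for the single ProcessorContext method discussed
    // in this thread; not the real Kafka Streams interface.
    interface Context {
        void commit();
    }

    // Counts records per key. The HashMap stands in for a state store.
    static class CountingTransformer {
        private final Map<String, Long> store = new HashMap<>();
        private final Context context;
        private final int commitEvery; // assumed batching knob, not a Kafka setting
        private int sinceCommit = 0;

        CountingTransformer(Context context, int commitEvery) {
            this.context = context;
            this.commitEvery = commitEvery;
        }

        long transform(String key) {
            // Modify the "store" first ...
            long count = store.merge(key, 1L, Long::sum);
            // ... then request a commit once enough updates have accumulated.
            if (++sinceCommit >= commitEvery) {
                context.commit();
                sinceCommit = 0;
            }
            return count;
        }
    }

    public static void main(String[] args) {
        int[] commits = {0};
        CountingTransformer t = new CountingTransformer(() -> commits[0]++, 2);
        for (String key : new String[] {"a", "b", "a"}) {
            System.out.println(key + " -> " + t.transform(key));
        }
        System.out.println("commit requests: " + commits[0]);
    }
}
```

With no store to update, as in the to()-only case above, there would be 
nothing for the explicit commit request to protect, which is why it can be 
left out there.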

Eno


> On 9 Feb 2017, at 16:22, Adrian McCague <adrian.mcca...@zopa.com> wrote:
> 
> Thanks Eno that makes sense.
> 
> If this is then an implementation of Transformer in a DSL topology with DSL 
> sinks, i.e. `to()`, is the commit surplus to requirements? I suspect it will 
> do no harm at the very least.
> 
> Thanks
> Adrian
> 
> -----Original Message-----
> From: Eno Thereska [mailto:eno.there...@gmail.com] 
> Sent: 09 February 2017 16:14
> To: users@kafka.apache.org
> Subject: Re: ProcessorContext commit question
> 
> Hi Adrian,
> 
> It's also done in the DSL, but at a different point, in Task.commit(), since 
> the flow is slightly different. Yes, once data is stored in stores, the 
> offsets should be committed, so in case of a crash the same offsets are not 
> processed again.
> 
> Thanks
> Eno
> 
> 
> 
>> On 9 Feb 2017, at 16:06, Adrian McCague <adrian.mcca...@zopa.com> wrote:
>> 
>> Hi all,
>> 
>> In processor and transformer implementations, what are the use cases for 
>> calling `context.commit()`? Examples imply it should be called when state 
>> store modifications are complete, but Streams DSL implementations, e.g. 
>> KStreamAggregate, do not follow the examples.
>> 
>> Thanks
>> Adrian
> 
