Thanks Eno that makes sense.

If then this is an implementation of Transformer which is in a DSL topology 
with DSL sinks ie `to()`, is the commit surplus to requirement? I suspect it 
will do no harm at the very least.

Thanks
Adrian

-----Original Message-----
From: Eno Thereska [mailto:eno.there...@gmail.com] 
Sent: 09 February 2017 16:14
To: users@kafka.apache.org
Subject: Re: ProcessorContext commit question

Hi Adrian,

It's also done in the DSL, but at a different point, in Task.commit(), since 
the flow is slightly different. Yes, once data is stored in stores, the offsets 
should be committed, so in case of a crash the same offsets are not processed 
again.

Thanks
Eno



> On 9 Feb 2017, at 16:06, Adrian McCague <adrian.mcca...@zopa.com> wrote:
> 
> Hi all,
> 
> In processor and transformer implementations, what are the use cases for 
> calling `context.commit()`? Examples imply it should be called when state 
> store modifications are complete, Streams DSL implementations do not fall in 
> line with the examples, ie KStreamAggregate.
> 
> Thanks
> Adrian

Reply via email to