Hi, Jeremiah,

You can wait inside StreamTask.process(). That essentially blocks the whole container, so no more Kafka messages will be delivered to the StreamTask while you wait.
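A minimal sketch of that kind of blocking wait, in case it helps. BlockingRetry and its parameters are hypothetical names made up for illustration and are not part of Samza. The helper just retries a call with capped exponential backoff and does not return until the call succeeds, which is what keeps process() from returning:

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper for illustration only; not part of the Samza API. */
final class BlockingRetry {
    private BlockingRetry() {}

    /**
     * Calls attempt until it returns true, sleeping between tries.
     * The sleep starts at initialBackoffMs and doubles after each failure,
     * capped at maxBackoffMs. Returns the total number of attempts made.
     */
    static int untilSuccess(BooleanSupplier attempt, long initialBackoffMs, long maxBackoffMs) {
        int attempts = 1;
        long backoff = initialBackoffMs;
        while (!attempt.getAsBoolean()) {
            try {
                Thread.sleep(backoff);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up, so shutdown is not swallowed.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for remote service", e);
            }
            backoff = Math.min(backoff * 2, maxBackoffMs);
            attempts++;
        }
        return attempts;
    }
}

// Assumed usage inside a task (needs the samza-api jar; remoteClient.trySend is hypothetical):
//
// public void process(IncomingMessageEnvelope envelope,
//                     MessageCollector collector,
//                     TaskCoordinator coordinator) {
//     BlockingRetry.untilSuccess(() -> remoteClient.trySend(envelope.getMessage()), 100, 30_000);
// }
```

Because process() does not return while the remote system is down, the current message is not treated as processed during the wait. The trade-off is the one mentioned above: every task in the container stalls, not just the one that is waiting.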
-Yi

On Wed, Jun 15, 2016 at 12:13 PM, Jeremiah Adams <jad...@helixeducation.com> wrote:

> Thank you for the info.
>
> Is there any way to 'pause' the job or stop processing Kafka from inside a
> StreamTask.process() method? That would work for me too.
>
>
> Jeremiah Adams
> Software Engineer
> www.helixeducation.com
> Blog | Twitter | Facebook | LinkedIn
>
> ________________________________________
> From: Yi Pan <nickpa...@gmail.com>
> Sent: Wednesday, June 15, 2016 1:02 PM
> To: dev@samza.apache.org
> Subject: Re: Manually Commit Offsets?
>
> Hi, Jeremiah,
>
> CheckpointManager is an interface that would allow you to implement where
> to read/write your checkpoint offsets for each task. The exposed user API
> to checkpoint in Samza is through TaskCoordinator.commit(). Unfortunately,
> it does not allow you to commit at the granularity of a certain offset on a
> specific partition. The design choice we made here is that when the user calls
> commit(), that means the user has processed all messages up to the current
> one successfully. The recommended pattern is to wait until all the
> currently pending messages are processed, then call commit().
>
> Please let me know if you have further doubts.
>
> -Yi
>
> On Tue, Jun 14, 2016 at 2:13 PM, Jeremiah Adams <jad...@helixeducation.com> wrote:
>
> > Thanks,
> >
> > I see no commit method in TaskContext. Unless I am missing something it is
> > TaskCoordinator. TaskCoordinator.commit() also does not look to give me the
> > ability to set the value of the checkpoint, just checkpoint after
> > unwrapping the incoming message. I need to treat the messages as if they
> > were not handled at all when the remote system is unavailable.
> >
> >
> > I have been looking at the CheckpointManager to do this but cannot see how
> > to wire it into my StreamTask.
> >
> >
> > Jeremiah Adams
> > Software Engineer
> > www.helixeducation.com
> > Blog | Twitter | Facebook | LinkedIn
> >
> > ________________________________________
> > From: Yi Pan <nickpa...@gmail.com>
> > Sent: Tuesday, June 14, 2016 2:28 PM
> > To: dev@samza.apache.org
> > Subject: Re: Manually Commit Offsets?
> >
> > Sorry. Correction:
> >
> > > 2) in your code, call TaskContext.commit() whenever you are ready to
> > > checkpoint.
> >
> > *TaskCoordinator.commit()*
> >
> > > On Tue, Jun 14, 2016 at 10:16 AM, Jeremiah Adams <jad...@helixeducation.com> wrote:
> > >
> > >> We need to send messages to a remote service. I need to implement a
> > >> circuit breaker to address the scenario in which the remote system is
> > >> unavailable. I need to change the current offset to reprocess the current
> > >> offset while the remote system is down. These concerns are similar to those
> > >> outlined here: https://issues.apache.org/jira/browse/SAMZA-794
> > >>
> > >> It looks like Samza's Checkpointing mechanism replaces Kafka's
> > >> auto-commit feature and there is no API for manually manipulating the
> > >> Checkpointing?
> > >>
> > >> Can someone point me in the right direction?
> > >>
> > >> Thanks in advance.
> > >>
> > >> Jeremiah Adams
> > >> Software Engineer
> > >> www.helixeducation.com <http://www.helixeducation.com/>
> > >> Blog <http://www.helixeducation.com/blog/> | Twitter <https://twitter.com/HelixEducation> | Facebook <https://www.facebook.com/HelixEducation> | LinkedIn <http://www.linkedin.com/company/3609946>