Hi, Jeremiah,

You can wait in the StreamTask.process(), which essentially will block the
whole container and no more Kafka messages will be delivered to the
StreamTask.

-Yi

On Wed, Jun 15, 2016 at 12:13 PM, Jeremiah Adams <jad...@helixeducation.com>
wrote:

> Thank you for the info.
>
> Is there any way to 'pause' the job or stop processing kafka from inside a
> StreamTask.process() method? That would work for me too.
>
>
> Jeremiah Adams
> Software Engineer
> www.helixeducation.com
> Blog | Twitter | Facebook | LinkedIn
>
> ________________________________________
> From: Yi Pan <nickpa...@gmail.com>
> Sent: Wednesday, June 15, 2016 1:02 PM
> To: dev@samza.apache.org
> Subject: Re: Manually Commit Offsets?
>
> Hi, Jeremiah,
>
> CheckpointManager is an interface that would allow you to implement where
> to read/write your checkpoint offsets for each task. The exposed user API
> to checkpoint in Samza is through TaskCoordinator.commit(). Unfortunately,
> it does not allow you to commit at the granularity of a certain offset on a
> specific partition. The design choice we made here is that when user calls
> commit(), that means the user have processed all messages up to the current
> one successfully. The recommended pattern is that you will wait for all the
> current pending messages are processed, then call commit().
>
> Please let me know if you have further doubts.
>
> -Yi
>
> On Tue, Jun 14, 2016 at 2:13 PM, Jeremiah Adams <jad...@helixeducation.com
> >
> wrote:
>
> > Thanks,
> >
> > I see no commit method in TaskContext. Unless I am missing something it
> is
> > TaskCoordinator. TaskCoordinator.commit() also does not look to give me
> the
> > ability to set the value of the checkpoint, just checkpoint after
> > unwrapping the incoming message. I need to treat the messages as if they
> > were not handled at all when the remote system is unavailable.
> >
> >
> > I have been looking at the CheckpointManager to do this but cannot see
> how
> > to wire it into my StreamTask.
> >
> >
> > Jeremiah Adams
> > Software Engineer
> > www.helixeducation.com
> > Blog | Twitter | Facebook | LinkedIn
> >
> > ________________________________________
> > From: Yi Pan <nickpa...@gmail.com>
> > Sent: Tuesday, June 14, 2016 2:28 PM
> > To: dev@samza.apache.org
> > Subject: Re: Manually Commit Offsets?
> >
> > Sorry. Correction:
> >
> >
> > > 2) in your code, call TaskContext.commit() whenever you are ready to
> > > checkpoint.
> > >
> > >
> > *TaskCoordinator.commit()*
> >
> >
> >
> > >
> > > On Tue, Jun 14, 2016 at 10:16 AM, Jeremiah Adams <
> > > jad...@helixeducation.com> wrote:
> > >
> > >> We need to send messages to a remote service. I need to implement a
> > >> circuit breaker to address the scenario in which the remote system is
> > >> unavailable. I need to change the current offset to reprocess the
> > current
> > >> offset while the remote system is down. These concerns are similar to
> > those
> > >> outlined here: https://issues.apache.org/jira/browse/SAMZA-794?<
> > >> https://issues.apache.org/jira/browse/SAMZA-794>
> > >>
> > >> It looks like Samza's Checkpointing mechanism replaces kafka's
> > >> auto-commit feature and there is no API for manually manipulating the
> > >> Checkpointing?
> > >>
> > >> Can someone point me in the right direction?
> > >>
> > >> Thanks in advance.
> > >>
> > >>
> > >>
> > >> Jeremiah Adams
> > >> Software Engineer
> > >> www.helixeducation.com<http://www.helixeducation.com/>
> > >> Blog<http://www.helixeducation.com/blog/> | Twitter<
> > >> https://twitter.com/HelixEducation> | Facebook<
> > >> https://www.facebook.com/HelixEducation> | LinkedIn<
> > >> http://www.linkedin.com/company/3609946>
> > >>
> > >
> > >
> >
>

Reply via email to