Great, thanks all!

Regards
Sab

On Tue, Apr 19, 2016 at 5:00 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Sabarish Sasidharan,
>
> The key point is to make your KV-store update idempotent. So, if the offset
> associated with the aggregated value is written in the same row in RocksDB
> (i.e. atomicity is achieved here), I think that your approach would work.
> As Robert mentioned, offsets are always committed last in Samza. Hence, any
> failure recovery is guaranteed to replay some of the old messages. If the
> flushed state store has the aggregated value together w/ the offset, you
> can use the offset to de-dup the replayed old messages that are already
> applied to the aggregated results.
>
> @Robert, yes, the order you listed would be maintained.
>
> Thanks!
>
> -Yi
>
> On Fri, Apr 15, 2016 at 12:16 PM, Robert Crim <rjc...@gmail.com> wrote:
>
> > Looking at:
> >
> >
> > https://github.com/apache/samza/blob/f02386464d31b5a496bb0578838f51a0331bfffa/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L171
> >
> >
> > The commit function, in order, does:
> > 1. Flushes metrics
> > 2. Flushes stores
> > 3. Produces messages from the collectors
> > 4. Writes offsets
> >
> > So I would reason that it would be OK to store an offset you've seen in the
> > store and use that to skip the messages if you've already mutated your data
> > -- but be aware any of 2 (if multiple stores), 3, or 4 may not have
> > happened so you might want to do those again. You'd need to be careful if
> > your changes span multiple stores or keys since multiple writes to
> > changelogs are not atomic.
> >
> > Question to maintainers: is it safe for Samza users to rely on this order?
> >
> > On Fri, Apr 15, 2016 at 11:31 AM, Sabarish Sasidharan <
> > sabarish....@gmail.com> wrote:
> >
> > > Hi Guozhang
> > >
> > > Thanks. Assuming the checkpoint would typically be behind the offset
> > > persisted in my store (+ changelog), when the messages are replayed
> > > starting from the checkpoint, I can very well skip those by comparing
> > > against the offset in my store right? So I am not understanding why
> > > duplicates would affect my state.
> > >
> > > Regards
> > > Sab
> > >
> > > On Fri, Apr 15, 2016 at 10:07 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Hi Sab,
> > > >
> > > > For stateful processing where you have persistent state stores, you need
> > > > to keep the checkpoint (which includes the committed offsets) and the
> > > > flushed store in sync, but right now these two operations are not done
> > > > atomically. Hence, if you fail in between, you could still get
> > > > duplicates: you resume consuming from the committed offsets while some
> > > > of the messages after them have already updated the stores.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Thu, Apr 14, 2016 at 11:56 PM, Sasidharan, Sabarish <
> > > > sabarish.sasidha...@harman.com> wrote:
> > > >
> > > > > Hi
> > > > >
> > > > > To achieve exactly once processing for my aggregates, wouldn’t it be
> > > > > enough if I maintain the latest offset processed for the aggregate and
> > > > > check against that offset when messages are replayed on recovery? Am I
> > > > > missing something here?
> > > > >
> > > > > Thanks
> > > > >
> > > > > Regards
> > > > > Sab
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
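
The offset-based de-duplication discussed in this thread can be sketched as
follows. This is only an illustration, not Samza code: a plain dict stands in
for the RocksDB-backed store, and the names Row, Aggregator and process are
hypothetical. It assumes all messages for a key come from a single input
partition, so that offsets increase monotonically per key, and that the
aggregate and its offset are written together as one row.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Row:
    """One store row: the aggregate and the offset of the last message applied."""
    total: int
    last_offset: int


class Aggregator:
    """Hypothetical task; a dict stands in for the persistent KV store."""

    def __init__(self, store=None):
        self.store = {} if store is None else store

    def process(self, key, offset, amount):
        # Look up the current row, or start from an empty aggregate.
        row = self.store.get(key, Row(total=0, last_offset=-1))
        if offset <= row.last_offset:
            # Replayed message: it is already reflected in the aggregate.
            return False
        # Value and offset are written as a single row, so they cannot diverge.
        self.store[key] = Row(total=row.total + amount, last_offset=offset)
        return True


# (key, offset, amount) triples from one input partition.
messages = [("a", 0, 5), ("a", 1, 7), ("a", 2, 1)]

agg = Aggregator()
for key, offset, amount in messages:
    agg.process(key, offset, amount)

# Simulated failure: the store was flushed, but the checkpoint still points at
# offset 1, so offsets 1 and 2 are replayed on recovery.
recovered = Aggregator(store=dict(agg.store))
applied = [recovered.process(k, o, a) for (k, o, a) in messages[1:]]
```

After the replay the aggregate for "a" is unchanged, because both replayed
messages carry offsets no greater than the one stored in the row. As noted in
the thread, this does not cover updates that span multiple stores or keys,
since those writes are not atomic with each other.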
