Great, thanks all!

Regards
Sab
On Tue, Apr 19, 2016 at 5:00 AM, Yi Pan <nickpa...@gmail.com> wrote:
> Hi, Sabarish Sasidharan,
>
> The key point is to make your KV-store update idempotent. So, if the offset
> associated with the aggregated value is written in the same row in RocksDB
> (i.e. atomicity is achieved here), I think that your approach would work.
> As Robert mentioned, offsets are always committed last in Samza. Hence, any
> failure recovery is guaranteed to replay some of the old messages. If the
> flushed state store has the aggregated value together with the offset, you
> can use the offset to de-dup the replayed old messages that have already
> been applied to the aggregated results.
>
> @Robert, yes, the order you listed would be maintained.
>
> Thanks!
>
> -Yi
>
> On Fri, Apr 15, 2016 at 12:16 PM, Robert Crim <rjc...@gmail.com> wrote:
>
> > Looking at:
> >
> > https://github.com/apache/samza/blob/f02386464d31b5a496bb0578838f51a0331bfffa/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L171
> >
> > The commit function, in order, does:
> > 1. Flushes metrics
> > 2. Flushes stores
> > 3. Produces messages from the collectors
> > 4. Writes offsets
> >
> > So I would reason that it would be OK to store an offset you've seen in
> > the store and use that to skip messages you've already applied to your
> > data -- but be aware that any of 2 (if there are multiple stores), 3, or
> > 4 may not have happened, so you might want to do those again. You'd need
> > to be careful if your changes span multiple stores or keys, since
> > multiple writes to changelogs are not atomic.
> >
> > Question to maintainers: is it safe for Samza users to rely on this
> > order?
> >
> > On Fri, Apr 15, 2016 at 11:31 AM, Sabarish Sasidharan <
> > sabarish....@gmail.com> wrote:
> >
> > > Hi Guozhang
> > >
> > > Thanks. Assuming the checkpoint would typically be behind the offset
> > > persisted in my store (+ changelog), when the messages are replayed
> > > starting from the checkpoint, I can skip those by comparing against
> > > the offset in my store, right? So I am not understanding why
> > > duplicates would affect my state.
> > >
> > > Regards
> > > Sab
> > >
> > > On Fri, Apr 15, 2016 at 10:07 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Hi Sab,
> > > >
> > > > For stateful processing where you have persistent state stores, you
> > > > need to keep the checkpoint (which includes the committed offsets)
> > > > and the store flushes in sync, but right now these two operations
> > > > are not done atomically. Hence, if you fail in between, you could
> > > > still get duplicates, where you consume from the committed offsets
> > > > while some of those messages have already updated the stores.
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Apr 14, 2016 at 11:56 PM, Sasidharan, Sabarish <
> > > > sabarish.sasidha...@harman.com> wrote:
> > > >
> > > > > Hi
> > > > >
> > > > > To achieve exactly-once processing for my aggregates, wouldn't it
> > > > > be enough if I maintain the latest offset processed for the
> > > > > aggregate and check against that offset when messages are replayed
> > > > > on recovery? Am I missing something here?
> > > > >
> > > > > Thanks
> > > > >
> > > > > Regards
> > > > > Sab
> > > >
> > > > --
> > > > -- Guozhang
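[Editor's note: the pattern Yi Pan describes above can be sketched in plain Java. This is an illustrative toy, not the Samza API: a `HashMap` stands in for the RocksDB-backed `KeyValueStore`, and all class and method names here are made up. The point it demonstrates is that the aggregate and the offset of the last message folded into it live in the same row, so they are flushed together, and replayed messages with offsets at or below the stored offset are skipped.]

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of offset-based de-duplication for an aggregate.
// A plain map stands in for a Samza KeyValueStore; names are illustrative.
class OffsetDedupAggregator {

    // One "store row": the running sum plus the offset that produced it.
    // Keeping both in one row means one atomic write per update.
    static final class Row {
        final long sum;
        final long lastOffset;
        Row(long sum, long lastOffset) { this.sum = sum; this.lastOffset = lastOffset; }
    }

    private final Map<String, Row> store = new HashMap<>();

    /** Applies (key, value) unless this offset is already in the aggregate. */
    boolean process(String key, long value, long offset) {
        Row row = store.get(key);
        if (row != null && offset <= row.lastOffset) {
            return false; // replayed duplicate: already applied, skip it
        }
        long base = (row == null) ? 0 : row.sum;
        store.put(key, new Row(base + value, offset)); // aggregate + offset together
        return true;
    }

    long sum(String key) {
        Row row = store.get(key);
        return (row == null) ? 0 : row.sum;
    }

    public static void main(String[] args) {
        OffsetDedupAggregator agg = new OffsetDedupAggregator();
        agg.process("a", 5, 100);
        agg.process("a", 7, 101);
        // Crash before the checkpoint commits: offsets 100-101 are replayed.
        agg.process("a", 5, 100); // skipped
        agg.process("a", 7, 101); // skipped
        System.out.println(agg.sum("a")); // stays 12, not 24
    }
}
```

In a real task the `Row` would be serialized into a single store entry keyed by the aggregate key. Note Robert's caveat still applies: this makes the *store* idempotent, but output messages (step 3 of commit) may still be re-sent after a failure.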