Hi Bart,

In terms of your assumptions:

* Ts <= To: this is correct. The code that backs this assumption is in RunLoop <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala>: the commit is called after the process and window methods. E.g., in process1 -> commit T1 -> process2 -> fail, commit T2 will not happen. When the job restarts, it consumes from T1, and the messages between T1 and T2 will be reprocessed.

* Ti <= Ts is not always true. There are two scenarios (assuming we call db.put() in process/window):

  1) process1 -> db.put1 success -> commit T1 -> process2 -> db.put2 success -> following process fails -> commit T2 will not happen. In this scenario, Ti <= Ts because the latest changelog write happens later than the checkpoint. When we reprocess the stream from T1, the same operation will happen twice because db.put2 already succeeded. *It is usually fine to put the same data twice or delete the same data twice. It may cause issues if db.put2 accumulates based on its previous value. -- @Chris, is this true?*

  2) process1 -> db.put1 -> commit T1 -> process2 -> fail. This is Ti >= Ts because the latest checkpoint happens after the changelog write.

The changelog code is in LoggedStore <https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala>; you can see that the changelog is written after each db operation (put/delete, etc.). All the db operations are called in the process or window method.

Hope this helps.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Tue, Apr 7, 2015 at 7:27 AM, Bart Wyatt <bart.wy...@dsvolition.com> wrote:

> We are trying to make sure that we are handling the proper edge cases in
> our stateful tasks so that our processes can survive failure well.
>
> Given the changelog will recreate the KV store (state) up to the point of
> time of the last durable changelog write (Ts), the checkpoint will start
> input from the point of time represented in the last durable checkpoint
> write (Ti) and the output will have messages from it at the 3rd point in
> time of the last durable output write (To), our current assumption is that
> in all recovery cases:
>
> Ti <= Ts <= To
>
> This means that some input may be "replayed" from the point of view of the
> KV store which is handled by normal at-least-once-delivery semantics
> processing and that we may duplicate output messages that would have been
> produced between Ts and To which is also consistent with
> at-least-once-delivery.
>
> However, I cannot find code that backs this assumption and I'm hoping
> I've just missed it, because:
>
> If To < Ts, then we may drop output because the state assumed it was
> already written and due to timing of actual writes to kafka or durability
> concerns the output is not there. This is important for a job, for
> example, that emits "session started @ X" messages on the first message for
> any given session key. The state will see a repeated message as a
> duplicate and not emit the output. I think this is solvable in the job as
> long as To >= Ti, but I am not certain the solution is generally applicable
> to tasks where side-effects of previous input exist in the state and have
> an effect on future output.
>
> If Ts < Ti, then our stateful task will effectively drop input, even
> though it may have produced some or all of the output for those messages in
> its previous incarnation, as the state used for all future processes will
> not have the side effects of processing the messages between Ts and Ti. We
> see no solution for this at the task level as it would require collusion
> between two backing systems (checkpoints and changelogs) to correct,
> presumably by rewinding Ti to Ts.
>
> Perhaps my code search failed because I was expecting some colluding
> system that would wait for output to write out before writing changelog
> entries and then again before checkpoints and that was too presumptive. Is
> there something about the code, the assumption or my edge analysis that
> I've missed to address this?
>
> -Bart
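
The ordering described in the reply (a changelog entry is written right after each db.put, while the checkpoint is committed only after process returns) can be illustrated with a toy simulation. This is a minimal Python sketch, not Samza code: run, restore, count_update, last_offset_update and demo are hypothetical names, and a crash is modeled as simply returning before the commit. It reproduces scenario 1, where db.put2 succeeds but commit T2 never happens, and shows why an accumulating put is affected by the replay while an overwrite-style put is not.

```python
def restore(changelog):
    """Rebuild the key-value store by replaying every changelog entry."""
    store = {}
    for key, value in changelog:
        store[key] = value
    return store


def run(messages, changelog, checkpoint, update, crash_at=None):
    """Toy run loop: process -> put (+ changelog write) -> commit.

    `changelog` and the returned checkpoint stand in for durable state.
    `crash_at` is the offset whose put succeeds but whose commit is lost.
    """
    store = restore(changelog)
    for offset in range(checkpoint, len(messages)):
        key = messages[offset]
        store[key] = update(store, key, offset)   # db.put inside process()
        changelog.append((key, store[key]))       # changelog written right after the put
        if offset == crash_at:
            return checkpoint                     # crash: this commit never happens
        checkpoint = offset + 1                   # commit after process()
    return checkpoint


def count_update(store, key, offset):
    """Accumulating put: the new value depends on the previous value."""
    return store.get(key, 0) + 1


def last_offset_update(store, key, offset):
    """Overwrite-style put: writing it twice gives the same result."""
    return offset


def demo(update):
    messages = ["a", "a"]
    changelog = []
    # First incarnation: put for offset 1 succeeds, then the task fails.
    checkpoint = run(messages, changelog, 0, update, crash_at=1)
    # Restart: state comes from the changelog, input resumes at the checkpoint.
    run(messages, changelog, checkpoint, update)
    return restore(changelog)


print(demo(count_update))        # {'a': 3}: offset 1 was counted twice
print(demo(last_offset_update))  # {'a': 1}: replay is harmless
```

Only two messages were sent, yet the counter ends at 3, because the restored state already contained the effect of the message that was replayed from the last checkpoint.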