I think you are touching on something important here. There is a discussion/pull request about graceful shutdown of streaming jobs (that is, stop the sources and let the remainder of the streams run out).
With the work in progress to draw external checkpoints, it should be easy to do checkpoint-and-close. We may not even need the last ack in the "checkpoint -> ack -> notify -> ack" sequence, if the operators simply wait for the "notifyComplete" function to finish. Then the operators finish successfully only when the "notifyComplete()" method succeeds; otherwise they go to the state "failed". That is good, because we need no extra mechanism (no extra message type).

What we do need anyway is a way to detect when the checkpoint did not globally succeed, so that the functions where it succeeded do not wait forever for the "notifySuccessful" message.

We have two things here now:

1) Graceful shutdown should trigger an "internal" checkpoint (which is immediately discarded), in order to commit pending data for cases where data is staged between checkpoints.

2) An option to shut down with an external checkpoint would also be important, to stop and resume from exactly there.

Stephan

On Wed, Nov 11, 2015 at 3:19 PM, Gyula Fóra <gyf...@apache.org> wrote:

> Hey guys,
>
> With recent discussions around being able to shut down and restart streaming
> jobs from specific checkpoints, there is another issue that I think needs
> tackling.
>
> As far as I understand, when a streaming job finishes, the tasks are not
> notified for the last checkpoints, and jobs also don't take a final
> checkpoint before shutting down.
>
> In my opinion this might lead to situations where the user cannot tell
> whether the job finished properly (with consistent states/outputs). To
> give you a concrete example, let's say I am using the RollingSink to
> produce exactly-once output files. If the job finishes, I think there will
> be some files that remain in the pending state and are never completed. The
> user then sees some complete files and some pending files for the
> completed job. The question is then, how do I tell whether the pending
> files were actually completed properly now that the job is finished?
>
> Another example would be that I want to manually shut down my job at 12:00
> and make sure that I produce every output up to that point. Is there any
> way to achieve this currently?
>
> I think we need to do 2 things to make this work:
> 1. Job shutdowns (finish/manual) should trigger a final checkpoint
> 2. These final checkpoints should actually be 2-phase checkpoints:
> checkpoint -> ack -> notify -> ack, then when the checkpoint coordinator
> gets all the notification acks it can tell the user that the system shut
> down cleanly.
>
> Unfortunately it can happen that for some reason the coordinator does not
> receive all the acks for a completed job; in that case it can warn the user
> that the checkpoint might be inconsistent.
>
> Let me know what you think!
>
> Cheers,
> Gyula
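
For illustration, the "checkpoint -> ack -> notify -> ack" sequence discussed in this thread could be sketched roughly as follows. This is a toy Python model under stated assumptions, not Flink code: `Task`, `Outcome`, and `final_checkpoint` are hypothetical names, and the messages and timeouts of a real system are replaced by plain method calls.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class Outcome(Enum):
    CLEAN = "clean"                # every task acked both the checkpoint and the notification
    INCONSISTENT = "inconsistent"  # some ack is missing, so the user should be warned


@dataclass
class Task:
    """Hypothetical stand-in for one parallel operator instance."""
    name: str
    snapshot_ok: bool = True  # whether the final snapshot succeeds (assumed knob)
    commit_ok: bool = True    # whether the notify/commit step succeeds (assumed knob)
    state: str = "running"

    def checkpoint(self) -> bool:
        # Phase 1: stage pending output (e.g. pending files) and ack.
        return self.snapshot_ok

    def notify_complete(self) -> bool:
        # Phase 2: commit staged output; the task finishes only if this succeeds.
        self.state = "finished" if self.commit_ok else "failed"
        return self.commit_ok

    def abort(self) -> None:
        # The checkpoint did not succeed globally: stop waiting for the notification.
        self.state = "failed"


def final_checkpoint(tasks: List[Task]) -> Outcome:
    # checkpoint -> ack
    acks = [task.checkpoint() for task in tasks]
    if not all(acks):
        # Tell every task, including those whose snapshot succeeded,
        # so that none of them waits forever for the notify message.
        for task in tasks:
            task.abort()
        return Outcome.INCONSISTENT

    # notify -> ack (the list is built fully, so every task gets notified)
    notify_acks = [task.notify_complete() for task in tasks]
    return Outcome.CLEAN if all(notify_acks) else Outcome.INCONSISTENT
```

Calling `final_checkpoint([Task("source"), Task("sink")])` models the clean case; passing a task with `snapshot_ok=False` or `commit_ok=False` models the cases where the coordinator can only warn that the result might be inconsistent. In the variant without the last ack, the coordinator would instead infer success from each task reaching the "finished" state rather than from an explicit notification ack.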