gaoyunhaii commented on pull request #14831:
URL: https://github.com/apache/flink/pull/14831#issuecomment-783059325


   Hi @kezhuw, many thanks for the deep thoughts and suggestions!
   
   > But instead of "Insert barriers for channels received EndOfPartition", I would suggest counting EndOfPartitionEvent directly in the checkpoint barrier handler. That is, in the implementation, we could count/trigger the checkpoint in CheckpointBarrierHandler.processEndOfPartition and ChannelStatePersister.checkForBarrier if there is a pending checkpoint. Personally, I think it is the same as FinalizeBarrierComplementProcessor but much more straightforward. The point is that if we decide to support EndOfPartitionEvent during a checkpoint, we should bake this knowledge directly into CheckpointBarrierHandler.
   > 
   > To overtake buffers before EndOfPartitionEvent for unaligned checkpoints, I think the minimal requirement is flushing all buffers to the network stack before EndOfPartitionEvent. An asynchronous, completable flush operation on PipelinedSubpartition should meet this requirement. Before that flush operation completes, an unaligned checkpoint could take place as normal; after it, there will be no output buffers to overtake. A pair of request-response events (e.g. EndOfXyzEvent, XyzConsumedEvent) would also fulfill this requirement. I am not sure how viable it is to introduce FINISHING for checkpointing, as I think ExecutionState is handled by Task while checkpointing is handled inside StreamTask. But I think this "how to overtake buffers for unaligned checkpoint before EndOfPartitionEvent" could be a separate issue.
   
   Yes, as discussed with Piotr above, I think the process listed here is similar to the new process we proposed. One difference is that we now wait not only until all data has been flushed, but also until it has been received and processed by the downstream tasks. Once all the user records are processed, the upstream task can emit EndOfPartition, and the `CheckpointBarrierHandler` in the downstream task can view `EndOfPartition` as a sign that all the pending checkpoints should be finished.
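   To make the intended behaviour concrete, below is a minimal sketch of the idea (the class `SimpleBarrierTracker` and its method names are invented for illustration; this is not the actual `CheckpointBarrierHandler` code): a channel that delivers `EndOfPartition` both completes the alignment of a pending checkpoint and counts as already aligned for barriers that arrive later.

```java
// Hypothetical illustration only -- not the real Flink CheckpointBarrierHandler.
// It shows the idea discussed above: EndOfPartition on a channel counts towards
// barrier alignment, because no further barriers can arrive on that channel.
import java.util.HashSet;
import java.util.Set;

public class SimpleBarrierTracker {
    private final int totalChannels;
    private final Set<Integer> alignedChannels = new HashSet<>();
    private final Set<Integer> finishedChannels = new HashSet<>();
    private long pendingCheckpointId = -1;

    public SimpleBarrierTracker(int totalChannels) {
        this.totalChannels = totalChannels;
    }

    /** Called when a checkpoint barrier arrives on a channel. */
    public void processBarrier(long checkpointId, int channel) {
        if (pendingCheckpointId < 0) {
            pendingCheckpointId = checkpointId;
            // Channels that already delivered EndOfPartition are aligned from the start.
            alignedChannels.addAll(finishedChannels);
        }
        alignedChannels.add(channel);
        maybeComplete();
    }

    /** Called when EndOfPartition arrives on a channel. */
    public void processEndOfPartition(int channel) {
        finishedChannels.add(channel);
        if (pendingCheckpointId >= 0) {
            // The finished channel can never send the pending barrier, so treat it
            // as aligned and let the pending checkpoint complete.
            alignedChannels.add(channel);
            maybeComplete();
        }
    }

    private void maybeComplete() {
        if (alignedChannels.size() == totalChannels) {
            System.out.println("Checkpoint " + pendingCheckpointId + " aligned, snapshotting state");
            pendingCheckpointId = -1;
            alignedChannels.clear();
        }
    }
}
```

   The real handler of course has to track more than this (per-checkpoint state, unaligned checkpoints, cancellation), but the completion condition sketched here is the relevant part of the proposal.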
   
   > Besides this, I think a checkpoint after EndOfInputEvent (e.g. at FINISHING or after all buffers are flushed) is similar to the last checkpoint for 2pc. I am kind of worried about buffer duplication after recovering from a successful checkpoint created in that period. A checkpoint in that period will include the last buffers from a possible end-of-stream-flush operation; after recovery, will that end-of-stream-flush still be executed?
   
   No, the waiting happens after all the operators have been closed, and the operator subtask would be marked as finished in the checkpoint. If all the subtasks of an operator are finished, the execution of that operator would be skipped after failover. If not all subtasks are finished, the remaining states will be re-assigned, but we assume the records produced by this operator as a whole should not change. A more detailed explanation is [here](https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks).
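   As a rough sketch of that recovery rule (hypothetical names, not the FLIP-147 implementation): an operator whose subtasks were all recorded as finished in the checkpoint is skipped on failover; otherwise its remaining state is redistributed among the restarted subtasks.

```java
// Hypothetical recovery-side sketch -- not the actual Flink scheduler code.
import java.util.List;

public class RestorePlanner {

    /** Per-operator information recorded in the checkpoint. */
    static class OperatorCheckpointInfo {
        final String operatorId;
        final List<Boolean> subtaskFinished;

        OperatorCheckpointInfo(String operatorId, List<Boolean> subtaskFinished) {
            this.operatorId = operatorId;
            this.subtaskFinished = subtaskFinished;
        }

        boolean allSubtasksFinished() {
            return subtaskFinished.stream().allMatch(f -> f);
        }
    }

    void planRestore(List<OperatorCheckpointInfo> operators) {
        for (OperatorCheckpointInfo op : operators) {
            if (op.allSubtasksFinished()) {
                // All records of this operator were already produced before the
                // checkpoint, so it does not need to run again after failover.
                System.out.println("Skip finished operator " + op.operatorId);
            } else {
                // Some subtasks were still running: re-assign the remaining state
                // and resume processing for this operator.
                System.out.println("Restore and re-assign state for " + op.operatorId);
            }
        }
    }
}
```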
   
   > FLINK-21085: I think the current solution will undermine what we try to follow in FLINK-21133. In that jira, we try to unify the handling of stop-with-savepoint in one place, while the current approach tends to duplicate the checkpoint trigger in many code paths. I would like to suggest enhancing and generalizing MultipleInputStreamTask.triggerCheckpointAsync to StreamTask. This way we will have only one place to trigger the checkpoint.
   
   Sorry, I'm not quite sure what the duplication refers to; could you detail this issue~?
   
   > FLINK-21081: Just yet another approach to list/consider/evaluate. Maybe we could send the checkpoint trigger rpc to all running tasks? Tasks with active inputs that receive the checkpoint trigger rpc would just bookkeep it. If all active inputs have received EndOfPartitionEvent in the meantime, the task will trigger the checkpoint itself. If the checkpoint trigger is coded in one place, this bookkeeping and lazy trigger will only be coded once in one place. But this approach may be network-consuming in a large job topology.
   
   Personally, I would not lean towards this method due to the possible cost of triggering all the tasks, especially for large jobs.
   
   > I am not sure how the finishing handshake will be delivered, so basically, I am also not sure how this handshaking will combine with stop-with-savepoint. But if the handshaking happens at the StreamTask level, I think there will be no big deal. Though, still unsure.
   
   I pushed the implementation to this branch: https://github.com/gaoyunhaii/flink/commits/final_cp_6_handshark; it is almost done. Any comments are welcome~
   
   
   > Assume that operator C has two upstreams, A and B. During a checkpoint, A ends with EndOfPartitionEvent while B keeps running. C could receive the checkpoint barrier from B and then EndOfPartitionEvent from A. In this case, FLINK-21081 does not help.
   
   We have implemented this in the new branch above. In the new implementation, a channel that has received `EndOfPartitionEvent` is also viewed as aligned, so the checkpoint can be triggered normally.
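   As a small illustration of that scenario, reusing the hypothetical `SimpleBarrierTracker` sketch from earlier in this comment (again, not the real implementation): the checkpoint still completes even though A finished before the barrier from B arrived.

```java
// Scenario from the quote above: upstream A finishes before the checkpoint,
// upstream B keeps running and later sends the barrier.
public class TwoInputScenario {
    public static void main(String[] args) {
        SimpleBarrierTracker tracker = new SimpleBarrierTracker(2);

        // Channel 0 (from A) finishes before any checkpoint is pending.
        tracker.processEndOfPartition(0);

        // The barrier for checkpoint 7 then arrives only from B (channel 1).
        // Channel 0 already counts as aligned, so the checkpoint completes.
        tracker.processBarrier(7L, 1);
    }
}
```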
   

