[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022938#comment-17022938 ]
Kostas Kloudas commented on FLINK-2646:
---------------------------------------

Hi [~maguowei], and thanks for warming up the discussion on this topic!

One comment on the similarity between EndOfInput (EOI) and checkpoints: there is also another difference between them. Focusing on the StreamingFileSink, which is also the example in the FLIP, the difference is that EOI closes the in-progress part files, which a checkpoint normally does not. Currently the sink can keep writing to the same in-progress file even after a checkpoint; in that case, it checkpoints the offset up to which the data is valid. In the case of EOI, we close the in-progress file because no future data will arrive, so there is no need to keep it open. For non-2PC sinks we may not even have checkpoints, so the EOI logic will not include checkpointing.

In addition, when checkpointing the state, I think it should be the checkpoint coordinator who initiates the process, because:
# it has to register that the checkpoint/savepoint was successful, and
# it has to make sure that notifyCheckpointComplete is sent after all tasks have checkpointed their state.

For the questions:
# I think that in the case of a mix of bounded and unbounded sources, the change should aim to allow Flink to take checkpoints and send the subsequent checkpoint notifications even if some tasks have finished. This applies not only to a mix of bounded and unbounded sources but also to jobs with only bounded sources where some source tasks are faster than others, or where sources finish but intermediate tasks are CPU bound and take longer to finish. Apart from that, there should also be a way to mark in the checkpoint that some tasks had finished, so that there is no need to restore them upon recovery or to wait for them to checkpoint state. Of course this requires a lot more design, but I think supporting close and dispose on the UDF level can be done independently.
# For this I agree that more graceful resource de-allocation may be needed, but I think that [~trohrmann] may have more insights.
# For 3.1, I agree that the JobFinished should be received by all tasks and, if not, the job should restart from the last successful checkpoint. For 3.2, this is why we go through the normal checkpoint/notifyCheckpointComplete mechanism. On checkpoint, we take a normal checkpoint, so that if it fails we restart from the previous checkpoint, which is a consistent state, and only when the checkpoint is successful (upon notifyCheckpointComplete) do we actually "commit" the data. If this fails, or some of the tasks fail and some succeed, then we restart from the latest checkpoint (the one we took just before the "commit"), which will include the latest changes, i.e. that the in-progress files were closed or, in the case of DRAIN, that the max-watermark was sent.

> User functions should be able to differentiate between successful close and
> erroneous close
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-2646
> URL: https://issues.apache.org/jira/browse/FLINK-2646
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 0.10.0
> Reporter: Stephan Ewen
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked both on
> proper completion and on cancellation or error (to allow for cleanup).
> In certain cases, the user function needs to know why it is closed: whether
> the task completed in a regular fashion, or was canceled/failed.
> I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By
> default, this method calls {{close()}}. The runtime is then changed to call
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in
> case of an irregular exit.
> Because by default all cases call {{close()}}, the change would not be API
> breaking.
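
The proposal in the issue description can be sketched roughly as follows. This is only an illustration under stated assumptions, not Flink's actual API: {{RichFunctionSketch}}, {{LegacyFunction}}, {{CommittingFunction}} and {{CloseDemo.runTask}} are hypothetical names, checked exceptions are omitted for brevity, and the "runtime" is reduced to a single method that either finishes normally or simulates a failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RichFunction; NOT the real Flink interface.
interface RichFunctionSketch {
    void open();

    void close();

    // Proposed addition: delegates to close() by default, so functions
    // written before the change behave exactly as they did.
    default void closeAfterFailure() {
        close();
    }
}

// An existing function that knows nothing about closeAfterFailure().
class LegacyFunction implements RichFunctionSketch {
    private final List<String> log;

    LegacyFunction(List<String> log) { this.log = log; }

    @Override public void open() { log.add("open"); }

    @Override public void close() { log.add("close"); }
}

// A function that needs to know why it is being closed.
class CommittingFunction implements RichFunctionSketch {
    private final List<String> log;

    CommittingFunction(List<String> log) { this.log = log; }

    @Override public void open() { log.add("open"); }

    // Regular completion: safe to finalize the output.
    @Override public void close() { log.add("commit"); }

    // Irregular exit: clean up without finalizing.
    @Override public void closeAfterFailure() { log.add("discard"); }
}

class CloseDemo {
    // Hypothetical runtime behavior: close() on regular completion,
    // closeAfterFailure() on an irregular exit.
    static void runTask(RichFunctionSketch fn, boolean simulateFailure) {
        fn.open();
        try {
            if (simulateFailure) {
                throw new IllegalStateException("simulated task failure");
            }
        } catch (RuntimeException e) {
            fn.closeAfterFailure();
            return;
        }
        fn.close();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        runTask(new LegacyFunction(log), true);      // falls back to close()
        runTask(new CommittingFunction(log), false); // regular completion
        runTask(new CommittingFunction(log), true);  // irregular exit
        System.out.println(log);
    }
}
```

In this sketch the legacy function still sees a plain {{close()}} when the task fails, which is the non-breaking property the description relies on, while the second function can tell the two cases apart.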