[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022873#comment-17022873 ]
Guowei Ma commented on FLINK-2646: ---------------------------------- Thanks for your detailed comments. I think unifying the sink for bounded and unbounded jobs is very cool. Here is my understanding of the Sinks and Committing proposal; correct me if I am wrong:
# _EndOfInput_ (task finish) is very similar to triggering a checkpoint. One difference is that this checkpoint is not triggered by the _CheckpointCoordinator_. So maybe Flink could notify the UDF to snapshot its state when it receives the _EndOfInput_.
# _JobFinished_ is very similar to checkpoint completion. So maybe Flink could also call the UDF's _notifyCheckpointComplete_ when the job finishes.
With this, the sink does not need to assume that its input is bounded or unbounded. It only depends on the checkpoint mechanism to achieve exactly-once on its side.
I have some small questions and thoughts. I want to be on the same page with you through thinking about these problems:
# When does Flink notify a task of _CheckpointComplete_ if the job has both bounded and unbounded sources? Because the job can never finish, the already-finished tasks of the job would never be notified of _JobFinished_. One option is for Flink to support triggering checkpoints, and notifying checkpoint completion, for a job that contains finished tasks.
# When can a slot be released for another task to use? If I understand correctly, all resources (including managed memory) should be released in the _dispose_ stage in the new design. So a task cannot release any resource, even after it reports itself finished to the JM, if it still needs to be notified of _JobFinished_. As far as I know, the JM can release a slot once all tasks in it are finished. This might lead to inconsistency. I am not sure there are concrete cases where this breaks, but I think it is a potential risk in theory.
# Flink needs to guarantee that the _JobFinished_ event is received by all tasks.
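The unification described in the two points above can be sketched as a pending-commit sink that relies only on the checkpoint cycle. This is a minimal, self-contained sketch in plain Java, not real Flink code: the method names merely mirror Flink's `CheckpointedFunction.snapshotState` and `CheckpointListener.notifyCheckpointComplete`, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: a sink that buffers records and commits them only once a
// checkpoint completes. If EndOfInput is treated as one more checkpoint
// trigger and JobFinished as the matching completion notification, the
// same code path covers both bounded and unbounded inputs.
class PendingCommitSink {
    private final List<String> buffer = new ArrayList<>();  // records since last snapshot
    private final List<String> pending = new ArrayList<>(); // snapshotted, awaiting commit
    final List<String> committed = new ArrayList<>();       // durably "written"

    void invoke(String record) {
        buffer.add(record);
    }

    // Called on a normal checkpoint *and* on EndOfInput (task finish).
    void snapshotState() {
        pending.addAll(buffer);
        buffer.clear();
    }

    // Called on checkpoint completion *and* on JobFinished.
    void notifyCheckpointComplete() {
        committed.addAll(pending);
        pending.clear();
    }
}
```

Under this model, a bounded run would call `snapshotState()` when the task receives _EndOfInput_ and `notifyCheckpointComplete()` when the job finishes, giving the same exactly-once commit protocol as a regular checkpoint cycle.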
Flink might not receive the acknowledgment of the _JobFinished_ event from a task. There are two situations (the drain might have the same issue in some cases):
## The _JobFinished_ request/response is lost. Retrying the _JobFinished_ notification might resolve this problem.
## The task failed while handling the _JobFinished_ event, so Flink could not receive the acknowledgment. Flink could use the normal failover strategy and restart the task from the state snapshotted at the moment of _endOfInput_. This would trigger another round of _endOfInput_ and _JobFinished_. But I think this only works for sources that support checkpoints. (If the JM fails over while notifying tasks of the _JobFinished_ event, the new JM should re-notify all tasks of the _JobFinished_ event.)

> User functions should be able to differentiate between successful close and
> erroneous close
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2646
>                 URL: https://issues.apache.org/jira/browse/FLINK-2646
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 0.10.0
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked in case of
> proper completion, and in case of canceling in case of error (to allow for
> cleanup).
> In certain cases, the user function needs to know why it is closed, whether
> the task completed in a regular fashion, or was canceled/failed.
> I suggest to add a method {{closeAfterFailure()}} to the {{RichFunction}}. By
> default, this method calls {{close()}}. The runtime is then changed to call
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in
> case of an irregular exit.
> Because by default all cases call {{close()}}, the change would not be API
> breaking.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
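The {{closeAfterFailure()}} proposal quoted in the issue description can be sketched as a default method. This is a hypothetical interface for illustration only (the real {{RichFunction}} lives in {{org.apache.flink.api.common.functions}} and declares checked exceptions, which are omitted here for brevity):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed API: closeAfterFailure() delegates to close() by
// default, so existing user functions keep working unchanged and the change
// is not API breaking; functions that care about the distinction override it.
interface RichFunctionSketch {
    void close();

    // The runtime would call this on an irregular exit (cancel/failure)
    // and plain close() on regular completion.
    default void closeAfterFailure() {
        close();
    }
}

// A function that distinguishes the two exit paths, e.g. to decide
// between committing and discarding partial output.
class LoggingFunction implements RichFunctionSketch {
    final List<String> calls = new ArrayList<>();

    @Override
    public void close() {
        calls.add("regular close");
    }

    @Override
    public void closeAfterFailure() {
        calls.add("close after failure");
    }
}
```

A function that does not override {{closeAfterFailure()}} still has its {{close()}} invoked on failure, preserving today's cleanup behavior.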