[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024185#comment-17024185
 ] 

Kostas Kloudas commented on FLINK-2646:
---------------------------------------

Hi [~maguowei], one problem I see with closing the last in-progress files in 
{{dispose()}} and scheduling them for commit there is that {{dispose()}} is 
called after every other method of a UDF or an operator, including 
{{checkpoint()/notifyCheckpointComplete()}}. This is because its 
responsibility is to clean up/free any lingering resources (e.g. connections 
to a DB).

Given this, in a {{stop-with-savepoint}} scenario we would have the following 
series of events:
- checkpoint = schedule any closed part files for commit and register the 
valid offset of the current in-progress file
- notifyCheckpointComplete = commit the pending (closed) part files
- dispose = close the last in-progress file

Since no checkpoint follows {{dispose()}}, this would leave the last 
in-progress file closed but never committed.
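
To make the ordering problem concrete, here is a minimal Java sketch that replays the three steps above against a toy sink. {{ToyFileSink}} and {{StopWithSavepointDemo}} are hypothetical names; the model only tracks the state of part files and is not Flink's actual file sink implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of a two-phase-commit file sink.
// It is NOT Flink's file sink; it only tracks which state each part file is in.
class ToyFileSink {
    final List<String> closed = new ArrayList<>();    // closed, not yet scheduled for commit
    final List<String> pending = new ArrayList<>();   // scheduled for commit by a checkpoint
    final List<String> committed = new ArrayList<>(); // committed, visible to readers
    String inProgress;                                 // part file currently being written

    ToyFileSink(String firstFile) { inProgress = firstFile; }

    // Roll over: close the current part file and start writing a new one.
    void roll(String next) { closed.add(inProgress); inProgress = next; }

    // checkpoint: schedule closed part files for commit. The in-progress file
    // would only be registered by its valid offset, which is not modelled here.
    void checkpoint() { pending.addAll(closed); closed.clear(); }

    // notifyCheckpointComplete: commit everything the checkpoint scheduled.
    void notifyCheckpointComplete() { committed.addAll(pending); pending.clear(); }

    // dispose: close the last in-progress file. No checkpoint runs afterwards,
    // so nothing will ever move this file to pending or committed.
    void dispose() {
        if (inProgress != null) { closed.add(inProgress); inProgress = null; }
    }
}

public class StopWithSavepointDemo {
    public static ToyFileSink run() {
        ToyFileSink sink = new ToyFileSink("part-0");
        sink.roll("part-1");             // part-0 closed, part-1 in progress
        sink.checkpoint();               // savepoint: part-0 scheduled for commit
        sink.notifyCheckpointComplete(); // part-0 committed
        sink.dispose();                  // part-1 closed, but never committed
        return sink;
    }

    public static void main(String[] args) {
        ToyFileSink s = run();
        System.out.println("committed=" + s.committed + " closedButUncommitted=" + s.closed);
    }
}
```

Running {{main}} prints {{committed=[part-0] closedButUncommitted=[part-1]}}: the file closed by {{dispose()}} has no later checkpoint that could commit it.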

Also, the same interface is going to be used for non-2PC sinks and normal UDFs. 
In that case, I think that hiding both the "clean shutdown at end of input 
(EOI)" and the "abrupt cancellation or failure" semantics behind the same 
method can cause problems for some users: to preserve correctness, they would 
always have to take the "pessimistic" approach and write their {{dispose()}} 
as if it were handling a failure.
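
A minimal sketch of that "pessimistic" approach, assuming a hypothetical buffering UDF ({{BufferingFunction}}, not a Flink class) whose single {{dispose()}} hook runs on both kinds of exit. Because the hook cannot tell them apart, it only releases resources, so in this toy model a clean end of input never flushes the buffered records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffering UDF with ONE shutdown hook, dispose(), which runs both
// after a clean end of input and after a cancellation/failure.
class BufferingFunction {
    final List<String> buffer = new ArrayList<>();  // records not yet written out
    final List<String> flushed = new ArrayList<>(); // stands in for an external system

    void invoke(String record) { buffer.add(record); }

    // "Pessimistic" dispose(): it cannot tell a clean shutdown from a failure,
    // so it assumes the worst and only releases resources. Flushing here could
    // publish data from an aborted run.
    void dispose() { buffer.clear(); }
}

public class PessimisticDisposeDemo {
    public static BufferingFunction runCleanShutdown() {
        BufferingFunction f = new BufferingFunction();
        f.invoke("a");
        f.invoke("b");
        f.dispose(); // clean end of input, yet "a" and "b" are never flushed
        return f;
    }

    public static void main(String[] args) {
        System.out.println("flushed after clean shutdown: " + runCleanShutdown().flushed);
    }
}
```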

> User functions should be able to differentiate between successful close and 
> erroneous close
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2646
>                 URL: https://issues.apache.org/jira/browse/FLINK-2646
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 0.10.0
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked both on 
> proper completion and on cancellation or error (to allow for cleanup).
> In certain cases, the user function needs to know why it is being closed: 
> whether the task completed regularly, or was canceled/failed.
> I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. 
> By default, this method calls {{close()}}. The runtime is then changed to 
> call {{close()}} as part of the regular execution and 
> {{closeAfterFailure()}} in case of an irregular exit.
> Because by default all cases call {{close()}}, the change would not be API 
> breaking.
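
The proposal above can be sketched in Java as follows. {{SketchRichFunction}}, {{SketchTaskRunner}} and {{CloseAfterFailureDemo}} are hypothetical stand-ins, not Flink's actual {{RichFunction}} or task runtime; the sketch only shows how a default {{closeAfterFailure()}} that delegates to {{close()}} keeps existing functions working while letting new ones react differently to failures.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a rich function (NOT Flink's real interface),
// extended with the closeAfterFailure() method proposed in FLINK-2646.
interface SketchRichFunction {
    void close() throws Exception;

    // Proposed hook: by default it delegates to close(), so functions that only
    // implement close() keep their current behaviour (no API break).
    default void closeAfterFailure() throws Exception {
        close();
    }
}

// Hypothetical runtime driver: close() on a regular exit, closeAfterFailure() otherwise.
final class SketchTaskRunner {
    static void runTask(SketchRichFunction fn, Runnable body) throws Exception {
        try {
            body.run();
        } catch (RuntimeException e) {
            // Irregular exit: notify the function, keep the original error primary.
            try { fn.closeAfterFailure(); } catch (Exception closeError) { e.addSuppressed(closeError); }
            throw e;
        }
        fn.close(); // regular exit
    }
}

public class CloseAfterFailureDemo {
    // Records which shutdown hook each run triggered.
    static final List<String> events = new ArrayList<>();

    // Legacy function: implements only close(), so failures also end up there.
    static final SketchRichFunction LEGACY = () -> events.add("close");

    // Function that distinguishes the two kinds of exit.
    static final SketchRichFunction AWARE = new SketchRichFunction() {
        @Override public void close() { events.add("commit-and-close"); }
        @Override public void closeAfterFailure() { events.add("rollback-and-close"); }
    };

    static void runQuietly(SketchRichFunction fn, Runnable body) {
        try {
            SketchTaskRunner.runTask(fn, body);
        } catch (Exception e) {
            // Expected for the failing runs in this demo.
        }
    }

    public static List<String> run() {
        events.clear();
        Runnable ok = () -> {};
        Runnable failing = () -> { throw new IllegalStateException("task failed"); };
        runQuietly(LEGACY, ok);
        runQuietly(LEGACY, failing);
        runQuietly(AWARE, ok);
        runQuietly(AWARE, failing);
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

{{run()}} returns {{[close, close, commit-and-close, rollback-and-close]}}: the legacy function sees {{close()}} on both exits, while the aware one can tell them apart. To keep the sketch short, only a {{RuntimeException}} thrown by the task body counts as a failure here.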



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
