[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17024305#comment-17024305
 ] 

Guowei Ma commented on FLINK-2646:
----------------------------------

Hi, [~kkl0u]

> From Stephan's reply and yours, I have an assumption: Flink could always 
> trigger a checkpoint at the end of input and notify every UDF that needs it 
> that the checkpoint completed, in whatever situation.

So

1. For the stop-with-checkpoint scenario, we could choose another 
implementation that commits the part of the last in-progress file in the 
dispose stage.

2. Non-2PC and normal UDFs could implement the CheckpointedFunction 
interface.
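
To make point 2 concrete, here is a minimal sketch of how a non-2PC UDF might rely on a final checkpoint at end of input, if the assumption above holds. {{CheckpointHooks}} and {{BufferingSink}} are hypothetical, simplified stand-ins written for this illustration. They are not Flink classes: Flink's real CheckpointedFunction and CheckpointListener take context objects and manage state differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for Flink's checkpoint callbacks.
interface CheckpointHooks {
    void snapshotState(long checkpointId);
    void notifyCheckpointComplete(long checkpointId);
}

// Hypothetical sink: stages records per checkpoint and commits them only
// once the runtime reports that the checkpoint completed.
class BufferingSink implements CheckpointHooks {
    // Records received since the last snapshot.
    private final List<String> buffer = new ArrayList<>();
    // Batches staged by a snapshot, keyed by checkpoint id, not yet committed.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Records that have been made visible to the outside world.
    final List<String> committed = new ArrayList<>();

    void invoke(String record) {
        buffer.add(record);
    }

    @Override
    public void snapshotState(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>(buffer));
        buffer.clear();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Commit every staged batch up to and including the completed checkpoint.
        NavigableMap<Long, List<String>> done = pending.headMap(checkpointId, true);
        for (List<String> batch : done.values()) {
            committed.addAll(batch);
        }
        done.clear(); // headMap is a view, so this removes the entries from pending
    }
}
```

In this sketch, records that arrive after the last regular checkpoint are committed only if the runtime takes one more checkpoint at end of input and then delivers the completion notification, which is exactly what the assumption requires.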

Generally, I think the EOI interface is a very clear definition. However, I 
just want to know whether Flink is introducing the EOI interface to DataStream 
to clarify the concept, or whether there are scenarios that the current 
interfaces cannot satisfy even if the assumption is true.

 

> User functions should be able to differentiate between successful close and 
> erroneous close
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2646
>                 URL: https://issues.apache.org/jira/browse/FLINK-2646
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 0.10.0
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked in case of 
> proper completion, and also in case of cancellation or error (to allow for 
> cleanup).
> In certain cases, the user function needs to know why it is closed, whether 
> the task completed in a regular fashion, or was canceled/failed.
> I suggest adding a method {{closeAfterFailure()}} to the {{RichFunction}}. By 
> default, this method calls {{close()}}. The runtime is then changed to call 
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in 
> case of an irregular exit.
> Because by default all cases call {{close()}} the change would not be API 
> breaking.
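
The proposal in the issue description can be sketched roughly as follows. This is only an illustration under stated assumptions: {{ClosableFunction}}, {{LegacyFunction}}, {{FailureAwareFunction}} and {{TaskRunner}} are hypothetical names invented here, standing in for {{RichFunction}}, user functions and the runtime. The use of a Java default method is also an assumption about how "by default, this method calls close()" might be realized.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RichFunction, reduced to the two methods discussed.
interface ClosableFunction {
    void close();

    // Proposed hook: delegates to close() by default, so functions that only
    // implement close() keep their current behavior.
    default void closeAfterFailure() {
        close();
    }
}

// Existing-style function that knows nothing about the new hook.
class LegacyFunction implements ClosableFunction {
    final List<String> log = new ArrayList<>();

    @Override
    public void close() {
        log.add("close");
    }
}

// Function that distinguishes a regular exit from an irregular one.
class FailureAwareFunction implements ClosableFunction {
    final List<String> log = new ArrayList<>();

    @Override
    public void close() {
        log.add("commit");
    }

    @Override
    public void closeAfterFailure() {
        log.add("rollback");
    }
}

// Hypothetical runtime helper: calls close() after regular completion and
// closeAfterFailure() when the task body throws.
final class TaskRunner {
    static void run(ClosableFunction fn, Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            fn.closeAfterFailure();
            throw e;
        }
        fn.close();
    }
}
```

With this shape, a {{LegacyFunction}} sees {{close()}} on both paths, while a {{FailureAwareFunction}} can commit on success and roll back on failure.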



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
