[ 
https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021848#comment-17021848
 ] 

Stephan Ewen commented on FLINK-2646:
-------------------------------------

I would also like to understand this a bit better. The design doc hints at
different interfaces and function calls for different scenarios. That sounds
a bit hard for users to understand, and I wonder if there is an easier way to
do this. Guowei makes a good point, mainly the distinction between
"finalizing work / flushing data" and "cleanup".

Would it be enough to simply add one method, "endOfStream()", and
reinterpret the current "close()" as "dispose()"?

*Closing Calls*
 
Cancellation:
  - dispose()  -> cleanup

Suspend:
  - endOfInput() -> finalize in-progress data (e.g., prepare in-progress files
for roll-over)
  - checkpoint() / notifyComplete() -> safely commit side effects
  - dispose() -> cleanup

End-of-bounded-stream / Drain:
  - max_watermark -> end of time, drains all internal state
  - endOfInput() -> finalize in-progress data 
  - checkpoint() / notifyComplete() -> safely commit side effects
  - dispose() -> cleanup

The nice thing here is that each scenario builds on top of the previous one,
adding one characteristic.
* All cleanup is always in dispose()
* "Drain" adds just one characteristic over "Suspend", namely advancing the
time.
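A minimal sketch of how a runtime could drive these closing calls, assuming a hypothetical `ClosingHooks` interface whose method names are taken from the list above (this is not an existing Flink API; `ShutdownMode`, `ClosingDriver` and `RecordingOperator` are illustrative names too). It shows the layering: "Suspend" adds finalize-and-commit on top of the cleanup done on cancellation, and "Drain" only prepends the max watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hooks named after the proposal above; not an existing Flink interface.
interface ClosingHooks {
    void processWatermark(long watermark); // Long.MAX_VALUE = "end of time"
    void endOfInput();                     // finalize in-progress data
    void checkpoint();                     // e.g. prepare finalized files for commit
    void notifyComplete();                 // safely commit side effects
    void dispose();                        // cleanup, always the last call
}

enum ShutdownMode { CANCEL, SUSPEND, DRAIN }

final class ClosingDriver {
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // Each mode adds one characteristic on top of the previous one.
    // A real runtime would trigger checkpoint() and notifyComplete() through
    // its checkpointing mechanism; this sketch simply calls them inline.
    static void close(ClosingHooks op, ShutdownMode mode) {
        try {
            if (mode == ShutdownMode.DRAIN) {
                op.processWatermark(MAX_WATERMARK); // Drain = Suspend + advancing time
            }
            if (mode != ShutdownMode.CANCEL) {
                op.endOfInput();                    // Suspend = Cancel + finalize + commit
                op.checkpoint();
                op.notifyComplete();
            }
        } finally {
            op.dispose();                           // every path ends in dispose()
        }
    }
}

// Records the call order so the three sequences can be compared.
final class RecordingOperator implements ClosingHooks {
    final List<String> calls = new ArrayList<>();

    @Override public void processWatermark(long watermark) {
        calls.add(watermark == Long.MAX_VALUE ? "maxWatermark" : "watermark");
    }
    @Override public void endOfInput()     { calls.add("endOfInput"); }
    @Override public void checkpoint()     { calls.add("checkpoint"); }
    @Override public void notifyComplete() { calls.add("notifyComplete"); }
    @Override public void dispose()        { calls.add("dispose"); }
}
```

Because dispose() sits in a finally block, cleanup also runs when an earlier step throws, which keeps "all cleanup is always in dispose()" true on every path.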

*Sinks and Committing*

In a unified batch/streaming sink interface, we would need to have the notion
of checkpoints anyway.
A batch sink would simulate a checkpoint when it finishes:
* Task finishes = checkpoint
* Job finishes = notify complete

That way, the streaming file sink, for example, automatically also becomes a
batch file sink that publishes files atomically when the job completes.
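A minimal sketch of "task finishes = checkpoint, job finishes = notify complete", assuming a hypothetical in-memory sink whose lifecycle methods mirror the hooks listed earlier. `SketchFileSink`, `BatchRunner` and the single checkpoint id `1` are illustrative, not Flink APIs, and "files" are only names. Files become visible only in `notifyComplete()`, so the batch run reuses exactly the streaming commit path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of a committing file sink; "files" are only names.
final class SketchFileSink {
    private String current;                                   // part file being written, or null
    private final List<String> finalized = new ArrayList<>(); // closed, not yet in a checkpoint
    private final NavigableMap<Long, List<String>> pending = new TreeMap<>(); // by checkpoint id
    final List<String> published = new ArrayList<>();         // committed, visible to readers
    private int nextPart = 0;

    void write(String element) {
        if (current == null) {
            current = "part-" + nextPart++;
        }
        // Appending `element` to the file is omitted; only the file lifecycle is modelled.
    }

    // endOfInput(): finalize in-progress data by closing the open part file.
    void endOfInput() {
        if (current != null) {
            finalized.add(current);
            current = null;
        }
    }

    // checkpoint(): remember which finalized files belong to this checkpoint.
    void checkpoint(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>(finalized));
        finalized.clear();
    }

    // notifyComplete(): publish the files of this and all earlier checkpoints.
    void notifyComplete(long checkpointId) {
        NavigableMap<Long, List<String>> done = pending.headMap(checkpointId, true);
        done.values().forEach(published::addAll);
        done.clear(); // the view is backed by `pending`, so this removes the committed entries
    }

    // dispose(): drop everything that was never committed.
    void dispose() {
        current = null;
        finalized.clear();
        pending.clear();
    }
}

final class BatchRunner {
    // Batch execution reusing the streaming protocol:
    // task finishes = checkpoint, job finishes = notify complete.
    static List<String> runBatch(List<String> input) {
        SketchFileSink sink = new SketchFileSink();
        try {
            input.forEach(sink::write);
            sink.endOfInput();
            long finalCheckpointId = 1L;
            sink.checkpoint(finalCheckpointId);     // task finished
            sink.notifyComplete(finalCheckpointId); // job finished
            return new ArrayList<>(sink.published);
        } finally {
            sink.dispose();
        }
    }
}
```

If the job fails before the simulated notify-complete step, dispose() discards the pending files and nothing is published, which is the behavior the streaming sink already needs.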


> User functions should be able to differentiate between successful close and 
> erroneous close
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2646
>                 URL: https://issues.apache.org/jira/browse/FLINK-2646
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 0.10.0
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked in case of
> proper completion, and also in case of cancellation or error (to allow for
> cleanup).
> In certain cases, the user function needs to know why it is closed: whether
> the task completed regularly, or was canceled/failed.
> I suggest adding a method {{closeAfterFailure()}} to {{RichFunction}}. By
> default, this method calls {{close()}}. The runtime is then changed to call
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in
> case of an irregular exit.
> Because by default all cases call {{close()}}, the change would not be
> API-breaking.
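The original proposal stays non-breaking because of a default method: implementations that only override close() keep being called on every exit path. A minimal sketch, assuming a hypothetical `ClosableFunction` interface standing in for `RichFunction` (reduced to the close hooks, with checked exceptions omitted for brevity) and a hypothetical `SketchRuntime` that picks the hook:

```java
// Hypothetical stand-in for Flink's RichFunction, reduced to the close hooks.
// Checked exceptions are omitted to keep the sketch short.
interface ClosableFunction {
    void close();

    // Proposed addition: by default it just calls close(), so existing
    // implementations that only override close() behave exactly as before.
    default void closeAfterFailure() {
        close();
    }
}

final class SketchRuntime {
    // Runs the user logic, then picks the close hook depending on how it ended.
    // For simplicity, only RuntimeException failures are modelled.
    static void run(ClosableFunction fn, Runnable body) {
        try {
            body.run();
        } catch (RuntimeException failure) {
            try {
                fn.closeAfterFailure();            // irregular exit: failure/cancellation
            } catch (RuntimeException closeError) {
                failure.addSuppressed(closeError); // keep the original failure primary
            }
            throw failure;
        }
        fn.close();                                // regular completion
    }
}
```

A function that cares about the difference overrides both methods, for example committing in close() and discarding in closeAfterFailure(); every other function is unaffected.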



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
