[ https://issues.apache.org/jira/browse/FLINK-2646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021848#comment-17021848 ]
Stephan Ewen commented on FLINK-2646:
-------------------------------------

I would also like to understand this a bit better. The design doc hints at different interfaces and function calls for different scenarios. It sounds a bit hard to understand for users, I wonder if there is an easier way to do this.

Guowei's argument has a good point, mainly differentiating between "finalizing work / flushing data" and "cleanup".

Would a simple addition of one method "endOfStream()" be enough, and re-interpret the current "close()" to a "dispose()"?

*Closing Calls*

Cancellation:
  - dispose() -> cleanup

Suspend:
  - endOfInput() -> finalize in-progress data (like prepare in-progress files for roll-over)
  - checkpoint() / notifyComplete() -> safely commit side effects
  - dispose() -> cleanup

End-of-bounded-stream / Drain:
  - max_watermark -> end of time, drains all internal state
  - endOfInput() -> finalize in-progress data
  - checkpoint() / notifyComplete() -> safely commit side effects
  - dispose() -> cleanup

The nice thing here is that everything builds on top of each other, adding a characteristic.
  * All cleanup is always in dispose()
  * "Drain" adds just one characteristic over "Suspend", and that is advancing the time.

*Sinks and Committing*

In a unified batch/streaming sink interface, we would need to have the notion of checkpoints anyways. A batch sink would simulate a checkpoint when finished
  * Task finishes = checkpoint
  * job finishes = notify complete

That way, the streaming file sink for example automatically is also a batch file sink that publishes files atomically when the job completes.
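The three closing sequences listed in the comment can be sketched as a small driver that layers each scenario on the previous one. This is only an illustration, not Flink code: the `Lifecycle` interface, the `Shutdown` enum, the `shutDown` driver and the `Recorder` class are all hypothetical names, and the method is called `endOfInput()` as in the lists (the comment also floats `endOfStream()` for the same hook).

```java
import java.util.ArrayList;
import java.util.List;

public class ClosingCalls {

    /** Hypothetical operator hooks, named after the calls in the comment. Not the Flink API. */
    public interface Lifecycle {
        void maxWatermark();    // end of time: drains all internal state
        void endOfInput();      // finalize in-progress data
        void checkpoint();      // first half of safely committing side effects
        void notifyComplete();  // second half of safely committing side effects
        void dispose();         // cleanup
    }

    /** The three scenarios from the comment. */
    public enum Shutdown { CANCEL, SUSPEND, DRAIN }

    /** Hypothetical runtime-side driver: each scenario adds one characteristic. */
    public static void shutDown(Lifecycle op, Shutdown mode) {
        try {
            if (mode == Shutdown.DRAIN) {
                op.maxWatermark();          // only "Drain" advances time
            }
            if (mode != Shutdown.CANCEL) {
                op.endOfInput();
                op.checkpoint();
                op.notifyComplete();
            }
        } finally {
            op.dispose();                   // all cleanup is always in dispose()
        }
    }

    /** Records the order of calls so the sequences can be inspected. */
    static class Recorder implements Lifecycle {
        final List<String> calls = new ArrayList<>();
        public void maxWatermark()   { calls.add("maxWatermark"); }
        public void endOfInput()     { calls.add("endOfInput"); }
        public void checkpoint()     { calls.add("checkpoint"); }
        public void notifyComplete() { calls.add("notifyComplete"); }
        public void dispose()        { calls.add("dispose"); }
    }

    /** Runs one scenario against a fresh recorder and returns the call order. */
    public static List<String> trace(Shutdown mode) {
        Recorder recorder = new Recorder();
        shutDown(recorder, mode);
        return recorder.calls;
    }

    public static void main(String[] args) {
        for (Shutdown mode : Shutdown.values()) {
            System.out.println(mode + ": " + trace(mode));
        }
    }
}
```

Under these assumptions, a batch sink could reuse the same hooks by treating "task finishes" as `checkpoint()` and "job finishes" as `notifyComplete()`, which is the simulation described under *Sinks and Committing*.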
> User functions should be able to differentiate between successful close and
> erroneous close
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-2646
>                 URL: https://issues.apache.org/jira/browse/FLINK-2646
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>   Affects Versions: 0.10.0
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: usability
>
> Right now, the {{close()}} method of rich functions is invoked in case of
> proper completion, and in case of canceling in case of error (to allow for
> cleanup).
> In certain cases, the user function needs to know why it is closed, whether
> the task completed in a regular fashion, or was canceled/failed.
> I suggest to add a method {{closeAfterFailure()}} to the {{RichFunction}}. By
> default, this method calls {{close()}}. The runtime is then changed to call
> {{close()}} as part of the regular execution and {{closeAfterFailure()}} in
> case of an irregular exit.
> Because by default all cases call {{close()}} the change would not be API
> breaking.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)