[ https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283631#comment-17283631 ]
Stephan Ewen commented on FLINK-21133: -------------------------------------- I see, the remaining problem is that exiting the mailbox loop in case of the source ending should call {{finishInput()}}, while exiting from {{stop()}} should not. Digging through the code is that there seems to be a lot of confusion and mixup between cancelling, stopping, reaching end of the stream, reaching the end of the input. For example, the fact that we need to pass a flag {{"isStoppingBySyncSavepoint"}} to the {{"close()"}} method points to a bad abstraction there. I think it makes sense to revisit this and clean it up. Similar like [~kezhuw] suggested, having an explicit {{EndOfData}} event that would trigger {{finishInput()}} woudl make this cleaner. In that case - stop() would simply exit the mailbox loop, task closes/cleans up, and it send EndOfPartition downstream, which also ends exits the mailbox in the same way - reaching the end of a source, or using "stop()" with the additional flag to drain the pipeline would call "finishInput()" and enqueue an EndOfData event before exiting the loop, so that downstream tasks also fall "finishInput()". > FLIP-27 Source does not work with synchronous savepoint > ------------------------------------------------------- > > Key: FLINK-21133 > URL: https://issues.apache.org/jira/browse/FLINK-21133 > Project: Flink > Issue Type: Bug > Components: API / Core, API / DataStream, Runtime / Checkpointing > Affects Versions: 1.11.3, 1.12.1 > Reporter: Kezhu Wang > Priority: Critical > Fix For: 1.11.4, 1.12.2, 1.13.0 > > > I have pushed branch > [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case] > in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} > failed due to timeout. > See also FLINK-21132 and > [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].. -- This message was sent by Atlassian Jira (v8.3.4#803005)