[ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283631#comment-17283631
 ] 

Stephan Ewen commented on FLINK-21133:
--------------------------------------

I see, the remaining problem is that exiting the mailbox loop in case of the 
source ending should call {{finishInput()}}, while exiting from {{stop()}} 
should not.

Digging through the code, there seems to be a lot of confusion and mix-up 
between cancelling, stopping, reaching the end of the stream, and reaching the 
end of the input. For example, the fact that we need to pass a flag 
{{"isStoppingBySyncSavepoint"}} to the {{"close()"}} method points to a bad 
abstraction there.

I think it makes sense to revisit this and clean it up. Similar to what 
[~kezhuw] suggested, having an explicit {{EndOfData}} event that would trigger 
{{finishInput()}} would make this cleaner.

In that case
  - stop() would simply exit the mailbox loop, the task closes/cleans up, and 
it sends EndOfPartition downstream, so that downstream tasks also exit their 
mailbox loops in the same way
  - reaching the end of a source, or using "stop()" with the additional flag to 
drain the pipeline, would call "finishInput()" and enqueue an EndOfData event 
before exiting the loop, so that downstream tasks also call "finishInput()".

> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.12.2, 1.13.0
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
