[ https://issues.apache.org/jira/browse/FLINK-21132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273052#comment-17273052 ]
Kezhu Wang commented on FLINK-21132: ------------------------------------ I did not find an easy/viable way to distinguish truly end of input from stop-with-savepoint style end of input. They are both {{EndOfPartitionEvent}} in network. Legacy sources could also emit elements between synchronous savepoint and {{EndOfPartitionEvent}}. {quote}that is when the source and all the predecessor operators have shut down. {quote} How this could be viable for, say, a no chained head operator ? AFAIK, no chained operator has only two inputs from outside: rpc call through {{TaskExecutorGateway}} and stream elements and events from predecessor operators through input channel. I saw only {{EndOfPartitionEvent}} related. I think stop-with-savepoint has enough room to operate on *all tasks* not just *source tasks* without interfering with data flow upon {{StreamTask.notifyCheckpointComplete}}. I could give an simple(probably not ideal) solution, let {{Task}} query {{AbstractInvokable}} to know whether it should *finish* partition writers after successful run. This way FLINK-21133 is solved also. > BoundedOneInput.endInput is called when taking synchronous savepoint > -------------------------------------------------------------------- > > Key: FLINK-21132 > URL: https://issues.apache.org/jira/browse/FLINK-21132 > Project: Flink > Issue Type: Bug > Components: Runtime / Task > Affects Versions: 1.10.2, 1.10.3, 1.11.3, 1.12.1 > Reporter: Kezhu Wang > Priority: Major > > [~elkhand](?) reported on project iceberg that {{BoundedOneInput.endInput}} > was > [called|https://github.com/apache/iceberg/issues/2033#issuecomment-765864038] > when [stopping job with > savepoint|https://github.com/apache/iceberg/issues/2033#issuecomment-765557995]. > I think it is a bug of Flink and was introduced in FLINK-14230. The > [changes|https://github.com/apache/flink/pull/9854/files#diff-0c5fe245445b932fa83fdaf5c4802dbe671b73e8d76f39058c6eaaaffd9639faL577] > rely on {{StreamTask.afterInvoke}} and {{OperatorChain.closeOperators}} will > only be invoked after *end of input*. But that is not true long before after > [FLIP-34: Terminate/Suspend Job with > Savepoint|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212]. > Task could enter state called > [*finished*|https://github.com/apache/flink/blob/3a8e06cd16480eacbbf0c10f36b8c79a6f741814/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L467] > after synchronous savepoint, that is an expected job suspension and stopping. > [~sunhaibotb] [~pnowojski] [~roman_khachatryan] Could you help confirm this ? > For full context, see > [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033]. I have > pushed branch > [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case] > in my repository. Test case > {{SavepointITCase.testStopSavepointWithBoundedInput}} failed due to > {{BoundedOneInput.endInput}} called. > I am also aware of [FLIP-147: Support Checkpoints After Tasks > Finished|https://cwiki.apache.org/confluence/x/mw-ZCQ], maybe the three > should align on what *finished* means exactly. [~kkl0u] [~chesnay] > [~gaoyunhaii] -- This message was sent by Atlassian Jira (v8.3.4#803005)