[ https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17285319#comment-17285319 ]
Kezhu Wang edited comment on FLINK-21133 at 2/16/21, 4:21 PM:
--------------------------------------------------------------
{quote}But this assumption doesn't hold with the legacy sources. Maybe it would also cause problems with iterations/cyclic graphs in the future{quote}
Hi [~pnowojski], I commented about [legacy source|https://issues.apache.org/jira/browse/FLINK-21132?focusedCommentId=17276558&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17276558] in [FLINK-21132|https://issues.apache.org/jira/browse/FLINK-21132?focusedCommentId=17276558&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17276558]. Legacy sources always require some special treatment in the implementation. I have not evaluated iterations/cyclic graphs yet.

My implementation assumes that once one task hits "notifyCheckpointComplete" on the "stop-with-savepoint" path, all tasks will hit that point sooner or later; all we have to decide is how they exit (cooperatively or independently). Up to "notifyCheckpointComplete", the code path could be the same for all approaches.

I am not trying to cling to one approach, or even to my own approach; I want to figure out the best approach by evaluating and comparing all the emerging ones:
# {{EndOfPartition}}: It works but is not that elegant, and it requires extra work from sources (chained or not).
# {{EndOfData}}: I have raised the same concerns as you.
# Stop tasks individually: a downstream task could be terminated before its upstream task. This could cause errors in the network stack and potentially a deadlock.

I will also evaluate FLINK-21086 for further information.

Besides my proposed approach for stop-with-savepoint, I also want to raise a separate question: should {{StreamOperator.close}} be called *only* "after all records have been added to the operators"? Currently, stop-with-savepoint is an exception to this rule for {{StreamOperator.close}}.
This may also be related to [~pnowojski]'s recent 2PC concerns in the FLIP-147 discussion, since, per its javadoc after FLINK-2647, {{StreamOperator.close}} is just {{flush-after-all-input-consumed}}.
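To make the exit-ordering concern in the third option more concrete, here is a minimal toy sketch. It is an assumption-laden model using plain Java threads and a bounded queue, not Flink's network stack or task lifecycle. The class {{ShutdownOrderDemo}}, the {{END}} marker (loosely analogous to {{EndOfPartition}}) and both method names are hypothetical. In the cooperative case the downstream task exits only after seeing the end marker, so every record is consumed. In the independent case the downstream task stops on its own while upstream still has data, the bounded channel fills up and upstream can no longer make progress.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model only, not Flink code: an "upstream" and a "downstream" task joined by a
// bounded channel that stands in for the network buffers between two tasks.
final class ShutdownOrderDemo {
    // Hypothetical end-of-stream marker, loosely analogous to EndOfPartition.
    private static final int END = -1;
    private static final int CAPACITY = 2;

    // Cooperative exit: upstream emits END after its last record, and downstream
    // exits only after it has seen END, so nothing is stranded in the channel.
    static int cooperative(int records) {
        BlockingQueue<Integer> channel = new ArrayBlockingQueue<>(CAPACITY);
        int[] consumed = {0};
        Thread downstream = new Thread(() -> {
            try {
                while (channel.take() != END) {
                    consumed[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        downstream.start();
        try {
            for (int i = 0; i < records; i++) {
                channel.put(i); // blocks while the channel is full
            }
            channel.put(END);
            downstream.join(); // join() also makes consumed[0] visible to this thread
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return consumed[0];
    }

    // Independent exit: downstream stops on its own after `stopAfter` records while
    // upstream still has data. Returns true if upstream could no longer make progress
    // because nobody drains the full channel.
    static boolean independentUpstreamStalls(int records, int stopAfter) {
        BlockingQueue<Integer> channel = new ArrayBlockingQueue<>(CAPACITY);
        Thread downstream = new Thread(() -> {
            try {
                for (int i = 0; i < stopAfter; i++) {
                    channel.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Exits here without draining the rest of the channel.
        });
        downstream.start();
        boolean stalled = false;
        try {
            for (int i = 0; i < records && !stalled; i++) {
                // A real task would block forever in put(); the timeout makes the stall observable.
                stalled = !channel.offer(i, 200, TimeUnit.MILLISECONDS);
            }
            downstream.interrupt(); // harmless if downstream has already exited
            downstream.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return stalled;
    }

    public static void main(String[] args) {
        System.out.println("cooperative, records consumed: " + cooperative(5));
        System.out.println("independent, upstream stalled: " + independentUpstreamStalls(10, 3));
    }
}
```

With a channel capacity of 2, a downstream task that stops after 3 records lets upstream hand off at most 5 records; any further record can never be delivered. The timeout is only there to make that stall observable in a toy; in a real task the equivalent would be an upstream blocked indefinitely on back pressure, which is the kind of deadlock the third option risks.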
> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}}
> fails due to a timeout.
> See also FLINK-21132 and
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].

--
This message was sent by Atlassian Jira
(v8.3.4#803005)