[ https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288240#comment-17288240 ]
Yun Gao edited comment on FLINK-21133 at 2/22/21, 8:20 AM:
-----------------------------------------------------------

Hi [~sewen], many thanks for the deep insights!

{quote}To support specifying a savepoint for that shutdown, we would need to be able to shut down the dataflow pipeline with one checkpoint in total. That would be interesting input for the FLIP-147 discussion{quote}

I tried to think a bit about this issue. For stop-with-savepoint with pipeline draining, our main target is to get a consistent savepoint across all the tasks. Moreover, considering the case where the savepoint fails and we then need to resume the job, it seems we would still need to first trigger a savepoint, hold the tasks until the savepoint succeeds, and only then let the tasks run the finish process (e.g., endOfInput & close). In this case we need a savepoint _before_ the end of input for each operator; the first sketch below illustrates this ordering.

For shutting down a bounded stream with a checkpoint, the main target is operators that interact with external systems: we need to get the pending data committed _after_ all records have been processed (i.e., after endOfInput), which makes it a bit divergent from stopping with a savepoint. On the other hand, in this case it would not be necessary for all the operators to commit their pending records in one checkpoint (though I totally agree it would be good to reduce the number of checkpoints required). Due to this difference, it seems to me that it might not be easy to unify the two processes.

If we only consider whether we could shut down a bounded stream with a single checkpoint: since we cannot broadcast barriers after emitting {{EndOfPartitionEvent}}, we might have to introduce a new {{EndOfInputEvent}} to signal endOfInput separately, then take a unified checkpoint, and finally emit {{EndOfPartitionEvent}}; the second sketch below illustrates this order. There would still be two problems here: one is whether we want to [support emitting records in notifyCheckpointComplete|https://lists.apache.org/thread.html/rc15b3f7f4ee3b94132f1da8f0b7e988607bccdc2eec49cf156061744%40%3Cdev.flink.apache.org%3E]; the other is that since we cannot guarantee when the sources will finish, we may have to hold the finished tasks for a long time, and in the extreme case of a mixed job with both bounded and unbounded sources we would have to hold the finished tasks forever. Thus I think even after we introduce {{EndOfInputEvent}} and do not emit records in notifyCheckpointComplete, we should still not require the tasks to wait for a single checkpoint.
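To make the first scenario concrete, here is a minimal, self-contained sketch of the task-side ordering for stop-with-savepoint with drain. All names here ({{StopWithSavepointSketch}}, {{SketchOperator}}, {{stopWithSavepointDrain}}) are hypothetical; this only models the ordering described above, not Flink's actual {{StreamTask}} code:

{code:java}
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the ordering only; not Flink's real API.
final class StopWithSavepointSketch {

    /** Stand-in for an operator in the chain. */
    interface SketchOperator {
        void snapshotState(long checkpointId);
        void endInput();
        void close();
    }

    static void stopWithSavepointDrain(SketchOperator op, long savepointId) throws Exception {
        // 1. Trigger the savepoint while the operator has NOT yet seen end of
        //    input, so a failed savepoint lets the job simply resume processing.
        CompletableFuture<Void> savepointDone =
                CompletableFuture.runAsync(() -> op.snapshotState(savepointId));

        // 2. Hold the task until the savepoint succeeds.
        savepointDone.get();

        // 3. Only then run the finish process: the savepoint is taken
        //    strictly *before* endOfInput for each operator.
        op.endInput();
        op.close();
    }
}
{code}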
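And a second sketch for the bounded-shutdown case with the proposed {{EndOfInputEvent}}. Again, all names ({{BoundedShutdownSketch}}, {{SketchSink}}) are hypothetical, and {{EndOfInputEvent}} does not exist today; the code only illustrates the proposed order endOfInput → unified checkpoint → commit → {{EndOfPartitionEvent}}:

{code:java}
// Hypothetical sketch of the proposed shutdown protocol; not Flink's real API.
final class BoundedShutdownSketch {

    /** Stand-in for a sink that stages pending data and commits on confirmation. */
    interface SketchSink {
        void endInput();                                   // flush and stage all pending data
        void snapshotState(long checkpointId);             // persist the staged data
        void notifyCheckpointComplete(long checkpointId);  // commit staged data externally
    }

    static void shutDownWithFinalCheckpoint(SketchSink sink, long checkpointId) {
        // 1. EndOfInputEvent (hypothetical): signal end of input without
        //    releasing the channel, so a checkpoint barrier can still follow.
        sink.endInput();

        // 2. Take one unified checkpoint *after* all records were processed;
        //    the pending data becomes part of the snapshot.
        sink.snapshotState(checkpointId);

        // 3. Commit the pending data once the checkpoint is confirmed.
        sink.notifyCheckpointComplete(checkpointId);

        // 4. Only now would EndOfPartitionEvent be emitted and the task finish.
    }
}
{code}

Step 3 is exactly where the two problems above bite: committing might require emitting records in notifyCheckpointComplete, and waiting for the confirmation is what keeps the finished task alive.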
> FLIP-27 Source does not work with synchronous savepoint
> --------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
> I have pushed branch [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case] in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} failed due to timeout.
> See also FLINK-21132 and [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].

--
This message was sent by Atlassian Jira
(v8.3.4#803005)