[ https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288340#comment-17288340 ]
Till Rohrmann edited comment on FLINK-21133 at 2/22/21, 12:15 PM:
------------------------------------------------------------------

Thanks for driving this discussion. I think a lot of things are coming together here because the shutdown/closing semantics of operators are tightly coupled to how sources and sinks must work, and it should ideally work similarly for bounded/unbounded streams and for batch/streaming execution mode. I think it is a good idea to think about what functionality we want to offer to our users, as [~becket_qin] did, before diving into the concrete implementation details:

1. *A user wants to cancel a bounded/unbounded job and does not care about output correctness*
This is what we currently support with {{Task.cancelExecution}}. Cancel should forcefully terminate the operators, and we don't care whether records are properly sent to downstream tasks or written to a sink.

2. *A user wants to suspend a bounded/unbounded job and be able to resume it later*
This is what we currently call *stop-with-savepoint*. Here the user wants to create a savepoint representing the current state of the computation, which they can use to resume the job at a later point. The user does not care whether the job shuts down in an orderly fashion. Hence, we should be able to simply cancel all tasks after taking the savepoint.

3. *A user wants to terminate a bounded/unbounded job and have all of its buffered data flushed to external systems*
This is what we currently call *stop-with-savepoint --drain*. Here the user is not so much interested in the savepoint as in materializing the job's state/result to external systems. The only use case for the created savepoint that I can think of is getting access to Flink's state after the job has terminated. Resuming the job from such a savepoint does not make much sense because the results will be affected by the {{MAX_WATERMARK}} we have sent. Maybe *stop-with-savepoint --drain* is a misnomer.

4. *A bounded job reaches its end and, in order to guarantee correctness, needs to flush all of its buffered data*
This is effectively the same as terminating a bounded/unbounded job, except that it is not induced by the user but by the sources reaching the end of data.

5. *A user wants to gracefully stop a job without creating a savepoint*
The only case I can think of is that a user wants to stop an at-least-once job in such a way that all records up to the point of issuing the command will get processed. Other than that, cancel might already be good enough.

Conceptually, 3. and 4. should use the same mechanism because semantically there is no difference [~gaoyunhaii]. I agree with [~kezhuw] and [~sewen] that 2. should not require the complexity of gracefully shutting the job down. E.g., allowing the state transition {{RUNNING -> CANCELLED}} on the {{JobMaster}} would let the {{Tasks}} simply cancel themselves after receiving {{notifyCheckpointComplete}}. Alternatively, {{Tasks}} could acknowledge the {{notifyCheckpointComplete}}, but this adds complexity to the already complex {{CheckpointCoordinator}}.
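To make the idea behind 2. concrete, here is a minimal, purely illustrative Java sketch. It is not Flink's actual {{Task}}/{{StreamTask}} code, and all class and method names are made up: a task remembers the id of the pending synchronous savepoint and simply cancels itself once {{notifyCheckpointComplete}} arrives for that id, instead of going through a graceful shutdown.

{code:java}
// Purely illustrative sketch, not Flink's actual task code: a task that cancels itself
// once the synchronous savepoint it is waiting for has been confirmed, instead of
// performing a graceful shutdown of its operators.
import java.util.concurrent.atomic.AtomicLong;

public class SelfCancellingTaskSketch {

    private static final long NO_PENDING_SAVEPOINT = -1L;

    // Id of the synchronous savepoint this task is currently waiting for, if any.
    private final AtomicLong pendingSyncSavepointId = new AtomicLong(NO_PENDING_SAVEPOINT);

    /** Invoked when a stop-with-savepoint command injects the synchronous savepoint barrier. */
    public void onSynchronousSavepointTriggered(long checkpointId) {
        pendingSyncSavepointId.set(checkpointId);
    }

    /** Invoked when the checkpoint coordinator confirms a completed checkpoint. */
    public void notifyCheckpointComplete(long checkpointId) {
        if (pendingSyncSavepointId.compareAndSet(checkpointId, NO_PENDING_SAVEPOINT)) {
            // The savepoint requested by the stop command is durable; there is no need to
            // drain in-flight data or finish operators gracefully -- just cancel the task.
            cancelSelf();
        }
    }

    private void cancelSelf() {
        // In the real runtime this would correspond to the task transitioning to CANCELED,
        // with the JobMaster allowing RUNNING -> CANCELLED; here it is only a placeholder.
        System.out.println("Synchronous savepoint confirmed, cancelling task");
    }
}
{code}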
> FLIP-27 Source does not work with synchronous savepoint
> --------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case] in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} failed due to timeout.
> See also FLINK-21132 and [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].
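For context, the scenario behind the referenced test can also be exercised from user code. The following is a rough, purely illustrative sketch assuming Flink 1.12-era APIs (the actual test runs inside Flink's IT-case infrastructure); on the affected versions the reported bug means the stop-with-savepoint call would time out rather than complete.

{code:java}
// Illustrative standalone variant (assuming Flink 1.12-era APIs) of the scenario behind
// SavepointITCase.testStopSavepointWithFlip27Source: run a job with a FLIP-27 source and
// issue stop-with-savepoint against it.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StopWithSavepointFlip27Example {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // FLIP-27 source: emits the numbers 0..Long.MAX_VALUE, i.e. effectively unbounded.
        env.fromSource(
                        new NumberSequenceSource(0, Long.MAX_VALUE),
                        WatermarkStrategy.noWatermarks(),
                        "flip-27-number-source")
                .print();

        JobClient jobClient = env.executeAsync("stop-with-savepoint-flip27");

        // Give the job a moment to start processing, then stop it with a savepoint.
        // On the affected versions this future does not complete (the reported bug).
        Thread.sleep(5_000);
        String savepointPath =
                jobClient.stopWithSavepoint(false, "file:///tmp/savepoints").get();
        System.out.println("Savepoint written to " + savepointPath);
    }
}
{code}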