[ https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290197#comment-17290197 ]
Till Rohrmann edited comment on FLINK-21133 at 2/24/21, 6:59 PM:
-----------------------------------------------------------------

For #2, we should fail the stop-with-savepoint operation if {{notifyCheckpointComplete}} fails, and resume the job from the last completed checkpoint. In this case, that checkpoint would actually be the just-created savepoint. Whether we need to restart tasks or not depends on where the failure occurs. #3 and #4 should be quite similar in this regard. FLINK-21030 should ensure that we restart the job in case of a failure after the savepoint has been successfully created.

I don't think it is important to keep finished operators running for the final checkpoint. If some operators have finished before the overall job reaches the final checkpoint, then these operators can shut down with a previous checkpoint. We just need some way to remember that these operators have already completed, in case we need to restart the job after a failure in {{notifyCheckpointComplete}}.

I am fairly certain that our current API is not sufficient to express what we need (e.g. we cannot take a checkpoint after an operator has flushed all of its records via {{StreamOperator.close}}). Breaking an API is never good, but that should not stop us from thinking about how things should ideally look and then about how we could get there. Ideally we won't have to break anything, but if the current APIs cannot express the behaviour we need from our operators, then at some point we will have to bite the bullet. When in doubt, I would prefer this over working around the current limitations and never being able to properly define the required operator semantics.

> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case] in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} failed due to timeout.
> See also FLINK-21132 and [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].
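The recovery rule proposed in Till Rohrmann's comment for case #2 can be illustrated with a small toy model. This is a minimal sketch in Python, not Flink code: `CompletedCheckpoint`, `plan_recovery_after_notify_failure` and the status string are hypothetical names invented for illustration, not Flink APIs. It also deliberately ignores which tasks actually need restarting, which the comment says depends on where the failure occurs.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Tuple


@dataclass(frozen=True)
class CompletedCheckpoint:
    """Hypothetical record of a completed checkpoint or savepoint."""
    checkpoint_id: int
    is_savepoint: bool


def plan_recovery_after_notify_failure(
    completed: List[CompletedCheckpoint],
    all_operators: FrozenSet[str],
    finished_operators: FrozenSet[str],
) -> Tuple[str, CompletedCheckpoint, FrozenSet[str]]:
    """Toy model: notifyCheckpointComplete failed after the savepoint completed."""
    if not completed:
        raise RuntimeError("no completed checkpoint to resume from")
    # The stop-with-savepoint operation itself is reported as failed.
    status = "STOP_WITH_SAVEPOINT_FAILED"
    # Resume from the latest completed checkpoint (highest id); when the
    # failure happens while notifying the savepoint, this is that savepoint.
    restore_from = max(completed, key=lambda cp: cp.checkpoint_id)
    # Operators remembered as already finished are not started again.
    to_restart = all_operators - finished_operators
    return status, restore_from, to_restart
```

For example, with checkpoint 7 and savepoint 8 completed and the source already finished, the model resumes from savepoint 8 and restarts only the remaining operators. Keeping the set of finished operators separate from the checkpoint list mirrors the comment's point that finished operators can shut down with an earlier checkpoint and only need to be remembered as completed.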