[ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290197#comment-17290197
 ] 

Till Rohrmann edited comment on FLINK-21133 at 2/24/21, 6:59 PM:
-----------------------------------------------------------------

For #2, we should fail the stop-with-savepoint operation if 
{{notifyCheckpointComplete}} fails and resume the job from the last checkpoint. 
In this case, that would actually be the savepoint that was just created. Whether 
we need to restart tasks depends on where the failure occurs. #3 and #4 should 
actually be quite similar in this regard. FLINK-21030 should ensure that we 
restart in case of a failure after the savepoint has been successfully created.
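
As a rough illustration of the flow described for #2, here is a minimal sketch in plain Java. It is not Flink code: {{Tasks}}, {{Outcome}} and {{stopWithSavepoint}} are hypothetical names, and the sketch assumes the savepoint has already been written successfully before the notification is sent.

```java
public class StopWithSavepointSketch {

    /** Hypothetical result of a stop-with-savepoint attempt. */
    public enum Outcome { STOPPED, FAILED_AND_RESTORED }

    /** Hypothetical stand-in for the job's tasks; this is not a Flink interface. */
    public interface Tasks {
        void notifyCheckpointComplete(long checkpointId) throws Exception;

        void restoreFrom(long checkpointId);
    }

    /**
     * Assumes the savepoint with the given id was created successfully, so it is
     * also the latest checkpoint the job can resume from.
     */
    public static Outcome stopWithSavepoint(long savepointId, Tasks tasks) {
        long latestCheckpointId = savepointId;
        try {
            tasks.notifyCheckpointComplete(savepointId);
            return Outcome.STOPPED;
        } catch (Exception e) {
            // The notification failed: fail the stop operation and resume the job
            // from the latest checkpoint, which is the savepoint just created.
            tasks.restoreFrom(latestCheckpointId);
            return Outcome.FAILED_AND_RESTORED;
        }
    }
}
```

The sketch always restores on failure; whether tasks actually need to be restarted depends on where the failure occurs, which is not modelled here.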

I think it is not important that we keep finished operators running for the 
final checkpoint. If some operators have finished before the overall job 
reaches the final checkpoint, then these operators can shut down with a 
previous checkpoint. We just need to remember, in some way, that these operators 
had already completed in case we need to restart the job after a failure in 
{{notifyCheckpointComplete}}.
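
One way to picture "remembering" finished operators is to record their IDs alongside the checkpoint and skip them on restart. The following is only a sketch in plain Java under that assumption; {{CheckpointMetadata}} and {{operatorsToRestart}} are hypothetical names, not existing Flink classes or methods.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FinishedOperatorsSketch {

    /** Hypothetical checkpoint metadata that only tracks which operators had finished. */
    public static final class CheckpointMetadata {
        private final Set<String> finishedOperatorIds;

        public CheckpointMetadata(Set<String> finishedOperatorIds) {
            // Defensive copy so later changes by the caller do not alter the metadata.
            this.finishedOperatorIds = new HashSet<>(finishedOperatorIds);
        }

        public boolean isFinished(String operatorId) {
            return finishedOperatorIds.contains(operatorId);
        }
    }

    /** Returns the operators to restart, in their original order, skipping finished ones. */
    public static List<String> operatorsToRestart(
            List<String> allOperatorIds, CheckpointMetadata metadata) {
        List<String> toRestart = new ArrayList<>();
        for (String operatorId : allOperatorIds) {
            if (!metadata.isFinished(operatorId)) {
                toRestart.add(operatorId);
            }
        }
        return toRestart;
    }
}
```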

I am pretty certain that our current API is not sufficient for expressing what 
we want to express (e.g. we cannot take a checkpoint after we have 
flushed all records from an operator via {{StreamOperator.close}}). Breaking an 
API is never good, but it should not keep us from thinking about how things 
should ideally look and then about how we could get there. Ideally, 
we don't have to break things, but if the current APIs do not allow us to express 
the behaviour we need our operators to have, then at some point we have to bite 
this bullet. When in doubt, I would prefer this over working around the 
current limitations and never being able to properly define the required 
operator semantics.
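
To make the limitation concrete, the sketch below shows one possible shape of a lifecycle in which flushing records and releasing resources are separate steps, so that a snapshot can still be taken in between. This is purely an assumption for illustration: {{BufferingOperator}}, {{flushRemaining}} and {{releaseResources}} are hypothetical names and do not correspond to the {{StreamOperator}} API.

```java
import java.util.ArrayList;
import java.util.List;

public class OperatorLifecycleSketch {

    /** Hypothetical operator; method names are illustrative, not Flink's API. */
    public static final class BufferingOperator {
        private final List<String> buffer = new ArrayList<>();
        private final List<String> emitted = new ArrayList<>();
        private boolean released;

        public void processElement(String record) {
            ensureNotReleased();
            buffer.add(record);
        }

        /** Emits all buffered records but keeps the operator alive for snapshots. */
        public void flushRemaining() {
            ensureNotReleased();
            emitted.addAll(buffer);
            buffer.clear();
        }

        /** Returns a copy of the still-buffered records; allowed until resources are released. */
        public List<String> snapshotState() {
            ensureNotReleased();
            return new ArrayList<>(buffer);
        }

        /** Final step: after this call no further processing or snapshots are possible. */
        public void releaseResources() {
            released = true;
        }

        public List<String> emitted() {
            return new ArrayList<>(emitted);
        }

        private void ensureNotReleased() {
            if (released) {
                throw new IllegalStateException("operator resources already released");
            }
        }
    }
}
```

With a single method that both flushes and releases, the snapshot between the two steps would have nowhere to go, which is the kind of semantics the paragraph above says the current API cannot express.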







> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
