[ 
https://issues.apache.org/jira/browse/FLINK-24161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411170#comment-17411170
 ] 

Yun Gao commented on FLINK-24161:
---------------------------------

Perhaps for 1.14 we could first disable stop-with-savepoint without drain while 
tasks are finishing? We could fail the job if checkpoints after tasks finish 
are enabled and:
 # In {{endData()}}, if there is a pending savepoint without drain, we fail the 
task by throwing an exception.
 # When triggering stop-with-savepoint without drain, if {{endOfDataReceived = 
true}}, we return false to reject the savepoint.

Both of these cases cause a failover.
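
As a rough illustration only, the two checks could look like the sketch below. 
This is not the actual Flink task code: the class {{FinishingTaskGuard}}, the 
method {{triggerStopWithSavepoint}} and the fields 
{{pendingSavepointWithoutDrain}} and {{checkpointsAfterTasksFinishEnabled}} are 
hypothetical names; only {{endData()}} and {{endOfDataReceived}} come from the 
proposal. The thrown exception stands in for failing the task.

```java
/** Hypothetical sketch of the two proposed guards; not real Flink code. */
final class FinishingTaskGuard {
    // Assumed to mirror execution.checkpointing.checkpoints-after-tasks-finish.enabled.
    private final boolean checkpointsAfterTasksFinishEnabled;
    // Set once the task has seen the end of its input.
    private boolean endOfDataReceived;
    // Hypothetical flag: a stop-with-savepoint without drain was accepted and is in flight.
    private boolean pendingSavepointWithoutDrain;

    FinishingTaskGuard(boolean checkpointsAfterTasksFinishEnabled) {
        this.checkpointsAfterTasksFinishEnabled = checkpointsAfterTasksFinishEnabled;
    }

    /** Case 2: reject a savepoint without drain once the task is already finishing. */
    boolean triggerStopWithSavepoint(boolean drain) {
        if (checkpointsAfterTasksFinishEnabled && !drain && endOfDataReceived) {
            return false; // the caller would treat this as a declined savepoint
        }
        if (!drain) {
            pendingSavepointWithoutDrain = true;
        }
        return true;
    }

    /** Case 1: fail the task if it starts finishing while such a savepoint is pending. */
    void endData() {
        if (checkpointsAfterTasksFinishEnabled && pendingSavepointWithoutDrain) {
            throw new IllegalStateException(
                    "Stop-with-savepoint without drain is pending while the task is finishing");
        }
        endOfDataReceived = true;
    }
}
```

In this sketch both paths surface as an error to the caller, matching the 
intent that either case ends in a failover instead of a client-side timeout.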

We could then provide support for this case in 1.15, after we have unified the 
savepoint process.

> Can not stop the job with savepoint while a task is finishing
> -------------------------------------------------------------
>
>                 Key: FLINK-24161
>                 URL: https://issues.apache.org/jira/browse/FLINK-24161
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.0
>            Reporter: Yangze Guo
>            Priority: Critical
>             Fix For: 1.14.0
>
>         Attachments: 
> flink-yangze-standalonesession-0-IT-C02YV0L8LVDL.local.log, 
> flink-yangze-taskexecutor-0-IT-C02YV0L8LVDL.local.log, 
> flink-yangze-taskexecutor-1-IT-C02YV0L8LVDL.local.log
>
>
> When stopping the job with a savepoint, if a task is finishing, the action 
> will time out.
> Testing job: 
> https://github.com/KarmaGYZ/flink/blob/test-147/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
> Flink conf:
> {code:bash}
> state.savepoints.dir: file:///tmp/flink-savepoints
> state.backend: rocksdb
> state.backend.incremental: true
> state.checkpoints.dir: file:///tmp/flink-ckp/
> execution.checkpointing.aligned-checkpoint-timeout: 30 s
> execution.checkpointing.interval: 5 s
> taskmanager.numberOfTaskSlots: 2
> execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
> {code}
> How to reproduce:
> {code:bash}
> bin/flink run -d -p 4 examples/streaming/WordCount.jar
> # while one task is finishing
> bin/flink stop $JOB_ID
> {code}
> Client log:
> {code:bash}
> ------------------------------------------------------------
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job 
> "e139a2eba7f8dc0b07fab65e84421ee4".
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:581)
>   at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
>   at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:569)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1069)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>   at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.util.concurrent.TimeoutException
>   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>   at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:579)
>   ... 6 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
