[ 
https://issues.apache.org/jira/browse/FLINK-28941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17600606#comment-17600606
 ] 

Xingbo Huang commented on FLINK-28941:
--------------------------------------

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=40713&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203

> Savepoint ignores MaxConcurrentCheckpoint limit in aligned checkpoint case
> --------------------------------------------------------------------------
>
>                 Key: FLINK-28941
>                 URL: https://issues.apache.org/jira/browse/FLINK-28941
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.16.0
>            Reporter: Yunfeng Zhou
>            Priority: Blocker
>              Labels: pull-request-available, test-stability
>             Fix For: 1.16.0
>
>
> When unaligned checkpoints are disabled, savepoints are marked as 
> forced[1], which lets them bypass the maxConcurrentCheckpoint limit[2]. 
> More than maxConcurrentCheckpoint checkpoints can then be running 
> simultaneously. 
> This behavior is incompatible with OperatorCoordinatorHolder, which requires 
> at most one pending checkpoint at a time. As a result, exceptions such as 
> the following might be thrown[3].
> {code:java}
> java.lang.IllegalStateException: Cannot mark for checkpoint 9, already marked for checkpoint 8
>       at org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.markForCheckpoint(SubtaskGatewayImpl.java:185) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>       at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinatorInternal$6(OperatorCoordinatorHolder.java:328) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>       at java.util.HashMap.forEach(HashMap.java:1289) ~[?:1.8.0_292]
>       at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:327) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>       at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:243) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[classes/:?]
>       at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[classes/:?]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[classes/:?]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[classes/:?]
>       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[classes/:?]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[classes/:?]
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [scala-library-2.12.7.jar:?]
>       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [scala-library-2.12.7.jar:?]
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.12.7.jar:?]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
>       at akka.actor.Actor.aroundReceive(Actor.scala:537) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.actor.Actor.aroundReceive$(Actor.scala:535) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.actor.ActorCell.invoke(ActorCell.scala:548) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.dispatch.Mailbox.run(Mailbox.scala:231) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_292]
>       at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_292]
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_292]
>       at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_292]
> {code}
> [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L160-L164
> [2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L444-L449
> [3] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39860&view=logs&j=219f6d90-20a2-5863-7c1b-c80377a1018f&t=20186858-1485-5059-c9c6-446952519524&s=ab6e269b-88b2-5ded-2544-4aa5b1124530
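
The interaction described in the issue can be illustrated with a small, self-contained sketch. This is not Flink code: the class ForcedSavepointSketch, the nested SingleCheckpointGateway, and the methods mayTrigger and simulate are hypothetical names invented for illustration, and the real CheckpointRequestDecider and SubtaskGatewayImpl are considerably more involved. The sketch assumes only what the description states: a forced request skips the concurrency limit, and the gateway accepts one marked checkpoint at a time.

```java
/** Simplified, hypothetical model of the reported failure; not Flink's actual classes. */
public class ForcedSavepointSketch {

    /** Stand-in for a gateway that tolerates only one marked checkpoint at a time. */
    static final class SingleCheckpointGateway {
        private static final long NONE = -1L;
        private long markedCheckpointId = NONE;

        void markForCheckpoint(long checkpointId) {
            if (markedCheckpointId != NONE && markedCheckpointId != checkpointId) {
                throw new IllegalStateException(
                        "Cannot mark for checkpoint " + checkpointId
                                + ", already marked for checkpoint " + markedCheckpointId);
            }
            markedCheckpointId = checkpointId;
        }
    }

    /** Stand-in for the trigger decision: a forced request skips the concurrency limit. */
    static boolean mayTrigger(boolean forced, int pending, int maxConcurrentCheckpoints) {
        return forced || pending < maxConcurrentCheckpoints;
    }

    /** Starts checkpoint 8, then requests savepoint 9 while 8 is still pending. */
    static String simulate(boolean savepointIsForced) {
        SingleCheckpointGateway gateway = new SingleCheckpointGateway();
        int maxConcurrentCheckpoints = 1;
        int pending = 0;

        // Periodic checkpoint 8 is within the limit and stays pending.
        if (mayTrigger(false, pending, maxConcurrentCheckpoints)) {
            gateway.markForCheckpoint(8);
            pending++;
        }

        // Savepoint 9 arrives before checkpoint 8 has completed.
        if (mayTrigger(savepointIsForced, pending, maxConcurrentCheckpoints)) {
            try {
                gateway.markForCheckpoint(9);
            } catch (IllegalStateException e) {
                return e.getMessage();
            }
            return "triggered";
        }
        return "queued";
    }

    public static void main(String[] args) {
        System.out.println(simulate(true));
        System.out.println(simulate(false));
    }
}
```

In this model, a forced savepoint reaches the gateway while checkpoint 8 is still marked and triggers the IllegalStateException, whereas a non-forced savepoint is held back by the limit and no exception occurs.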



--
This message was sent by Atlassian Jira
(v8.20.10#820010)