[ https://issues.apache.org/jira/browse/FLINK-28941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17600606#comment-17600606 ]
Xingbo Huang commented on FLINK-28941: -------------------------------------- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=40713&view=logs&j=aa18c3f6-13b8-5f58-86bb-c1cffb239496&t=502fb6c0-30a2-5e49-c5c2-a00fa3acb203 > Savepoint ignores MaxConcurrentCheckpoint limit in aligned checkpoint case > -------------------------------------------------------------------------- > > Key: FLINK-28941 > URL: https://issues.apache.org/jira/browse/FLINK-28941 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing > Affects Versions: 1.16.0 > Reporter: Yunfeng Zhou > Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.16.0 > > > When unaligned checkpoints are disabled, savepoints are marked as > forced[1]. Forced requests bypass the maxConcurrentCheckpoint limit[2], > so more than maxConcurrentCheckpoint checkpoints can end up > running simultaneously. > This behavior is incompatible with OperatorCoordinatorHolder, which requires > at most one pending checkpoint at a time. As a result, > exceptions such as the following may be thrown[3]: 
> {code:java} > java.lang.IllegalStateException: Cannot mark for checkpoint 9, already marked > for checkpoint 8 > at > org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.markForCheckpoint(SubtaskGatewayImpl.java:185) > ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinatorInternal$6(OperatorCoordinatorHolder.java:328) > ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] > at java.util.HashMap.forEach(HashMap.java:1289) ~[?:1.8.0_292] > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:327) > ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:243) > ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) > ~[classes/:?] > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > ~[classes/:?] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) > ~[classes/:?] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) > ~[classes/:?] > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) > ~[classes/:?] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) > ~[classes/:?] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > [akka-actor_2.12-2.6.15.jar:2.6.15] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > [akka-actor_2.12-2.6.15.jar:2.6.15] > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > [scala-library-2.12.7.jar:?] 
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > [scala-library-2.12.7.jar:?] > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > [akka-actor_2.12-2.6.15.jar:2.6.15] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > [scala-library-2.12.7.jar:?] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > [scala-library-2.12.7.jar:?] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > [scala-library-2.12.7.jar:?] > at akka.actor.Actor.aroundReceive(Actor.scala:537) > [akka-actor_2.12-2.6.15.jar:2.6.15] > at akka.actor.Actor.aroundReceive$(Actor.scala:535) > [akka-actor_2.12-2.6.15.jar:2.6.15] > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > [akka-actor_2.12-2.6.15.jar:2.6.15] > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) > [akka-actor_2.12-2.6.15.jar:2.6.15] > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > [akka-actor_2.12-2.6.15.jar:2.6.15] > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > [akka-actor_2.12-2.6.15.jar:2.6.15] > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > [akka-actor_2.12-2.6.15.jar:2.6.15] > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > [akka-actor_2.12-2.6.15.jar:2.6.15] > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > [?:1.8.0_292] > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > [?:1.8.0_292] > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > [?:1.8.0_292] > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > [?:1.8.0_292] > {code} > [1] > https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L160-L164 > [2] > https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L444-L449 > [3] > 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39860&view=logs&j=219f6d90-20a2-5863-7c1b-c80377a1018f&t=20186858-1485-5059-c9c6-446952519524&s=ab6e269b-88b2-5ded-2544-4aa5b1124530 -- This message was sent by Atlassian Jira (v8.20.10#820010)