[
https://issues.apache.org/jira/browse/FLINK-28941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598755#comment-17598755
]
Yun Gao commented on FLINK-28941:
---------------------------------
Hi [~hxb], sorry for the delay. I'm still confirming the issue and will try to
have it fixed within this week.
> Savepoint ignores MaxConcurrentCheckpoint limit in aligned checkpoint case
> --------------------------------------------------------------------------
>
> Key: FLINK-28941
> URL: https://issues.apache.org/jira/browse/FLINK-28941
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.16.0
> Reporter: Yunfeng Zhou
> Priority: Critical
> Labels: test-stability
> Fix For: 1.16.0
>
>
> When unaligned checkpoints are disabled, savepoints are marked as
> forced[1], which means they can bypass the maxConcurrentCheckpoint limit[2],
> so more than maxConcurrentCheckpoint checkpoints can be running
> simultaneously.
> This behavior is incompatible with OperatorCoordinatorHolder, which requires
> at most one pending checkpoint at a time. As a result, exceptions such as
> the following might be thrown[3].
> {code:java}
> java.lang.IllegalStateException: Cannot mark for checkpoint 9, already marked
> for checkpoint 8
> at
> org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.markForCheckpoint(SubtaskGatewayImpl.java:185)
> ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
> at
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinatorInternal$6(OperatorCoordinatorHolder.java:328)
> ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
> at java.util.HashMap.forEach(HashMap.java:1289) ~[?:1.8.0_292]
> at
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:327)
> ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
> at
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:243)
> ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
> ~[classes/:?]
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[classes/:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
> ~[classes/:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
> ~[classes/:?]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
> ~[classes/:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
> ~[classes/:?]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> [akka-actor_2.12-2.6.15.jar:2.6.15]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> [akka-actor_2.12-2.6.15.jar:2.6.15]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [scala-library-2.12.7.jar:?]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [scala-library-2.12.7.jar:?]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> [akka-actor_2.12-2.6.15.jar:2.6.15]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [scala-library-2.12.7.jar:?]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [scala-library-2.12.7.jar:?]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [scala-library-2.12.7.jar:?]
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
> [akka-actor_2.12-2.6.15.jar:2.6.15]
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> [akka-actor_2.12-2.6.15.jar:2.6.15]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> [akka-actor_2.12-2.6.15.jar:2.6.15]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> [akka-actor_2.12-2.6.15.jar:2.6.15]
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> [akka-actor_2.12-2.6.15.jar:2.6.15]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> [akka-actor_2.12-2.6.15.jar:2.6.15]
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> [akka-actor_2.12-2.6.15.jar:2.6.15]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> [akka-actor_2.12-2.6.15.jar:2.6.15]
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> [?:1.8.0_292]
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> [?:1.8.0_292]
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> [?:1.8.0_292]
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> [?:1.8.0_292]
> {code}
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L160-L164
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L444-L449
> [3]
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39860&view=logs&j=219f6d90-20a2-5863-7c1b-c80377a1018f&t=20186858-1485-5059-c9c6-446952519524&s=ab6e269b-88b2-5ded-2544-4aa5b1124530
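
The interaction described in the quoted issue can be illustrated with a minimal Java sketch. This is not Flink code: ForcedSavepointSketch, SingleSlotGateway, LimitingCoordinator, and simulate are hypothetical names invented for illustration. The only assumptions taken from the report are that forced requests skip the concurrency limit and that the gateway accepts only one marked checkpoint at a time. The starting checkpoint id of 8 is chosen only to mirror the exception message above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the reported behavior; none of these types are Flink classes. */
final class ForcedSavepointSketch {

    /** Stand-in for a gateway that tolerates only one marked checkpoint at a time. */
    static final class SingleSlotGateway {
        private static final long NO_CHECKPOINT = -1L;
        private long markedCheckpointId = NO_CHECKPOINT;

        void markForCheckpoint(long checkpointId) {
            if (markedCheckpointId != NO_CHECKPOINT && markedCheckpointId != checkpointId) {
                throw new IllegalStateException(
                        "Cannot mark for checkpoint " + checkpointId
                                + ", already marked for checkpoint " + markedCheckpointId);
            }
            markedCheckpointId = checkpointId;
        }
    }

    /** Stand-in for a coordinator that enforces a concurrency limit unless a request is forced. */
    static final class LimitingCoordinator {
        private final int maxConcurrent;
        private final SingleSlotGateway gateway = new SingleSlotGateway();
        private final List<Long> pending = new ArrayList<>();
        private long nextId = 8; // assumption: start at 8 to mirror the reported message

        LimitingCoordinator(int maxConcurrent) {
            this.maxConcurrent = maxConcurrent;
        }

        /** Returns the id of the triggered checkpoint, or -1 if the request was declined. */
        long trigger(boolean forced) {
            // Forced requests skip the limit check; this is the step the issue describes.
            if (!forced && pending.size() >= maxConcurrent) {
                return -1L;
            }
            long id = nextId++;
            gateway.markForCheckpoint(id);
            pending.add(id);
            return id;
        }
    }

    /** Triggers one regular checkpoint, then a second request while the first is still pending. */
    static String simulate(boolean secondRequestForced) {
        LimitingCoordinator coordinator = new LimitingCoordinator(1);
        coordinator.trigger(false); // checkpoint 8 is now pending
        try {
            long id = coordinator.trigger(secondRequestForced);
            return id < 0 ? "declined" : "triggered " + id;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate(false));
        System.out.println(simulate(true));
    }
}
```

In this sketch, the non-forced second request is declined by the limit check, while the forced one reaches the gateway and fails with the same kind of IllegalStateException shown in the stack trace.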