[ https://issues.apache.org/jira/browse/FLINK-28941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17599349#comment-17599349 ]
    Yun Gao edited comment on FLINK-28941 at 9/2/22 9:05 AM:
    ---------------------------------------------------------

[~hxb] I double-checked offline with [~yunfengzhou]: the modified exactly-once mechanism of the operator coordinator now relies on there being no concurrent checkpoints at all, whereas the previous implementation only required that no concurrent checkpoints occur during the trigger period. However, we currently cannot guarantee that there are no concurrent checkpoints, since maxConcurrentCheckpoints is an option open to users. Even if we handled forced checkpoints differently, that would not solve the issue for users who have set maxConcurrentCheckpoints explicitly. Thus I think we should try to remove the dependency on the assumption that there are no concurrent checkpoints.
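To illustrate what removing that assumption could look like, here is a minimal sketch. Everything in it is hypothetical: `MultiCheckpointGateway`, `markForCheckpoint`, `sendEvent`, and `releaseCheckpoint` are simplified stand-ins, not Flink's `SubtaskGatewayImpl` API, and the sketch assumes that events sent after a checkpoint is marked must be held back until that checkpoint is released (completed or aborted). Instead of a single marked checkpoint, it keeps one queue of held-back events per pending checkpoint ID, so a second concurrent mark is accepted rather than rejected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified gateway; NOT Flink's SubtaskGatewayImpl.
// Marks are tracked per checkpoint ID, so several checkpoints may be pending at once.
class MultiCheckpointGateway {
    // Pending checkpoint ID -> events held back since that checkpoint was marked.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Events already handed to the subtask, in send order.
    private final List<String> delivered = new ArrayList<>();

    void markForCheckpoint(long checkpointId) {
        // Assumes checkpoint IDs are marked in increasing order.
        if (!pending.isEmpty() && checkpointId <= pending.lastKey()) {
            throw new IllegalStateException(
                    "Checkpoint " + checkpointId + " is not newer than " + pending.lastKey());
        }
        pending.put(checkpointId, new ArrayList<>());
    }

    void sendEvent(String event) {
        if (pending.isEmpty()) {
            delivered.add(event);
        } else {
            // Hold the event behind the most recently marked checkpoint.
            pending.lastEntry().getValue().add(event);
        }
    }

    void releaseCheckpoint(long checkpointId) {
        List<String> held = pending.remove(checkpointId);
        if (held == null) {
            return; // unknown or already released
        }
        Map.Entry<Long, List<String>> older = pending.lowerEntry(checkpointId);
        if (older != null) {
            // An older checkpoint is still pending: queue behind it to keep send order.
            older.getValue().addAll(held);
        } else {
            delivered.addAll(held);
        }
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        MultiCheckpointGateway gateway = new MultiCheckpointGateway();
        gateway.sendEvent("a");
        gateway.markForCheckpoint(8);
        gateway.sendEvent("b");
        gateway.markForCheckpoint(9); // a single-slot gateway would throw here
        gateway.sendEvent("c");
        gateway.releaseCheckpoint(9);
        System.out.println(gateway.delivered()); // [a]
        gateway.releaseCheckpoint(8);
        System.out.println(gateway.delivered()); // [a, b, c]
    }
}
```

Releasing checkpoint 9 before checkpoint 8 moves 9's held events behind 8's queue, so events still reach the subtask in send order. That ordering rule is a design choice of this sketch, not something the comment specifies.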
> Savepoint ignores MaxConcurrentCheckpoint limit in aligned checkpoint case
> --------------------------------------------------------------------------
>
>                 Key: FLINK-28941
>                 URL: https://issues.apache.org/jira/browse/FLINK-28941
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.16.0
>            Reporter: Yunfeng Zhou
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.16.0
>
>
> When unaligned checkpoints are disabled, savepoints are marked as forced[1], which means they can bypass the maxConcurrentCheckpoints limit[2], so that more than maxConcurrentCheckpoints checkpoints may run simultaneously.
> This behavior is incompatible with OperatorCoordinatorHolder, which requires that at most one checkpoint be pending at a time. As a result, exceptions like the following might be thrown[3]:
> {code:java}
> java.lang.IllegalStateException: Cannot mark for checkpoint 9, already marked for checkpoint 8
> 	at org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.markForCheckpoint(SubtaskGatewayImpl.java:185) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
> 	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinatorInternal$6(OperatorCoordinatorHolder.java:328) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
> 	at java.util.HashMap.forEach(HashMap.java:1289) ~[?:1.8.0_292]
> 	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:327) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
> 	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:243) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[classes/:?]
> 	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[classes/:?]
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[classes/:?]
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[classes/:?]
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[classes/:?]
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[classes/:?]
> 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [akka-actor_2.12-2.6.15.jar:2.6.15]
> 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
> 	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [scala-library-2.12.7.jar:?]
> 	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [scala-library-2.12.7.jar:?]
> 	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.12.7.jar:?]
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
> 	at akka.actor.Actor.aroundReceive(Actor.scala:537) [akka-actor_2.12-2.6.15.jar:2.6.15]
> 	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [akka-actor_2.12-2.6.15.jar:2.6.15]
> 	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [akka-actor_2.12-2.6.15.jar:2.6.15]
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [akka-actor_2.12-2.6.15.jar:2.6.15]
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:548) [akka-actor_2.12-2.6.15.jar:2.6.15]
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [akka-actor_2.12-2.6.15.jar:2.6.15]
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [akka-actor_2.12-2.6.15.jar:2.6.15]
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [akka-actor_2.12-2.6.15.jar:2.6.15]
> 	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_292]
> 	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_292]
> 	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_292]
> 	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_292]
> {code}
> [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L160-L164
> [2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L444-L449
> [3] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39860&view=logs&j=219f6d90-20a2-5863-7c1b-c80377a1018f&t=20186858-1485-5059-c9c6-446952519524&s=ab6e269b-88b2-5ded-2544-4aa5b1124530
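The interaction described in the issue can be reproduced in a few lines. This is a minimal sketch under stated assumptions: `SimplifiedDecider`, `SingleSlotGateway`, and `ForcedSavepointDemo` are hypothetical stand-ins, not Flink's `CheckpointRequestDecider` or `SubtaskGatewayImpl`. The decider models only the rule that a forced request skips the maxConcurrentCheckpoints check, and the gateway models only the single-pending-checkpoint restriction, reusing the exception message text from the stack trace.

```java
// Hypothetical, simplified model of the failure; not Flink's actual classes.
class ForcedSavepointDemo {
    static String simulate() {
        SimplifiedDecider decider = new SimplifiedDecider(1); // maxConcurrentCheckpoints = 1
        SingleSlotGateway gateway = new SingleSlotGateway();

        // Regular checkpoint 8 is admitted and stays pending.
        if (decider.tryAdmit(false)) {
            gateway.markForCheckpoint(8);
        }
        // A second regular checkpoint is rejected by the limit...
        if (decider.tryAdmit(false)) {
            return "unexpected: limit not enforced";
        }
        // ...but a forced savepoint (checkpoint 9) bypasses it.
        if (decider.tryAdmit(true)) {
            try {
                gateway.markForCheckpoint(9);
            } catch (IllegalStateException e) {
                return e.getMessage();
            }
        }
        return "no conflict";
    }

    public static void main(String[] args) {
        // Prints: Cannot mark for checkpoint 9, already marked for checkpoint 8
        System.out.println(simulate());
    }
}

// Hypothetical admission check: forced requests skip the maxConcurrentCheckpoints limit.
class SimplifiedDecider {
    private final int maxConcurrentCheckpoints;
    private int pendingCount = 0;

    SimplifiedDecider(int maxConcurrentCheckpoints) {
        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
    }

    boolean tryAdmit(boolean forced) {
        if (!forced && pendingCount >= maxConcurrentCheckpoints) {
            return false; // regular request must wait
        }
        pendingCount++;
        return true;
    }
}

// Hypothetical gateway that can be marked for only one checkpoint at a time.
class SingleSlotGateway {
    private static final long NONE = -1L;
    private long markedCheckpointId = NONE;

    void markForCheckpoint(long checkpointId) {
        if (markedCheckpointId != NONE) {
            throw new IllegalStateException(String.format(
                    "Cannot mark for checkpoint %d, already marked for checkpoint %d",
                    checkpointId, markedCheckpointId));
        }
        markedCheckpointId = checkpointId;
    }
}
```

With maxConcurrentCheckpoints set to 1, the regular request for a second checkpoint is rejected while checkpoint 8 is pending, but the forced savepoint is admitted and the gateway fails with the same message as in the report.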