[ https://issues.apache.org/jira/browse/FLINK-28941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17599349#comment-17599349 ]

Yun Gao edited comment on FLINK-28941 at 9/2/22 9:05 AM:
---------------------------------------------------------

[~hxb] I double-checked with [~yunfengzhou] offline: the modified 
exactly-once mechanism of the operator coordinator now relies on there being no 
concurrent checkpoints at all, whereas the previous implementation only required 
that no concurrent checkpoints occur during the trigger period. However, we 
currently cannot guarantee that there are no concurrent checkpoints, since 
maxConcurrentCheckpoints is an option exposed to users. Even if we handled 
forced checkpoints differently, that would not solve the issue when users have 
set maxConcurrentCheckpoints explicitly. Thus I think we may need to remove the 
dependency on the assumption that there are no concurrent checkpoints. 
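To make the idea of dropping the single-checkpoint assumption concrete, here is a minimal Java sketch under a heavily simplified, assumed model: the gateway holds back coordinator events while any checkpoint is marked and releases them, in order, once every marked checkpoint has been completed or aborted. The class and its methods (`MultiCheckpointGateway`, `sendEvent`, `unmarkCheckpoint`) are hypothetical and are not Flink's `SubtaskGatewayImpl` API; the point is only that tracking a set of marked checkpoint IDs, rather than a single one, means a second mark no longer has to be rejected.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch, not Flink's SubtaskGatewayImpl: a gateway that
 * tolerates several concurrently marked checkpoints instead of assuming one.
 */
public class MultiCheckpointGateway {
    // All checkpoints currently marked; more than one is allowed.
    private final Set<Long> markedCheckpointIds = new HashSet<>();
    // Events held back while at least one checkpoint is marked.
    private final List<String> buffered = new ArrayList<>();
    // Events that have been released to the (simulated) subtask.
    private final List<String> delivered = new ArrayList<>();

    /** Marking a second checkpoint is fine; it is simply added to the set. */
    public void markForCheckpoint(long checkpointId) {
        markedCheckpointIds.add(checkpointId);
    }

    /** Delivers immediately only when no checkpoint is marked; otherwise buffers. */
    public void sendEvent(String event) {
        if (markedCheckpointIds.isEmpty()) {
            delivered.add(event);
        } else {
            buffered.add(event);
        }
    }

    /** Called when a checkpoint completes or is aborted; flushes once none remain. */
    public void unmarkCheckpoint(long checkpointId) {
        markedCheckpointIds.remove(checkpointId);
        if (markedCheckpointIds.isEmpty()) {
            delivered.addAll(buffered);
            buffered.clear();
        }
    }

    public List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        MultiCheckpointGateway gateway = new MultiCheckpointGateway();
        gateway.markForCheckpoint(8);
        gateway.markForCheckpoint(9); // no IllegalStateException in this model
        gateway.sendEvent("a");
        gateway.unmarkCheckpoint(8);
        System.out.println(gateway.delivered()); // still held back by checkpoint 9
        gateway.unmarkCheckpoint(9);
        System.out.println(gateway.delivered());
    }
}
```

Here `main` prints `[]` after checkpoint 8 is unmarked, because checkpoint 9 still holds the event back, and `[a]` once checkpoint 9 is unmarked as well.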


was (Author: gaoyunhaii):
[~hxb] I double-checked with [~yunfengzhou] offline: the modified 
exactly-once mechanism of the operator coordinator now relies on there being no 
concurrent checkpoints at all, whereas the previous implementation only required 
that no concurrent checkpoints be triggered. However, we currently cannot 
guarantee that there are no concurrent checkpoints, since 
maxConcurrentCheckpoints is an option exposed to users. Even if we handled 
forced checkpoints differently, that would not solve the issue when users have 
set maxConcurrentCheckpoints explicitly. Thus I think we may need to remove the 
dependency on the assumption that there are no concurrent checkpoints. 

> Savepoint ignores MaxConcurrentCheckpoint limit in aligned checkpoint case
> --------------------------------------------------------------------------
>
>                 Key: FLINK-28941
>                 URL: https://issues.apache.org/jira/browse/FLINK-28941
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.16.0
>            Reporter: Yunfeng Zhou
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.16.0
>
>
> When unaligned checkpoints are disabled, savepoints are marked as 
> forced [1], which means they can ignore the maxConcurrentCheckpoints limit [2] 
> and lead to more than maxConcurrentCheckpoints checkpoints running 
> simultaneously. 
> This behavior is incompatible with OperatorCoordinatorHolder, which requires 
> that at most one checkpoint be pending at a time. As a result, exceptions 
> such as the following might be thrown [3].
> {code:java}
> java.lang.IllegalStateException: Cannot mark for checkpoint 9, already marked for checkpoint 8
>       at org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.markForCheckpoint(SubtaskGatewayImpl.java:185) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>       at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinatorInternal$6(OperatorCoordinatorHolder.java:328) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>       at java.util.HashMap.forEach(HashMap.java:1289) ~[?:1.8.0_292]
>       at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:327) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>       at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:243) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[classes/:?]
>       at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[classes/:?]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[classes/:?]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[classes/:?]
>       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[classes/:?]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[classes/:?]
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [scala-library-2.12.7.jar:?]
>       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [scala-library-2.12.7.jar:?]
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.12.7.jar:?]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
>       at akka.actor.Actor.aroundReceive(Actor.scala:537) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.actor.Actor.aroundReceive$(Actor.scala:535) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.actor.ActorCell.invoke(ActorCell.scala:548) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.dispatch.Mailbox.run(Mailbox.scala:231) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [akka-actor_2.12-2.6.15.jar:2.6.15]
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_292]
>       at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_292]
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_292]
>       at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_292]
> {code}
> [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L160-L164
> [2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L444-L449
> [3] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39860&view=logs&j=219f6d90-20a2-5863-7c1b-c80377a1018f&t=20186858-1485-5059-c9c6-446952519524&s=ab6e269b-88b2-5ded-2544-4aa5b1124530
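The interaction described in this report can be reproduced in miniature. The following Java sketch assumes a heavily simplified model: `ToyRequestDecider` and `ToySubtaskGateway` are hypothetical stand-ins, not Flink's `CheckpointRequestDecider` or `SubtaskGatewayImpl`. The decider lets forced requests skip the concurrency check, and the gateway accepts only one marked checkpoint at a time, so a forced checkpoint 9 started while checkpoint 8 is still pending produces the same message as the trace.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of FLINK-28941: forced savepoints bypass
 * maxConcurrentCheckpoints, while the gateway tolerates only one
 * marked checkpoint at a time. These are not Flink's real classes.
 */
public class ForcedSavepointDemo {

    /** Simplified stand-in for the admission check of a checkpoint request decider. */
    static final class ToyRequestDecider {
        private final int maxConcurrentCheckpoints;
        private int pending = 0;

        ToyRequestDecider(int maxConcurrentCheckpoints) {
            this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
        }

        /** Forced requests skip the limit; regular ones are rejected when it is reached. */
        boolean tryStart(boolean forced) {
            if (!forced && pending >= maxConcurrentCheckpoints) {
                return false;
            }
            pending++;
            return true;
        }
    }

    /** Simplified stand-in for a gateway that assumes at most one pending checkpoint. */
    static final class ToySubtaskGateway {
        private long markedCheckpointId = -1L; // -1 means "no checkpoint marked"

        void markForCheckpoint(long checkpointId) {
            if (markedCheckpointId != -1L && markedCheckpointId != checkpointId) {
                throw new IllegalStateException(String.format(
                        "Cannot mark for checkpoint %d, already marked for checkpoint %d",
                        checkpointId, markedCheckpointId));
            }
            markedCheckpointId = checkpointId;
        }
    }

    /** Starts regular checkpoint 8, then forced checkpoint 9, and records the outcome. */
    static List<String> run() {
        ToyRequestDecider decider = new ToyRequestDecider(1);
        ToySubtaskGateway gateway = new ToySubtaskGateway();
        List<String> log = new ArrayList<>();

        long[] ids = {8L, 9L};
        boolean[] forced = {false, true};
        for (int i = 0; i < ids.length; i++) {
            if (!decider.tryStart(forced[i])) {
                log.add("rejected " + ids[i]);
                continue;
            }
            try {
                gateway.markForCheckpoint(ids[i]);
                log.add("marked " + ids[i]);
            } catch (IllegalStateException e) {
                log.add(e.getMessage());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `main` prints `marked 8` followed by the exception message. If checkpoint 9 were not forced, the decider would reject it instead and `rejected 9` would be logged.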



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
