[ https://issues.apache.org/jira/browse/FLINK-32876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lijie Wang reassigned FLINK-32876: ---------------------------------- Assignee: Junrui Li > ExecutionTimeBasedSlowTaskDetector treats unscheduled tasks as slow tasks and > causes speculative execution to fail. > ------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-32876 > URL: https://issues.apache.org/jira/browse/FLINK-32876 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination > Affects Versions: 1.18.0 > Reporter: Junrui Li > Assignee: Junrui Li > Priority: Major > Fix For: 1.18.0 > > > When we enable speculative execution and configure the job with the following > configuration: > {code:java} > execution.batch.speculative.enabled: true > slow-task-detector.execution-time.baseline-ratio: 0.0 > slow-task-detector.execution-time.baseline-lower-bound: 0s{code} > The ExecutionTimeBasedSlowTaskDetector will identify the tasks of an ExecutionJobVertex that > has not yet been scheduled as slow tasks and report them to the > SpeculativeScheduler. However, the SpeculativeScheduler requires that the > corresponding ExecutionVertex has entered the scheduled state before > scheduling backup tasks. If this requirement is not met, > speculative execution will fail. 
> The exception stack trace is as follows: > {code:java} > java.lang.IllegalStateException: Execution vertex > b3f44e8b1dc132ff2a47f7955c75ef7d_0 does not have a recorded version at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:215) > ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at > org.apache.flink.runtime.scheduler.ExecutionVertexVersioner.getCurrentVersion(ExecutionVertexVersioner.java:71) > ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at > org.apache.flink.runtime.scheduler.ExecutionVertexVersioner.lambda$getExecutionVertexVersions$1(ExecutionVertexVersioner.java:89) > ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > ~[?:1.8.0_333] at > java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1580) > ~[?:1.8.0_333] at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > ~[?:1.8.0_333] at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > ~[?:1.8.0_333] at > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > ~[?:1.8.0_333] at > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > ~[?:1.8.0_333] at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > ~[?:1.8.0_333] at > org.apache.flink.runtime.scheduler.ExecutionVertexVersioner.getExecutionVertexVersions(ExecutionVertexVersioner.java:90) > ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at > org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler.notifySlowTasks(SpeculativeScheduler.java:377) > ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at > org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector.lambda$scheduleTask$1(ExecutionTimeBasedSlowTaskDetector.java:129) > ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > ~[?:1.8.0_333] at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) > ~[?:1.8.0_333] at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) > ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT] at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) > ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) > ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) > ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) > ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > scala.PartialFunction.applyOrElse(PartialFunction.scala:127) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) > [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT] at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_333] > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) > [?:1.8.0_333] at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) > [?:1.8.0_333] at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) > [?:1.8.0_333] {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)