[ 
https://issues.apache.org/jira/browse/FLINK-32876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang reassigned FLINK-32876:
----------------------------------

    Assignee: Junrui Li

> ExecutionTimeBasedSlowTaskDetector treats unscheduled tasks as slow tasks and 
> causes speculative execution to fail.
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32876
>                 URL: https://issues.apache.org/jira/browse/FLINK-32876
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.18.0
>            Reporter: Junrui Li
>            Assignee: Junrui Li
>            Priority: Major
>             Fix For: 1.18.0
>
>
> When we enable speculative execution and configure the job with the following 
> configuration:
> {code:java}
> execution.batch.speculative.enabled: true
> slow-task-detector.execution-time.baseline-ratio: 0.0
> slow-task-detector.execution-time.baseline-lower-bound: 0s{code}
> The ExecutionTimeBasedSlowTaskDetector will identify ExecutionJobVertex 
> instances that have not yet been scheduled as slow tasks and report them to 
> the SpeculativeScheduler. However, the SpeculativeScheduler requires that the 
> corresponding ExecutionVertex has entered the scheduled state before backup 
> tasks can be scheduled for it. If this requirement is not met, speculative 
> execution fails.
> The exception stack trace is as follows:
> {code:java}
> java.lang.IllegalStateException: Execution vertex 
> b3f44e8b1dc132ff2a47f7955c75ef7d_0 does not have a recorded version  at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:215) 
> ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]  at 
> org.apache.flink.runtime.scheduler.ExecutionVertexVersioner.getCurrentVersion(ExecutionVertexVersioner.java:71)
>  ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]  at 
> org.apache.flink.runtime.scheduler.ExecutionVertexVersioner.lambda$getExecutionVertexVersions$1(ExecutionVertexVersioner.java:89)
>  ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]  at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) 
> ~[?:1.8.0_333]  at 
> java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1580) 
> ~[?:1.8.0_333]  at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
> ~[?:1.8.0_333]  at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> ~[?:1.8.0_333]  at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) 
> ~[?:1.8.0_333]  at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) 
> ~[?:1.8.0_333]  at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) 
> ~[?:1.8.0_333]  at 
> org.apache.flink.runtime.scheduler.ExecutionVertexVersioner.getExecutionVertexVersions(ExecutionVertexVersioner.java:90)
>  ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]  at 
> org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler.notifySlowTasks(SpeculativeScheduler.java:377)
>  ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]  at 
> org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector.lambda$scheduleTask$1(ExecutionTimeBasedSlowTaskDetector.java:129)
>  ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> ~[?:1.8.0_333]  at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> ~[?:1.8.0_333]  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
>  ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>  ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
>  ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
>  ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
>  ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>  ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>  [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) 
> [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]  at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_333] 
>  at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) 
> [?:1.8.0_333]  at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) 
> [?:1.8.0_333]  at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) 
> [?:1.8.0_333] {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to