[ https://issues.apache.org/jira/browse/FLINK-21707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298666#comment-17298666 ]
Matthias commented on FLINK-21707:
----------------------------------

Thanks for bringing this up, [~zhuzh]. This is related to how we handled the stop-with-savepoint issue in FLINK-21030, isn't it? I managed to reproduce the hanging job locally with the patch [~zhuzh] suggested. We ran into the following exception:
{code:java}
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_265]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_265]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_265]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[classes/:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[classes/:?]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[classes/:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[classes/:?]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [scala-library-2.11.12.jar:?]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [scala-library-2.11.12.jar:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?]
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [akka-actor_2.11-2.5.21.jar:2.5.21]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [akka-actor_2.11-2.5.21.jar:2.5.21]
Caused by: java.lang.IllegalStateException: BUG: trying to schedule a region which is not in CREATED state
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[classes/:?]
	at org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy.maybeScheduleRegion(PipelinedRegionSchedulingStrategy.java:162) ~[classes/:?]
	at org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy.maybeScheduleRegions(PipelinedRegionSchedulingStrategy.java:153) ~[classes/:?]
	at org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy.onExecutionStateChange(PipelinedRegionSchedulingStrategy.java:141) ~[classes/:?]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:201) ~[classes/:?]
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:699) ~[classes/:?]
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80) ~[classes/:?]
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:433) ~[classes/:?]
	... 26 more
{code}

> Job is possible to hang when restarting a FINISHED task with POINTWISE
> BLOCKING consumers
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-21707
>                 URL: https://issues.apache.org/jira/browse/FLINK-21707
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.11.3, 1.12.2, 1.13.0
>            Reporter: Zhu Zhu
>            Priority: Blocker
>
> A job may hang when a FINISHED task with POINTWISE BLOCKING consumers is
> restarted. This is because
> {{PipelinedRegionSchedulingStrategy#onExecutionStateChange()}} tries to
> schedule all the consumer tasks/regions of the finished *ExecutionJobVertex*,
> even though not all of these regions are exact consumers of the finished
> *ExecutionVertex*. In this case, some of these regions can be in a
> non-CREATED state because they are neither connected to nor affected by the
> restarted tasks. However,
> {{PipelinedRegionSchedulingStrategy#maybeScheduleRegion()}} does not allow
> scheduling a non-CREATED region: it throws an exception, which breaks the
> scheduling of all the other regions. An example that reproduces this problem
> can be found in
> [PipelinedRegionSchedulingITCase#testRecoverFromPartitionException|https://github.com/zhuzhurk/flink/commit/1eb036b6566c5cb4958d9957ba84dc78ce62a08c].
> To fix the problem, we can add a filter in
> {{PipelinedRegionSchedulingStrategy#onExecutionStateChange()}} so that it
> only triggers the scheduling of regions in CREATED state.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
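The filter proposed at the end of the quoted issue description can be illustrated with a small, self-contained Java model. This is a hypothetical sketch, not Flink code: `RegionFilterSketch`, `Region`, `RegionState`, `scheduleAllConsumers`, `scheduleCreatedConsumers` and `sampleConsumers` are invented names, and only the precondition message is taken from the stack trace. The unfiltered loop aborts at the first consumer region that is not in CREATED state, so later regions are never scheduled; the filtered loop skips such regions before the precondition is ever checked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical toy model, not Flink's PipelinedRegionSchedulingStrategy.
public class RegionFilterSketch {

    // Simplified region lifecycle (assumption: two states are enough for the illustration).
    enum RegionState { CREATED, SCHEDULED }

    static final class Region {
        final String name;
        RegionState state;

        Region(String name, RegionState state) {
            this.name = name;
            this.state = state;
        }
    }

    // Same precondition as in the stack trace: only CREATED regions may be scheduled.
    static void maybeScheduleRegion(Region region, List<String> scheduled) {
        if (region.state != RegionState.CREATED) {
            throw new IllegalStateException(
                    "BUG: trying to schedule a region which is not in CREATED state");
        }
        region.state = RegionState.SCHEDULED;
        scheduled.add(region.name);
    }

    // Unfiltered: the first non-CREATED region throws and later regions are never reached.
    static List<String> scheduleAllConsumers(List<Region> consumers) {
        List<String> scheduled = new ArrayList<>();
        for (Region region : consumers) {
            maybeScheduleRegion(region, scheduled);
        }
        return scheduled;
    }

    // Filtered (the proposed fix): hand only CREATED regions to maybeScheduleRegion.
    static List<String> scheduleCreatedConsumers(List<Region> consumers) {
        List<Region> candidates = consumers.stream()
                .filter(region -> region.state == RegionState.CREATED)
                .collect(Collectors.toList());
        List<String> scheduled = new ArrayList<>();
        for (Region region : candidates) {
            maybeScheduleRegion(region, scheduled);
        }
        return scheduled;
    }

    // Consumer regions of the finished job vertex after a restart: r1 and r3 were
    // reset to CREATED, r2 is not connected to the restarted task and stays SCHEDULED.
    static List<Region> sampleConsumers() {
        return Arrays.asList(
                new Region("r1", RegionState.CREATED),
                new Region("r2", RegionState.SCHEDULED),
                new Region("r3", RegionState.CREATED));
    }

    public static void main(String[] args) {
        try {
            scheduleAllConsumers(sampleConsumers());
            System.out.println("unfiltered: no exception");
        } catch (IllegalStateException e) {
            System.out.println("unfiltered: " + e.getMessage());
        }
        System.out.println("filtered: " + scheduleCreatedConsumers(sampleConsumers()));
    }
}
```

Running `main` prints the precondition message for the unfiltered variant and `filtered: [r1, r3]` for the filtered one. One reason to filter at the call site rather than relax the precondition is that the check keeps flagging genuine bugs on other code paths.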