[ 
https://issues.apache.org/jira/browse/FLINK-21707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298666#comment-17298666
 ] 

Matthias commented on FLINK-21707:
----------------------------------

Thanks for bringing this up, [~zhuzh]. This is related to how we handled the 
stop-with-savepoint issue in FLINK-21030, isn't it?

I managed to reproduce the hanging job locally with the patch [~zhuzh] suggested. 
We run into the following exception:
{code:java}
java.lang.reflect.InvocationTargetException: null
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_265]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_265]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_265]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) ~[classes/:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[classes/:?]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[classes/:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[classes/:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [scala-library-2.11.12.jar:?]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [scala-library-2.11.12.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [akka-actor_2.11-2.5.21.jar:2.5.21]
Caused by: java.lang.IllegalStateException: BUG: trying to schedule a region which is not in CREATED state
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[classes/:?]
        at org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy.maybeScheduleRegion(PipelinedRegionSchedulingStrategy.java:162) ~[classes/:?]
        at org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy.maybeScheduleRegions(PipelinedRegionSchedulingStrategy.java:153) ~[classes/:?]
        at org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy.onExecutionStateChange(PipelinedRegionSchedulingStrategy.java:141) ~[classes/:?]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:201) ~[classes/:?]
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:699) ~[classes/:?]
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80) ~[classes/:?]
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:433) ~[classes/:?]
        ... 26 more
{code}

> Job is possible to hang when restarting a FINISHED task with POINTWISE 
> BLOCKING consumers
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-21707
>                 URL: https://issues.apache.org/jira/browse/FLINK-21707
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.11.3, 1.12.2, 1.13.0
>            Reporter: Zhu Zhu
>            Priority: Blocker
>
> A job may hang when restarting a FINISHED task with POINTWISE 
> BLOCKING consumers. This is because 
> {{PipelinedRegionSchedulingStrategy#onExecutionStateChange()}} will try to 
> schedule all the consumer tasks/regions of the finished *ExecutionJobVertex*, 
> even though some of those regions are not actual consumers of the finished 
> *ExecutionVertex*. In this case, some of the regions can be in a non-CREATED 
> state because they are neither connected to nor affected by the restarted tasks. 
> However, {{PipelinedRegionSchedulingStrategy#maybeScheduleRegion()}} does not 
> allow scheduling a non-CREATED region: it throws an exception, which breaks 
> the scheduling of all the other regions. An example that demonstrates this 
> problem can be found at 
> [PipelinedRegionSchedulingITCase#testRecoverFromPartitionException 
> |https://github.com/zhuzhurk/flink/commit/1eb036b6566c5cb4958d9957ba84dc78ce62a08c].
> To fix the problem, we can add a filter in 
> {{PipelinedRegionSchedulingStrategy#onExecutionStateChange()}} to only 
> trigger the scheduling of regions in CREATED state.
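
The filter proposed in the description can be illustrated with a minimal, self-contained sketch. The types below ({{Region}}, {{RegionState}}, {{RegionFilterSketch}}) are hypothetical stand-ins invented for illustration and are not Flink's actual classes or the actual fix; only the exception message is taken from the stack trace above. The sketch contrasts an unfiltered loop, where the first non-CREATED region aborts scheduling for everything after it, with a loop that skips non-CREATED regions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for the scheduler's region model (not Flink's API).
enum RegionState { CREATED, RUNNING, FINISHED }

final class Region {
    final String name;
    final RegionState state;

    Region(String name, RegionState state) {
        this.name = name;
        this.state = state;
    }
}

final class RegionFilterSketch {

    // Mimics the precondition seen in the stack trace: only CREATED regions may be scheduled.
    static void maybeScheduleRegion(Region region, List<String> scheduled) {
        if (region.state != RegionState.CREATED) {
            throw new IllegalStateException(
                    "BUG: trying to schedule a region which is not in CREATED state");
        }
        scheduled.add(region.name);
    }

    // Unfiltered variant: the first non-CREATED region throws,
    // so the regions after it are never scheduled.
    static List<String> scheduleAll(List<Region> consumerRegions) {
        List<String> scheduled = new ArrayList<>();
        for (Region region : consumerRegions) {
            maybeScheduleRegion(region, scheduled);
        }
        return scheduled;
    }

    // Filtered variant (the idea from the description): skip regions not in CREATED state.
    static List<String> scheduleCreatedOnly(List<Region> consumerRegions) {
        List<String> scheduled = new ArrayList<>();
        for (Region region : consumerRegions) {
            if (region.state == RegionState.CREATED) {
                maybeScheduleRegion(region, scheduled);
            }
        }
        return scheduled;
    }

    public static void main(String[] args) {
        // r1 was not affected by the restart and is still running; r2 was reset to CREATED.
        List<Region> consumers = Arrays.asList(
                new Region("r1", RegionState.RUNNING),
                new Region("r2", RegionState.CREATED));

        System.out.println("filtered: " + scheduleCreatedOnly(consumers));
        try {
            scheduleAll(consumers);
        } catch (IllegalStateException e) {
            System.out.println("unfiltered failed: " + e.getMessage());
        }
    }
}
```

In this sketch the unfiltered loop never reaches {{r2}} because {{r1}} throws first, which corresponds to the hang described above; whether the real fix filters at this exact point is an assumption.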



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
