Hi Steffen,

This exception indicates that when the downstream task requested a partition 
from the upstream task, the upstream task had not yet finished initializing and 
had not registered its result partition.
In this case, the downstream task asks the JobManager for the producer's state 
and then retries the partition request against the upstream task until the 
maximum retry backoff is reached. You can try increasing 
"taskmanager.network.request-backoff.max" to see whether that helps; the 
default value is 10s.
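
For reference, a minimal sketch of how that could look in flink-conf.yaml on 
each TaskManager. This assumes the value is given in milliseconds (so the 10s 
default corresponds to 10000); the 20000 below is only an illustrative value, 
not a recommendation, and the TaskManagers need a restart to pick it up.

```yaml
# flink-conf.yaml (sketch; 20000 is an illustrative value, not a tuned one)

# Upper bound for the backoff between partition request retries, in ms.
# Assumed default: 10000 (the 10s mentioned above).
taskmanager.network.request-backoff.max: 20000

# Related setting, shown commented out: the initial backoff, in ms.
# taskmanager.network.request-backoff.initial: 100
```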

BTW, you should also check why the upstream task registers its result partition 
late. Maybe the upstream TaskManager received the task deployment from the 
JobManager with a delay, or some operation during upstream task initialization 
unexpectedly took more time before the result partition was registered.

Best,
Zhijiang
------------------------------------------------------------------
From: Steffen Wohlers <steffenwohl...@gmx.de>
Sent: Sunday, July 22, 2018, 22:22
To: user <user@flink.apache.org>
Subject: Network PartitionNotFoundException when run on multi nodes

Hi all,

I have some problems when running my application on more than one Task Manager.

setup:
node1: Job Manager, Task Manager
node2: Task Manager

I can run my program successfully on each node alone when I stop the other Task 
Manager.
But when I start both and set parallelism = 2, I get the following exception 
every time (after 30 seconds):

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 6372b5f434d55e987ea179d6f6b488fe@e389ca50a2c2cf776b90268f987a6546 not found.
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:273)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:182)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:400)
        at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1294)
        at org.apache.flink.runtime.taskmanager.Task.lambda$triggerPartitionProducerStateCheck$0(Task.java:1151)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



It seems the problem occurs when a subtask is linked to both Task Managers.

Does anybody know how I can make it work?

Thanks,
Steffen

