Hey Niels, any update on this?
– Ufuk

On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Niels,
>
> thanks for the detailed report. I don't think it is related to the
> Hadoop or Scala version. I think the following happens:
>
> - Occasionally, one of your tasks seems to be extremely slow in
> registering its produced intermediate result (the data shuffled
> between TaskManagers)
> - Another task is already requesting to consume data from this task
> but cannot find it (after multiple retries) and it fails the complete
> job (your stack trace)
>
> That happens only occasionally, probably due to load in your cluster.
> The slowdown could have multiple reasons...
> - Is your Hadoop cluster resource constrained and are the tasks slow
> to deploy?
> - Is your application JAR very large and does it need a lot of time
> to download?
>
> We have two options at this point:
> 1) You can increase the maximum retries via the config option
> "taskmanager.network.request-backoff.max". The default is 10000
> (milliseconds) and specifies the maximum request backoff [1].
> Increasing this to 30000 would give you two extra retries with pretty
> long delays (see [1]).
>
> 2) To be sure that this is really what is happening, we could increase
> the log level of certain classes and check whether they have
> registered their results or not. If you want to do this, I'm more than
> happy to provide you with some classes to enable DEBUG logging for.
>
> What do you think?
>
> – Ufuk
>
> DETAILS
> =======
>
> - The TaskManagers produce and consume intermediate results
> - When a TaskManager wants to consume a result, it directly queries
> the producing TaskManager for it
> - An intermediate result becomes ready for consumption during initial
> task setup (state DEPLOYING)
> - When a TaskManager is slow to register its intermediate result and
> the consumer requests the result before it is ready, it can happen
> that a requested partition is "not found"
>
> This is also what is happening here. We retry the request for the
> intermediate result multiple times with timed backoff [1] and only
> fail the request (your stack trace) if the partition is still not
> ready although we expect it to be ready (that is, there was no failure
> at the producing task).
>
> [1] Starting by default at 100 millis and going up to 10_000 millis by
> doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000)
>
>
> On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes <ni...@basjes.nl> wrote:
>> Hi,
>>
>> I'm having some trouble running a Java-based Flink job in a
>> yarn-session.
>>
>> The job itself consists of reading a set of files resulting in a
>> DataStream (I use DataStream because in the future I intend to
>> replace the file input with a Kafka feed), then does some parsing
>> and eventually writes the data into HBase.
>>
>> Most of the time this runs fine, yet sometimes it fails with this
>> exception:
>>
>> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
>> Partition 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363
>> not found.
>> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
>> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
>> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
>> at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
>> at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
>> at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>> at akka.dispatch.OnComplete.internal(Future.scala:248)
>> at akka.dispatch.OnComplete.internal(Future.scala:245)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> I went through the logs on the Hadoop side for all of the related
>> containers and, other than this exception, I did not see any
>> warning/error that might explain what is going on here.
>>
>> The fact that it works fine most of the time makes this hard to
>> troubleshoot. When I run the same job again it may run perfectly
>> that time.
>>
>> I'm using flink-1.3.2-bin-hadoop27-scala_2.11.tgz and I double-checked
>> my pom.xml: I use the same Flink / Scala versions there.
>>
>> The command used to start the yarn-session on my experimental cluster
>> (no security, no other users):
>>
>> /usr/local/flink-1.3.2/bin/yarn-session.sh \
>>     --container 180 \
>>     --name "Flink on Yarn Experiments" \
>>     --slots 1 \
>>     --jobManagerMemory 4000 \
>>     --taskManagerMemory 4000 \
>>     --streaming \
>>     --detached
>>
>> Two relevant fragments from my application pom.xml:
>>
>> <flink.version>1.3.2</flink.version>
>> <flink.scala.version>2.11</flink.scala.version>
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-java</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-clients_${flink.scala.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-hbase_${flink.scala.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>>
>> I could really use some suggestions on where to look for the root
>> cause of this. Is this something in my application? My Hadoop cluster?
>> Or is this a problem in Flink 1.3.2?
>>
>> Thanks.
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
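
For reference, option 1) above amounts to a single entry in the
flink-conf.yaml used by the yarn-session. A minimal sketch only: the key
is the one quoted in the reply above, 30000 ms is the value suggested
there, and the default is 10000 ms.

    # flink-conf.yaml
    # Maximum backoff (in milliseconds) between retries of a partition
    # request. Raising it from the default 10000 allows a couple of
    # extra, longer retries before the request is failed with the
    # PartitionNotFoundException shown in the stack trace.
    taskmanager.network.request-backoff.max: 30000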