Hey Niels,

Thanks for the detailed report. I don't think it is related to the
Hadoop or Scala version. I think the following happens:

- Occasionally, one of your tasks seems to be extremely slow in
registering its produced intermediate result (the data shuffled
between TaskManagers)
- Another task is already requesting to consume data from this task
but cannot find it (even after multiple retries), which then fails the
complete job (your stack trace)

That this happens only occasionally is probably due to load on your
cluster. The slowdown could have multiple reasons:
- Is your Hadoop cluster resource constrained, so that tasks are slow to deploy?
- Is your application JAR very large and takes a long time to download?

We have two options at this point:
1) You can increase the maximum number of retries via the config option
"taskmanager.network.request-backoff.max". It specifies the maximum
request back-off in milliseconds and defaults to 10000 [1]. Increasing
it to 30000 would give you two extra retries with pretty long delays
(see [1]).
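
For a yarn-session that would be a one-line change in
conf/flink-conf.yaml before you start the session, something like:

  taskmanager.network.request-backoff.max: 30000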

2) To be sure that this is really what is happening, we could increase
the log level of certain classes and check whether they have
registered their results or not. If you want to do this, I'm more than
happy to provide you with the classes to enable DEBUG logging for.
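
Just to illustrate what that would look like (I'm only using the
partition package from your stack trace as a placeholder here; the
exact classes would be the ones I'd send you), you would add a line
like this to conf/log4j.properties:

  log4j.logger.org.apache.flink.runtime.io.network.partition=DEBUG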

What do you think?

– Ufuk

DETAILS
=======

- The TaskManagers produce and consume intermediate results
- When a TaskManager wants to consume a result, it directly queries
the producing TaskManager for it
- An intermediate result becomes ready for consumption during initial
task setup (state DEPLOYING)
- When a TaskManager is slow to register its intermediate result and
the consumer requests the result before it is ready, it can happen
that a requested partition is "not found"

This is also what is happening here. We retry the request for the
intermediate result multiple times with a timed back-off [1] and only
fail the request (your stack trace) if the partition is still not
ready although we expect it to be ready (that is, there was no failure
at the producing task).

[1] The back-off starts by default at 100 ms and doubles after each
attempt, capped at 10000 ms (100, 200, 400, 800, 1600, 3200, 6400,
10000)
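
In case a concrete picture helps: below is a minimal sketch (plain
Java, not Flink's actual retry code) of the schedule described in [1].
It just prints the retry delays for a given maximum, which is where
the "two extra retries" from option 1 come from:

  public class BackoffSchedule {

      // Doubles the delay after every failed partition request, capped at maxMillis.
      static void printSchedule(int initialMillis, int maxMillis) {
          StringBuilder schedule = new StringBuilder("max=" + maxMillis + " ms:");
          int delay = initialMillis;
          while (true) {
              schedule.append(' ').append(delay);
              if (delay >= maxMillis) {
                  break; // the capped delay is the last retry
              }
              delay = Math.min(delay * 2, maxMillis);
          }
          System.out.println(schedule);
      }

      public static void main(String[] args) {
          printSchedule(100, 10_000); // 100 200 400 800 1600 3200 6400 10000
          printSchedule(100, 30_000); // ... 6400 12800 25600 30000 -> two extra retries
      }
  }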


On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes <ni...@basjes.nl> wrote:
> Hi,
>
> I'm having some trouble running a java based Flink job in a yarn-session.
>
> The job itself consists of reading a set of files resulting in a DataStream
> (I use DataStream because in the future I intend to change the file with a
> Kafka feed), then does some parsing and eventually writes the data into
> HBase.
>
> Most of the time running this works fine yet sometimes it fails with this
> exception:
>
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363 not found.
>       at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
>       at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
>       at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
>       at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
>       at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
>       at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>       at akka.dispatch.OnComplete.internal(Future.scala:248)
>       at akka.dispatch.OnComplete.internal(Future.scala:245)
>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>       at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>       at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>       at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>       at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>       at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>       at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> I went through all logs at the Hadoop side of all the related containers and
> other than this exception I did not see any warning/error that might explain
> what is going on here.
>
> Now the "Most of the time running this works fine" makes this hard to
> troubleshoot. When I run the same job again it may run perfectly that time.
>
> I'm using flink-1.3.2-bin-hadoop27-scala_2.11.tgz and I double checked my
> pom.xml and I use the same version for Flink / Scala in there.
>
> The command used to start the yarn-session on my experimental cluster (no
> security, no other users):
>
> /usr/local/flink-1.3.2/bin/yarn-session.sh \
>     --container 180 \
>     --name "Flink on Yarn Experiments" \
>     --slots                     1     \
>     --jobManagerMemory          4000  \
>     --taskManagerMemory         4000  \
>     --streaming                       \
>     --detached
>
> Two relevant fragments from my application pom.xml:
>
> <flink.version>1.3.2</flink.version>
> <flink.scala.version>2.11</flink.scala.version>
>
>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-java</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-clients_${flink.scala.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-hbase_${flink.scala.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
>
> I could really use some suggestions where to look for the root cause of
> this.
> Is this something in my application? My Hadoop cluster? Or is this a problem
> in Flink 1.3.2?
>
> Thanks.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
