Hey Niels,

Any update on this?

– Ufuk


On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Niels,
>
> thanks for the detailed report. I don't think that it is related to
> the Hadoop or Scala version. I think the following happens:
>
> - Occasionally, one of your tasks is extremely slow to register its
> produced intermediate result (the data shuffled between TaskManagers).
> - Another task is already requesting to consume data from that task
> but cannot find it, and after multiple retries it fails the complete
> job (your stack trace).
>
> That happens only occasionally, probably due to load in your cluster.
> The slowdown could have multiple reasons:
> - Is your Hadoop cluster resource-constrained, so that tasks are slow to
> deploy?
> - Is your application JAR very large and does it take a long time to
> download?
>
> We have two options at this point:
> 1) You can increase the maximum number of retries via the config option
> "taskmanager.network.request-backoff.max". The default is 10000
> (milliseconds) and specifies the maximum request backoff [1].
> Increasing this to 30000 would give you two extra retries with pretty
> long delays (see [1]).
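>
> For example, in flink-conf.yaml (assuming you go with the suggested
> 30000 ms) that would be:
>
>     taskmanager.network.request-backoff.max: 30000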
>
> 2) To be sure that this is really what is happening, we could increase
> the log level of certain classes and check whether they have
> registered their results or not. If you want to do this, I'm more than
> happy to provide you with some classes to enable DEBUG logging for.
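>
> As an illustration (the exact logger names to use would still need to be
> confirmed), enabling DEBUG logging for the network partition classes in
> conf/log4j.properties could look like this:
>
>     log4j.logger.org.apache.flink.runtime.io.network.partition=DEBUG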
>
> What do you think?
>
> – Ufuk
>
> DETAILS
> =======
>
> - The TaskManagers produce and consume intermediate results
> - When a TaskManager wants to consume a result, it directly queries
> the producing TaskManager for it
> - An intermediate result becomes ready for consumption during initial
> task setup (state DEPLOYING)
> - When a TaskManager is slow to register its intermediate result and
> the consumer requests the result before it is ready, it can happen
> that a requested partition is "not found"
>
> This is also what is happening here. We retry the request for the
> intermediate result multiple times with timed backoff [1] and only
> fail the request (your stack trace) if the partition is still not
> ready although we expect it to be ready (that is, there was no failure
> at the producing task).
>
> [1] By default, the backoff starts at 100 ms and doubles up to a maximum
> of 10000 ms (100, 200, 400, 800, 1600, 3200, 6400, 10000).
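>
> A minimal sketch of that doubling, capped backoff (not the actual Flink
> code, just the idea):
>
> int backoff = 100;               // initial request backoff in ms
> int max = 10000;                 // taskmanager.network.request-backoff.max
> while (backoff < max) {
>     System.out.println(backoff); // 100, 200, 400, ..., 6400
>     backoff = Math.min(backoff * 2, max);
> }
> System.out.println(backoff);     // 10000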
>
>
> On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes <ni...@basjes.nl> wrote:
>> Hi,
>>
>> I'm having some trouble running a Java-based Flink job in a yarn-session.
>>
>> The job itself reads a set of files into a DataStream (I use DataStream
>> because in the future I intend to replace the files with a Kafka feed),
>> then does some parsing and eventually writes the data into HBase.
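>>
>> In simplified form the job is roughly this (hypothetical names; the real
>> parsing and HBase sink code differ):
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> // Read the input files line by line (later to be replaced by Kafka).
>> DataStream<String> lines = env.readTextFile("hdfs:///path/to/input");
>>
>> // Parse each line into a record and write the records to HBase.
>> lines.map(new ParseFunction())
>>      .addSink(new HBaseSink());
>>
>> env.execute("File to HBase");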
>>
>> Most of the time running this works fine, yet sometimes it fails with
>> this exception:
>>
>> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
>> Partition 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363
>> not found.
>>       at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
>>       at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
>>       at
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
>>       at
>> org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
>>       at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
>>       at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
>>       at
>> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>>       at akka.dispatch.OnComplete.internal(Future.scala:248)
>>       at akka.dispatch.OnComplete.internal(Future.scala:245)
>>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>>       at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>       at
>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>       at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>       at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>       at
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>       at 
>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>       at
>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>       at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>       at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>       at 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>       at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> I went through all the logs on the Hadoop side for all the related
>> containers and, other than this exception, I did not see any warning or
>> error that might explain what is going on here.
>>
>> Now the "Most of the time running this works fine" makes this hard to
>> troubleshoot. When I run the same job again it may run perfectly that time.
>>
>> I'm using flink-1.3.2-bin-hadoop27-scala_2.11.tgz, and I double-checked
>> my pom.xml: I use the same Flink and Scala versions there.
>>
>> The command used to start the yarn-session on my experimental cluster (no
>> security, no other users):
>>
>> /usr/local/flink-1.3.2/bin/yarn-session.sh \
>>     --container 180 \
>>     --name "Flink on Yarn Experiments" \
>>     --slots                     1     \
>>     --jobManagerMemory          4000  \
>>     --taskManagerMemory         4000  \
>>     --streaming                       \
>>     --detached
>>
>> Two relevant fragments from my application pom.xml:
>>
>> <flink.version>1.3.2</flink.version>
>> <flink.scala.version>2.11</flink.scala.version>
>>
>>
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-java</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-clients_${flink.scala.version}</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-hbase_${flink.scala.version}</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>>
>> I could really use some suggestions on where to look for the root cause
>> of this.
>> Is this something in my application? My Hadoop cluster? Or is this a
>> problem in Flink 1.3.2?
>>
>> Thanks.
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
