Hello,

Threads #1-3 are waiting for input; Thread #4 is waiting for the job to finish.

To debug this further, I would look into what the preceding operators are doing: whether they are blocked on something or are emitting records (which you can check in the web UI or the metrics).

On 15.04.2018 18:40, Miguel Coimbra wrote:
Hello,

I am running into a situation where the Flink threads responsible for executing my operators are all stuck in the WAITING state.
Before anything else, here are my machine's specs:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830 @ 2.13GHz GenuineIntel GNU/Linux
256 GB RAM

I am running in local mode on a machine with a considerable amount of memory, so perhaps that is triggering some execution edge case?

Moving on, this is my Java version:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT with LocalEnvironment on this large-memory machine, with parallelism set to one:

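import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); // in-JVM execution with the web UI
ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1); // one parallel instance per operator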

This initializes the execution environment for a series of sequential jobs (any data dependency between jobs is flushed to disk in job i and read back from disk into a DataSet in job i + 1). To reiterate, I am not launching a Flink cluster; I am just executing in local mode from a code base compiled with Maven.

I have tested this program via mvn exec:exec with different heap sizes (from -Xmx20000m to -Xmx120000m, i.e. from 20 GB to 120 GB), and the result is always the same: the process's memory fills up completely and then its CPU usage drops to 0%. This is strange, because if it were a lack of memory, I would expect an OutOfMemoryError.
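One way to test the assumption that this is not a garbage-collection problem is to log heap usage and the cumulative GC counters from inside the same JVM while the job runs. The following is a minimal sketch using only the standard java.lang.management API; the class and method names (HeapGcSampler, startDaemonSampler) are hypothetical, and the sampling period is an arbitrary choice. If the GC count and time stop increasing once the CPU drops to 0%, the process is not busy collecting.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

class HeapGcSampler {

    /** Sum of collection counts over all collectors; collectors reporting -1 are skipped. */
    static long totalGcCount() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long count = gc.getCollectionCount(); // -1 means "undefined for this collector"
            if (count > 0) {
                total += count;
            }
        }
        return total;
    }

    /** Sum of accumulated collection time in milliseconds over all collectors. */
    static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long time = gc.getCollectionTime(); // -1 means "undefined for this collector"
            if (time > 0) {
                total += time;
            }
        }
        return total;
    }

    /** One line with current heap usage (in MB) and cumulative GC activity. */
    static String sample() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return String.format("heap used=%d MB committed=%d MB max=%d MB | gc count=%d time=%d ms",
                heap.getUsed() >> 20, heap.getCommitted() >> 20, heap.getMax() >> 20,
                totalGcCount(), totalGcTimeMillis());
    }

    /** Prints a sample to stderr every periodMillis on a daemon thread until interrupted. */
    static Thread startDaemonSampler(long periodMillis) {
        Thread sampler = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    System.err.println(sample());
                    Thread.sleep(periodMillis);
                }
            } catch (InterruptedException e) {
                // interrupted: stop sampling
            }
        }, "heap-gc-sampler");
        sampler.setDaemon(true); // never keeps the JVM alive on its own
        sampler.start();
        return sampler;
    }

    public static void main(String[] args) throws InterruptedException {
        startDaemonSampler(1000);
        Thread.sleep(3500); // stand-in for the real driver program
    }
}
```

Calling HeapGcSampler.startDaemonSampler(10_000) at the start of the driver program would print one line every ten seconds, which can then be lined up against the moment the CPU usage drops.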

I have debugged with IntelliJ IDEA, obtained thread dumps from different executions, and realized that quite a few operator threads are stuck in java.lang.Thread.State: WAITING.
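Besides the IDE, a dump restricted to parked threads can also be taken from inside the process. This is a minimal sketch using the standard ThreadMXBean API (WaitingThreadReport and waitingThreads are hypothetical names); it lists each thread in the WAITING or TIMED_WAITING state together with the object it waits on and its top stack frames, roughly the information shown in the dumps that follow.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

class WaitingThreadReport {

    /** One entry per WAITING/TIMED_WAITING thread: name, state, lock and up to maxFrames frames. */
    static List<String> waitingThreads(int maxFrames) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> entries = new ArrayList<>();
        // false/false skips the (more expensive) owned-lock details; the lock a thread
        // is waiting on is still available through getLockName().
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            Thread.State state = info.getThreadState();
            if (state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING) {
                continue;
            }
            StringBuilder sb = new StringBuilder();
            sb.append('"').append(info.getThreadName()).append("\" ").append(state);
            if (info.getLockName() != null) {
                sb.append(" on ").append(info.getLockName());
            }
            StackTraceElement[] frames = info.getStackTrace();
            for (int i = 0; i < Math.min(maxFrames, frames.length); i++) {
                sb.append("\n    at ").append(frames[i]);
            }
            entries.add(sb.toString());
        }
        return entries;
    }

    public static void main(String[] args) {
        waitingThreads(8).forEach(System.out::println);
    }
}
```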

There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:

*Number 1:*

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)


*Number 2:*

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

*Number 3:*

"Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

*Number 4:*

"Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
      at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
      at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
      - locked <0x23eb> (a java.lang.Object)
      at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
      at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
      at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
      at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
      at my.package.algorithm.Sample.run(Sample.java:291)
      at java.lang.Thread.run(Thread.java:748)

I realize these dumps on their own may not be very helpful, but as far as I know they at least indicate that the threads are all waiting on something. If the cause were resource scarcity, I believe the program would terminate with an exception; and if it were garbage collection activity, I believe the JVM process would not be at 0% CPU usage.
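The pattern in dumps #1-3 (a task thread inside Object.wait(), called from SingleInputGate.getNextBufferOrEvent) can be reproduced in miniature. In this minimal sketch, ToyInputGate is a hypothetical stand-in and not Flink's class: a consumer that finds no buffer calls wait() and stays in WAITING until a producer calls add(). While parked it uses no CPU and nothing throws, which is consistent with the symptoms described above; the consumer only resumes when upstream data arrives, so the open question becomes why the producing side stops sending.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.ArrayDeque;
import java.util.Queue;

class StarvedConsumerDemo {

    /** Hypothetical toy channel (not Flink's SingleInputGate): next() waits until add() supplies a buffer. */
    static final class ToyInputGate {
        private final Queue<String> buffers = new ArrayDeque<>();

        synchronized String next() throws InterruptedException {
            while (buffers.isEmpty()) {
                wait(); // releases the monitor and parks: Thread.State.WAITING, no CPU, no exception
            }
            return buffers.poll();
        }

        synchronized void add(String buffer) {
            buffers.add(buffer);
            notifyAll(); // wake up any consumer parked in next()
        }
    }

    /** Starts a consumer on the (empty) gate and returns once it is parked in wait(). */
    static Thread startStarvedConsumer(ToyInputGate gate) {
        Thread consumer = new Thread(() -> {
            try {
                gate.next();
            } catch (InterruptedException e) {
                // interrupted: just exit
            }
        }, "toy-join");
        consumer.setDaemon(true);
        consumer.start();
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (consumer.getState() != Thread.State.WAITING) {
            if (System.nanoTime() > deadline) {
                throw new IllegalStateException("consumer did not reach WAITING");
            }
            Thread.yield();
        }
        return consumer;
    }

    /** CPU time used by the thread in nanoseconds, or -1 if the JVM cannot measure it. */
    static long cpuTimeNanos(Thread t) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        return mx.isThreadCpuTimeSupported() ? mx.getThreadCpuTime(t.getId()) : -1L;
    }

    public static void main(String[] args) throws InterruptedException {
        ToyInputGate gate = new ToyInputGate();
        Thread consumer = startStarvedConsumer(gate);
        long before = cpuTimeNanos(consumer);
        Thread.sleep(500);
        long after = cpuTimeNanos(consumer);
        System.out.println("state while starved: " + consumer.getState());
        if (before >= 0 && after >= 0) {
            System.out.println("CPU used while waiting: " + (after - before) / 1_000_000 + " ms");
        }
        gate.add("buffer"); // input finally arrives
        consumer.join(5000);
        System.out.println("state after input: " + consumer.getState());
    }
}
```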

*Note:* I realize I didn't provide the user code that generates the execution plan for Flink (and thus the contexts in which the threads are waiting), but I hope it is not necessary. My problem now is that I am unsure how to debug this issue further:
- The assigned memory is fully used, but there are no exceptions about lack of memory.
- The CPU usage is at 0% and all threads are in a waiting state, but I don't understand exactly what signal they are waiting for.
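Thread #4 blocks in CompletableFuture.get() without a timeout (via MiniCluster.executeJobBlocking, called from DataSet.count()), so if the job never finishes, the driver waits indefinitely. A minimal sketch of a watchdog that runs such a blocking call with a bounded wait and dumps all thread stacks when the wait expires follows; HangWatchdog, callWithWatchdog and the `edges` DataSet in the comment are hypothetical, and leaving the hung call running (rather than interrupting it) is a deliberate assumption so the stuck state stays observable.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class HangWatchdog {

    /**
     * Runs a blocking call on a daemon worker thread and waits at most the given timeout.
     * On timeout, prints every thread's stack to stderr and rethrows the TimeoutException;
     * the worker is NOT interrupted, so the hung state stays inspectable.
     */
    static <T> T callWithWatchdog(Callable<T> blockingCall, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService worker = Executors.newSingleThreadExecutor(runnable -> {
            Thread t = new Thread(runnable, "watched-call");
            t.setDaemon(true); // a hung call must not keep the JVM alive
            return t;
        });
        try {
            Future<T> result = worker.submit(blockingCall);
            return result.get(timeout, unit);
        } catch (TimeoutException e) {
            dumpAllStacks();
            throw e;
        } finally {
            worker.shutdown(); // accept no new tasks; a still-running call is left alone
        }
    }

    /** Prints name, state and stack of every live thread (similar to a jstack dump). */
    static void dumpAllStacks() {
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread t = entry.getKey();
            System.err.println("\"" + t.getName() + "\" " + t.getState());
            for (StackTraceElement frame : entry.getValue()) {
                System.err.println("    at " + frame);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical usage in the driver, wrapping the call that hangs:
        //   long n = callWithWatchdog(() -> edges.count(), 30, TimeUnit.MINUTES);
        long n = callWithWatchdog(() -> 42L, 5, TimeUnit.SECONDS);
        System.out.println("result: " + n);
    }
}
```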

I hope someone can give me a hint.

Thank you very much for your time.

Best regards,

Miguel E. Coimbra

