Hello,
Threads #1-3 are waiting for input; thread #4 is waiting for the job to
finish.
To debug this further, I would look into what the preceding operators are
doing: whether they are blocked on something or are emitting records
(which you can check in the UI/metrics).
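Because everything runs inside a single JVM in local mode, one way to do that check is to have a background thread in the driver program periodically list the states of the operator threads. Below is a minimal sketch that uses only the JDK's ThreadMXBean. The class name OperatorThreadStates and the default "DataSource" name fragment are hypothetical; substitute whatever operator names the web UI shows upstream of the stuck joins.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

public class OperatorThreadStates {

    // Returns one line per live thread whose name contains `nameFragment`:
    // thread name, current state, and the top stack frame (if any).
    public static List<String> describe(String nameFragment) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> lines = new ArrayList<>();
        // dumpAllThreads(lockedMonitors, lockedSynchronizers): we only need states and stacks.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null || !info.getThreadName().contains(nameFragment)) {
                continue;
            }
            StackTraceElement[] stack = info.getStackTrace();
            String top = stack.length > 0 ? stack[0].toString() : "<no frames>";
            lines.add(info.getThreadName() + " | " + info.getThreadState() + " | " + top);
        }
        return lines;
    }

    public static void main(String[] args) {
        // Hypothetical default: pass the name of an upstream operator as the first argument.
        String fragment = args.length > 0 ? args[0] : "DataSource";
        describe(fragment).forEach(System.out::println);
    }
}
```

Sampled a few times, an upstream thread that stays RUNNABLE with a changing top frame is probably still producing, while one parked in the same frame across samples is a candidate for the real bottleneck.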
On 15.04.2018 18:40, Miguel Coimbra wrote:
Hello,
I am running into a situation where the Flink threads responsible for
my operator execution are all stuck in the WAITING state.
Before anything else, these are my machine's specs:
Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830 @ 2.13GHz
GenuineIntel GNU/Linux
256 GB RAM
I am running in local mode on a machine with a considerable amount of
memory, so perhaps that is triggering some execution edge case?
Moving on, this is my Java version:
openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT
with LocalEnvironment on this large-memory machine, with parallelism
set to one:
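import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
// Local mode with the web UI enabled
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1);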
This initializes the execution environment for a series of sequential
jobs (any data dependency between jobs is written to disk in job i
and read back from disk into a DataSet in job i + 1).
To reiterate, I am not launching a Flink cluster; I am just executing
in local mode from a code base compiled with Maven.
I have tested this program via mvn exec:exec with different heap sizes
(from -Xmx20000m to -Xmx120000m, i.e. roughly 20 GB to 120 GB), and the
result is always the same: the process's memory fills up completely and
then its CPU usage drops to 0%.
This is strange, because if it were a lack of memory, I would expect an
OutOfMemoryError.
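One caveat when reading "memory fills up completely": the JVM only reclaims heap when it needs to, so a heap reported as nearly full can still hold a lot of uncollected garbage and does not by itself mean an OutOfMemoryError is imminent. A minimal sketch for sampling heap usage from inside the same JVM (the class name HeapSnapshot is hypothetical; only the standard java.lang.management API is used):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

public class HeapSnapshot {

    // Fraction of the maximum heap (-Xmx) currently in use, in [0, 1].
    // Returns -1 if the JVM reports no defined maximum.
    public static double heapUsedFraction() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long max = heap.getMax();
        return max <= 0 ? -1.0 : (double) heap.getUsed() / max;
    }

    public static void main(String[] args) {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        // Shift by 20 bits to convert bytes to MiB.
        System.out.printf("heap used=%d MB committed=%d MB max=%d MB (%.1f%%)%n",
                heap.getUsed() >> 20, heap.getCommitted() >> 20, heap.getMax() >> 20,
                100 * heapUsedFraction());
    }
}
```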
I have debugged with IntelliJ IDEA and obtained thread dumps from
different executions, and realized that quite a few operator threads are
stuck in java.lang.Thread.State: WAITING.
There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:
*Number 1:*
"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
    java.lang.Thread.State: WAITING
    at java.lang.Object.wait(Object.java:-1)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
    at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
    at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
    at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
*Number 2:*
"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
    java.lang.Thread.State: WAITING
    at java.lang.Object.wait(Object.java:-1)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
    at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
    at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
    at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
*Number 3:*
"Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
    java.lang.Thread.State: WAITING
    at java.lang.Object.wait(Object.java:-1)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
    at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
    at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
    at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
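Threads #1-3 share the same top frames: each join sits in Object.wait() inside SingleInputGate.getNextBufferOrEvent(), i.e. it is waiting for the next buffer from its upstream producer. The toy class below is a deliberately simplified model of that wait/notify pattern, not Flink's actual SingleInputGate (the class and method names are made up for illustration). It shows why such a consumer stays in WAITING at 0% CPU indefinitely if its producer neither sends data nor signals end-of-input.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model (NOT Flink's SingleInputGate): a consumer blocks in
// Object.wait() until a producer enqueues a buffer or signals end-of-input.
public class ToyInputGate {
    private final Queue<String> buffers = new ArrayDeque<>();
    private boolean finished = false;

    // Producer side: hand over one buffer and wake up waiting consumers.
    public synchronized void add(String buffer) {
        buffers.add(buffer);
        notifyAll();
    }

    // Producer side: signal that no more buffers will arrive.
    public synchronized void finish() {
        finished = true;
        notifyAll();
    }

    // Consumer side: returns the next buffer, or null once input is exhausted.
    // While the queue is empty and input is not finished, the calling thread
    // is in state WAITING and uses no CPU.
    public synchronized String next() throws InterruptedException {
        while (buffers.isEmpty() && !finished) {
            wait();
        }
        return buffers.poll();
    }
}
```

In this model, the question "what signal are they waiting for" has a concrete answer: a notifyAll() from the producer side, which only happens when data or end-of-input arrives.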
*Number 4:*
"Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
    java.lang.Thread.State: WAITING
    at sun.misc.Unsafe.park(Unsafe.java:-1)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
    - locked <0x23eb> (a java.lang.Object)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
    at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
    at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
    at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
    at my.package.algorithm.Sample.run(Sample.java:291)
    at java.lang.Thread.run(Thread.java:748)
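Thread #4 is the driver thread: DataSet.count() calls ExecutionEnvironment.execute(), which ends up in MiniCluster.executeJobBlocking() and parks in CompletableFuture.get() until the job finishes. Its waiting is therefore a consequence of the stuck job rather than a cause. If you want a watchdog that notices a job running suspiciously long (for example, to take thread dumps automatically), the standard JDK pattern is a bounded get(). A minimal sketch; the class name BoundedWait and the 500 ms / 2 s values are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWait {

    // Waits at most `timeoutMs` for `future`; returns true if it completed in time.
    // A plain future.get(), as in thread #4's stack, blocks with no upper bound.
    public static boolean completesWithin(CompletableFuture<?> future, long timeoutMs)
            throws InterruptedException, ExecutionException {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a long-running blocking call such as a job execution.
        CompletableFuture<Long> job = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 42L;
        });
        if (!completesWithin(job, 500)) {
            System.out.println("still running after 500 ms; take a thread dump now");
        }
        System.out.println("result: " + job.get());
    }
}
```

Wrapping a Flink call such as count() in CompletableFuture.supplyAsync() would need a try/catch that rethrows as CompletionException, because count() declares a checked Exception.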
While I realize these dumps on their own may not be very helpful, they at
least (as far as I can tell) indicate that the threads are all waiting on
something.
But if it were resource scarcity, I believe the program would terminate
with an exception.
And if it were garbage collection activity, I believe the JVM process
would not be at 0% CPU usage.
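This reasoning can be checked directly rather than inferred. The sketch below (hypothetical class name GcActivity, standard GarbageCollectorMXBean API only) estimates how much wall-clock time the JVM spent in garbage collection over a short window. A value near zero while the process sits at 0% CPU would support the idle-waiting explanation over GC thrashing. From outside the process, `jstat -gcutil <pid> 1000` reports similar information.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcActivity {

    // Sum of accumulated collection time (ms) across all collectors.
    // Collectors that report -1 (time undefined) are skipped.
    public static long totalGcTimeMs() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime();
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    // Approximate share of wall-clock time spent in GC during a window of `windowMs`.
    // Concurrent collectors may overlap, so treat this as a rough indicator.
    public static double gcFractionOver(long windowMs) throws InterruptedException {
        long gcBefore = totalGcTimeMs();
        long start = System.nanoTime();
        Thread.sleep(windowMs);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        long gcDelta = totalGcTimeMs() - gcBefore;
        return elapsedMs == 0 ? 0.0 : (double) gcDelta / elapsedMs;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.printf("GC share of last 5 s: %.1f%%%n", 100 * gcFractionOver(5_000));
    }
}
```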
*Note:* I realize I didn't provide the user code that generates the
execution plan for Flink, which led to the contexts in which the
threads are waiting, but I hope it is not necessary.
My problem now is that I am unsure how to proceed to further debug
this issue:
- The assigned memory is fully used, but there are no exceptions about
lack of memory.
- The CPU usage is at 0% and all threads are in a waiting state,
but I don't understand exactly what signal they're waiting for.
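On the second point, the dumps show threads in WAITING (Object.wait() or LockSupport.park()), not BLOCKED on a monitor, which suggests a missing signal rather than a classic lock-ordering deadlock. The JVM's built-in detector only finds the latter, so a quick check like the sketch below (hypothetical class name DeadlockCheck, standard ThreadMXBean.findDeadlockedThreads()) would be expected to report 0 here even though the job never progresses:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class DeadlockCheck {

    // Number of threads the JVM reports as deadlocked on monitors or
    // java.util.concurrent locks. Threads sitting in Object.wait() for a
    // notify that never comes are NOT counted: that is not a lock cycle.
    public static int deadlockedThreadCount() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] ids = mx.findDeadlockedThreads(); // null when no deadlock is found
        return ids == null ? 0 : ids.length;
    }

    public static void main(String[] args) {
        System.out.println("JVM-detected deadlocked threads: " + deadlockedThreadCount());
    }
}
```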
I hope someone can give me a hint.
Thank you very much for your time.
Best regards,
Miguel E. Coimbra