Hello,

I am running into a situation where the Flink threads responsible for my
operator execution are all stuck in the WAITING state.
Before anything else, here are my machine's specs:

Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz GenuineIntel GNU/Linux
256 GB RAM

I am running in local mode on a machine with a considerable amount of
memory; could that be triggering some execution edge case?

This is my Java version:

openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT
with a LocalEnvironment on this large-memory machine, with parallelism set
to one:

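import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
// Local execution environment that also starts the web monitoring UI
LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1); // run every operator with a parallelism of 1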

This initializes the execution environment for a series of sequential jobs
(any data dependency between jobs is flushed to disk in job *i* and read
back from disk into a DataSet in job *i + 1*).
To reiterate, I am not launching a Flink cluster; I am just executing in
local mode from a code base compiled with Maven.

I have tested this program via mvn exec:exec with different heap sizes
(from -Xmx20000m to -Xmx120000m, i.e. roughly 20 GB to 120 GB), and the
result is always the same: the process's memory fills up completely and
then its CPU usage drops to 0%.
This is strange, because if memory were insufficient I would expect an
OutOfMemoryError.
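Since "memory fills up" can mean either the OS-level footprint of the process or the Java heap itself, it may help to log heap usage from inside the program while the jobs run. A minimal sketch using only the standard java.lang.management API; the class HeapWatch is a hypothetical helper, not part of Flink:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

/** Hypothetical helper: reports JVM heap usage relative to the maximum heap (-Xmx). */
public class HeapWatch {

    /** Returns used heap / max heap, or -1.0 if the JVM does not define a maximum. */
    public static double heapUsedFraction() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        long max = heap.getMax();
        if (max <= 0) {
            return -1.0; // getMax() returns -1 when the maximum is undefined
        }
        return (double) heap.getUsed() / max;
    }

    /** One-line summary in megabytes, suitable for periodic logging. */
    public static String describe() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return String.format("heap used=%dMB committed=%dMB max=%dMB",
                heap.getUsed() >> 20, heap.getCommitted() >> 20, heap.getMax() >> 20);
    }

    public static void main(String[] args) {
        System.out.println(describe());
        System.out.printf("used fraction: %.2f%n", heapUsedFraction());
    }
}
```

Calling HeapWatch.describe() from a background thread every few seconds (for example via a ScheduledExecutorService) would show whether the heap itself actually reaches -Xmx before the CPU usage drops, or whether only the process footprint seen by the OS grows.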

I have debugged with IntelliJ IDEA and obtained thread dumps from several
executions, and found that quite a few operator threads are stuck in
java.lang.Thread.State: WAITING.
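The same information the debugger shows can also be collected programmatically, for instance to log it automatically once the CPU usage drops. A sketch using ThreadMXBean.dumpAllThreads from the JDK; the class name WaitingThreads is made up for illustration:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: summarizes every thread that is currently WAITING or TIMED_WAITING. */
public class WaitingThreads {

    public static List<String> waitingThreads(int maxFrames) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> summaries = new ArrayList<>();
        // dumpAllThreads(lockedMonitors, lockedSynchronizers): ownership details are not needed here
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            Thread.State state = info.getThreadState();
            if (state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING) {
                continue;
            }
            StringBuilder sb = new StringBuilder();
            sb.append('"').append(info.getThreadName()).append("\" ").append(state);
            // When known, names the monitor (Object.wait) or park blocker (LockSupport.park)
            if (info.getLockName() != null) {
                sb.append(" on ").append(info.getLockName());
            }
            StackTraceElement[] stack = info.getStackTrace();
            for (int i = 0; i < Math.min(maxFrames, stack.length); i++) {
                sb.append("\n    at ").append(stack[i]);
            }
            summaries.add(sb.toString());
        }
        return summaries;
    }

    public static void main(String[] args) {
        for (String summary : waitingThreads(8)) {
            System.out.println(summary);
        }
    }
}
```

From outside the process, running `jstack <pid>` against the stuck JVM gives a comparable dump without attaching a debugger.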

There are four major threads that I find in this waiting state.
The thread dumps show where the wait calls originated:



*Number 1:*

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

*Number 2:*

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)

*Number 3:*

"Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)
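Threads 1 to 3 all sit in Object.wait() inside SingleInputGate.getNextBufferOrEvent while a hash join reads its probe-side input, which suggests they are waiting for the next buffer from an upstream task. The toy model below is a simplified, hypothetical illustration of that pattern, not Flink's actual implementation: a consumer waiting on a monitor uses no CPU and stays WAITING until a producer calls notifyAll(), so if the producer never delivers data, the consumer waits indefinitely.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Toy model (hypothetical, NOT Flink's SingleInputGate): a consumer blocks on a monitor
 * until a producer hands over the next buffer.
 */
public class BlockingInput<T> {
    private final Queue<T> buffers = new ArrayDeque<>();

    /** Producer side: enqueue a buffer and wake any waiting consumer. */
    public synchronized void add(T buffer) {
        buffers.add(buffer);
        notifyAll();
    }

    /** Consumer side: sits in Object.wait() (state WAITING, no CPU) until add() supplies data. */
    public synchronized T next() throws InterruptedException {
        while (buffers.isEmpty()) {
            // Releases the monitor; the loop re-checks after notifyAll(), an interrupt, or a spurious wakeup
            wait();
        }
        return buffers.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingInput<String> input = new BlockingInput<>();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("got " + input.next());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();
        while (consumer.getState() != Thread.State.WAITING) {
            Thread.yield(); // spin until the consumer is parked in wait()
        }
        System.out.println("consumer is WAITING with no input");
        input.add("buffer-1"); // without this call the consumer would wait forever
        consumer.join();       // prints "got buffer-1" before returning
    }
}
```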

*Number 4:*

"Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
      at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
      at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
      - locked <0x23eb> (a java.lang.Object)
      at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
      at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
      at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
      at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
      at my.package.algorithm.Sample.run(Sample.java:291)
      at java.lang.Thread.run(Thread.java:748)
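Thread 4 is the program's own thread, parked in CompletableFuture.get() inside MiniCluster.executeJobBlocking, so it is presumably waiting for the job triggered by DataSet.count() to finish; since that get() has no timeout, it will wait as long as the job does. The plain-Java sketch below only illustrates the difference between an unbounded get() and a bounded get(timeout, unit); the helper name awaitResult is hypothetical, and it does not change what MiniCluster does internally.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureWaitDemo {

    /** Waits at most timeoutMillis; returns empty instead of blocking forever. */
    public static <T> Optional<T> awaitResult(CompletableFuture<T> future, long timeoutMillis) {
        try {
            return Optional.ofNullable(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty(); // nothing completed the future before the deadline
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException("computation failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        // neverCompleted.get() with no timeout would leave this thread WAITING indefinitely
        System.out.println(awaitResult(neverCompleted, 100).isPresent()); // prints "false"
        System.out.println(awaitResult(CompletableFuture.completedFuture("done"), 100)
                .orElse("none")); // prints "done"
    }
}
```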

I realize these dumps on their own may not be very helpful, but as far as I
can tell they at least show that the threads are all waiting on something.
If the cause were resource scarcity, I believe the program would terminate
with an exception.
And if it were garbage collection activity, I believe the JVM process would
not be sitting at 0% CPU usage.
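One way to test the garbage-collection hypothesis directly is to read the JVM's collector counters twice, some seconds apart, while the process sits at 0% CPU: if the counts and accumulated times do not change, the JVM is not collecting. A sketch with the standard GarbageCollectorMXBean API (the class name GcActivity is hypothetical); from outside the process, `jstat -gcutil <pid> 1000` reports similar counters once per second.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/** Hypothetical helper: sums collection counts and times over all garbage collectors. */
public class GcActivity {

    /** Total number of collections since JVM start, skipping collectors that report -1 (undefined). */
    public static long totalCollections() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long count = gc.getCollectionCount();
            if (count > 0) {
                total += count;
            }
        }
        return total;
    }

    /** Total accumulated collection time in milliseconds, skipping undefined (-1) values. */
    public static long totalCollectionMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime();
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.printf("%s: %d collections, %d ms%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
        }
    }
}
```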

*Note:* I realize I didn't provide the user code that generates the Flink
execution plan leading to the contexts in which the threads are waiting, but
I hope it may not be necessary.
My problem now is that I am unsure how to proceed in debugging this issue
further:
- The assigned memory is fully used, but there are no exceptions about lack
of memory.
- The CPU usage is at 0% and all threads are in a waiting state, but I
don't understand exactly what signal they are waiting for.
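Regarding which signal the threads are waiting for: a thread in Object.wait() has released the monitor and waits for another thread to call notify()/notifyAll() on it, so ThreadMXBean.findDeadlockedThreads() would most likely report nothing here, since it only detects cycles of threads blocked while acquiring monitors or ownable synchronizers. A complementary check is to take two thread snapshots some time apart and list the threads that were WAITING with an identical stack in both, which narrows the dump down to threads that made no progress in between. A sketch, with the hypothetical class name StuckThreads and an arbitrarily chosen 10-second interval:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: finds threads that stayed WAITING with an unchanged stack between two snapshots. */
public class StuckThreads {

    /** Captures the state and stack of every live thread, keyed by thread id. */
    public static Map<Long, ThreadInfo> snapshot() {
        Map<Long, ThreadInfo> byId = new HashMap<>();
        for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) {
            if (info != null) {
                byId.put(info.getThreadId(), info);
            }
        }
        return byId;
    }

    /** Names of threads that were WAITING in both snapshots with exactly the same stack trace. */
    public static List<String> stuck(Map<Long, ThreadInfo> before, Map<Long, ThreadInfo> after) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<Long, ThreadInfo> entry : after.entrySet()) {
            ThreadInfo then = before.get(entry.getKey());
            ThreadInfo now = entry.getValue();
            if (then == null) {
                continue; // thread did not exist at the first snapshot
            }
            boolean waitingBoth = then.getThreadState() == Thread.State.WAITING
                    && now.getThreadState() == Thread.State.WAITING;
            if (waitingBoth && Arrays.equals(then.getStackTrace(), now.getStackTrace())) {
                names.add(now.getThreadName());
            }
        }
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<Long, ThreadInfo> first = snapshot();
        Thread.sleep(10_000); // sampling interval: an assumption, tune as needed
        for (String name : stuck(first, snapshot())) {
            System.out.println("no progress: " + name);
        }
    }
}
```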

I hope someone might be able to give me a hint.

Thank you very much for your time.

Best regards,

Miguel E. Coimbra
