Hello,

I am running into a situation where the Flink threads responsible for executing my operators are all stuck in the WAITING state. Before anything else, this is my machine's spec:

    Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830 @ 2.13GHz GenuineIntel GNU/Linux
    256 GB RAM

I am running in local mode on a machine with a considerable amount of memory, so perhaps that may be triggering some execution edge case?

Moving on, this is my Java:

    openjdk version "1.8.0_151"
    OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
    OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT with LocalEnvironment on this large-memory machine, with parallelism set to one:

    Configuration conf = new Configuration();
    LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    ExecutionEnvironment env = lenv;
    env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
    env.setParallelism(1);

This initializes the execution environment for a series of sequential jobs (any data dependency between jobs is flushed to disk in job *i* and read back from disk into a DataSet in job *i + 1*). To reiterate, I am not launching a Flink cluster, I am just executing in local mode from a code base compiled with Maven.

I have tested this program via mvn exec:exec with different amounts of memory (from -Xmx20000m to -Xmx120000m, i.e. roughly 20 GB to 120 GB) and the result is always the same: the process's memory fills up completely and then its CPU usage drops to 0%. This is strange because if it were a lack of memory, I would expect an OutOfMemoryError.

I have debugged with IntelliJ IDEA and obtained thread dumps from different executions, and realized that quite a few operator threads are stuck in java.lang.Thread.State: WAITING. There are four major threads that I find to be in this waiting state.
The thread dumps I obtained show me where the wait calls originated:

*Number 1:*

"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
    at java.lang.Object.wait(Object.java:-1)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
    at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
    at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
    at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)

*Number 2:*

"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
    at java.lang.Object.wait(Object.java:-1)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
    at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
    at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
    at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)

*Number 3:*

"Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
  java.lang.Thread.State: WAITING
    at java.lang.Object.wait(Object.java:-1)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
    at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
    at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
    at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)

*Number 4:*

"Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
  java.lang.Thread.State: WAITING
    at sun.misc.Unsafe.park(Unsafe.java:-1)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
    - locked <0x23eb> (a java.lang.Object)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
    at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
    at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
    at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
    at my.package.algorithm.Sample.run(Sample.java:291)
    at java.lang.Thread.run(Thread.java:748)

While I realize these dumps on their own may not be helpful, they at least (as far as I know) indicate that the threads are all waiting on something. But if it was resource scarcity I believe the program would terminate with an exception. And if it was garbage collection activity, I believe the JVM process would not be at 0% CPU usage.
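
Those two assumptions (no memory exhaustion, no garbage collection activity) could in principle be checked from inside the JVM while the job appears hung. The following is only a minimal sketch using the standard java.lang.management API; the class name HangProbe and its output format are hypothetical and not part of Flink. It reports heap usage, cumulative GC time, a count of threads per state, and the number of threads the JVM itself flags as deadlocked:

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink): summarizes heap, GC and thread state. */
public class HangProbe {

    /** Builds a one-line summary of heap usage, cumulative GC time and thread states. */
    public static String snapshot() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();

        // Sum collection time over all collectors; a collector may report -1 (undefined).
        long gcMillis = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            gcMillis += Math.max(0, gc.getCollectionTime());
        }

        // Count live threads per Thread.State (RUNNABLE, WAITING, ...).
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        Map<Thread.State, Integer> states = new EnumMap<>(Thread.State.class);
        for (ThreadInfo info : threads.getThreadInfo(threads.getAllThreadIds())) {
            if (info != null) { // a thread may have terminated between the two calls
                states.merge(info.getThreadState(), 1, Integer::sum);
            }
        }

        // Returns null when no lock cycle (monitors or ownable synchronizers) is found.
        long[] deadlocked = threads.findDeadlockedThreads();

        return String.format("heap=%dMB/%dMB gcTime=%dms states=%s deadlocked=%d",
                heap.getUsed() >> 20, heap.getMax() >> 20, gcMillis, states,
                deadlocked == null ? 0 : deadlocked.length);
    }

    public static void main(String[] args) throws InterruptedException {
        // Print a few snapshots; next to a real job this loop would run in a daemon thread.
        for (int i = 0; i < 3; i++) {
            System.out.println(snapshot());
            Thread.sleep(200);
        }
    }
}
```

If gcTime keeps growing between snapshots, the collector is still busy; if it stays flat while the heap is near its maximum, GC is probably not what the threads are waiting for. Note that findDeadlockedThreads() only detects lock cycles, so a count of 0 does not rule out threads parked in Object.wait() for a notification that never arrives.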
*Note:* I realize I didn't provide the user code that generates the Flink execution plan leading to the contexts in which the threads are waiting, but I hope it may not be necessary.

My problem now is that I am unsure how to proceed to further debug this issue:

- The assigned memory is fully used, but there are no exceptions about lack of memory.
- The CPU usage is at 0% and all threads are in a waiting state, but I don't understand what signal they're waiting for exactly.

Hoping anyone might be able to give me a hint.

Thank you very much for your time.

Best regards,

Miguel E. Coimbra