[ https://issues.apache.org/jira/browse/FLINK-9242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710599#comment-16710599 ]
Miguel E. Coimbra commented on FLINK-9242:
------------------------------------------

[~NicoK] I am seeing similar behavior now on Apache Flink 1.8-SNAPSHOT: execution hangs at a random point. Have the components pertaining to this issue been modified again?

> LocalEnvironment - Operator threads stuck on java.lang.Thread.State: WAITING
> ----------------------------------------------------------------------------
>
> Key: FLINK-9242
> URL: https://issues.apache.org/jira/browse/FLINK-9242
> Project: Flink
> Issue Type: Bug
> Components: Cluster Management
> Affects Versions: 1.5.0, 1.6.0
> Environment: *SETUP 1*
> - Windows 7 Pro x64
> - Java 1.8.0_162 x64
> - 8 GB RAM
> - Intel i7 620M
> *SETUP 2*
> - Slackware 14.2 x64 GNU/Linux 4.4.88
> - Java openjdk version "1.8.0_151"
> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
> - 256 GB RAM
> - 8x Intel(R) Xeon(R) CPU E7- 4830
> Reporter: Miguel E. Coimbra
> Priority: Major
> Attachments: flink_debugging.PNG
>
>
> Hello,
> As per Fabian Hueske's advice on the mailing list, I am detailing the problem here.
> This happens with my code in both 1.5-SNAPSHOT and 1.6-SNAPSHOT, but not in 1.4.2 (stable).
> I believe it might be a regression introduced after 1.4.2.
> I'm getting different DataSet operators blocked in java.lang.Thread.State: WAITING for no apparent reason.
> I only tested this using a LocalEnvironment, which is created like so:
> {code:java}
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.LocalEnvironment;
> import org.apache.flink.configuration.Configuration;
>
> // logPath is defined elsewhere in my code.
> final Configuration conf = new Configuration();
> conf.setString("web.log.path", logPath);
> conf.setString("jobmanager.rpc.address", "127.0.0.1");
> conf.setString("web.port", "8081-9000");
> // Port ranges for the queryable state server and proxy.
> conf.setString("query.server.ports", "2000-30000");
> conf.setString("query.proxy.ports", "30001-60000");
> LocalEnvironment lenv = (LocalEnvironment)
>     ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> {code}
> (I also tried creating the LocalEnvironment without the web interface, and it also happens.)
> I have debugged with IntelliJ IDEA and obtained thread dumps from different executions, and realized quite a few operator threads are stuck in java.lang.Thread.State: WAITING.
> I cannot share my code at the moment, but essentially I have a series of jobs, and some use common data (I made sure it was written to disk in job _i_ and read back from disk in job _i + 1_).
> There are three major threads that I find to be in this waiting state.
> I'm running in local mode with a parallelism of one.
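Thread dumps like the ones below can also be captured from inside the JVM, for example from a watchdog thread, without attaching a debugger. The following is a minimal sketch that assumes only JDK 8's `java.lang.management` API; the class `WaitingThreadDump` is hypothetical. It collects every live thread in `Thread.State.WAITING` together with its full stack. The frames are formatted manually because `ThreadInfo.toString()` truncates long stacks.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: collects the stacks of all threads currently in WAITING state.
public class WaitingThreadDump {

    /** Returns one formatted entry per live thread in Thread.State.WAITING. */
    public static List<String> dumpWaitingThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> entries = new ArrayList<>();
        // Arguments: lockedMonitors = false, lockedSynchronizers = false.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info.getThreadState() != Thread.State.WAITING) {
                continue; // skip RUNNABLE, BLOCKED, TIMED_WAITING, ...
            }
            StringBuilder sb = new StringBuilder();
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState()).append('\n');
            // Print every frame; ThreadInfo.toString() would cut the stack short.
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            entries.add(sb.toString());
        }
        return entries;
    }

    public static void main(String[] args) {
        dumpWaitingThreads().forEach(System.out::print);
    }
}
```

Calling `dumpWaitingThreads()` periodically while a job appears stuck would list the same `Object.wait` frames that the IntelliJ dumps show.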
> The thread dumps I obtained show me where the wait calls originated:
>
> {code:java}
> Number 1:
> "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>   java.lang.Thread.State: WAITING
>       at java.lang.Object.wait(Object.java:-1)
>       at java.lang.Object.wait(Object.java:502)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> {code:java}
> Number 2:
> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>   java.lang.Thread.State: WAITING
>       at java.lang.Object.wait(Object.java:-1)
>       at java.lang.Object.wait(Object.java:502)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> {code:java}
> Number 3:
> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
>   java.lang.Thread.State: WAITING
>       at java.lang.Object.wait(Object.java:-1)
>       at java.lang.Object.wait(Object.java:502)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>       at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
>
> While I realize these dumps on their own may not be helpful, they at least (as far as I know) indicate that the threads are all waiting on something.
> But if it were resource scarcity, I believe the program would terminate with an exception.
> And if it were garbage collection activity, I believe the JVM process would not be at 0% CPU usage.
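The top frames of all three dumps show `Object.wait` called from `SingleInputGate.getNextBufferOrEvent`, which suggests each join is blocked waiting for the next buffer from its upstream input. The following is a simplified, hypothetical model of that wait/notify pattern; the class `ToyInputGate` and its methods are illustrative and are not Flink's code. A consumer waiting on a monitor sits in `WAITING` without using CPU and only resumes once a producer enqueues data and calls `notifyAll`. This is consistent with the 0% CPU observation, although it does not by itself explain why no data arrives.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of a blocking input gate (NOT Flink's implementation).
public class ToyInputGate {
    private final Queue<String> buffersWithData = new ArrayDeque<>();

    /** Blocks in Object.wait() (Thread.State.WAITING, no CPU use) until a buffer arrives. */
    public String getNextBuffer() throws InterruptedException {
        synchronized (buffersWithData) {
            // The loop guards against spurious wake-ups.
            while (buffersWithData.isEmpty()) {
                buffersWithData.wait();
            }
            return buffersWithData.poll();
        }
    }

    /** Producer side: enqueue a buffer and wake up any waiting consumer. */
    public void onBuffer(String buffer) {
        synchronized (buffersWithData) {
            buffersWithData.add(buffer);
            buffersWithData.notifyAll();
        }
    }

    public static void main(String[] args) throws Exception {
        ToyInputGate gate = new ToyInputGate();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("received " + gate.getNextBuffer());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        // Wait until the consumer is parked inside Object.wait().
        while (consumer.getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }
        System.out.println("consumer state: " + consumer.getState());

        // If no producer ever called this, the consumer would stay WAITING forever.
        gate.onBuffer("buffer-1");
        consumer.join();
    }
}
```

Running `main` is expected to report the consumer as `WAITING` until `onBuffer` is called. If nothing ever calls it, the consumer stays parked indefinitely, much like the join threads in the dumps.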
> I realize I didn't provide the user code that generates the execution plan, which would show the contexts in which the threads are waiting; my apologies. I will do so as soon as I get a chance.
> To highlight the symptoms:
> - The memory assigned to the JVM is fully used, but there are no out-of-memory exceptions (and the system had plenty more memory available).
> - CPU usage is at 0% and all threads are in a waiting state, but I don't understand exactly which signal they're waiting for.
> I noticed something suspicious as well: I have chains of operators where the first operator ingests the expected number of records but does not emit any, leaving the following operator empty in a "RUNNING" state (see attached image).
> My scenario is somewhat complex, at least compared to the samples in the Flink documentation: when visualizing the job plan, it is necessary to zoom in and out to check specific parts of the execution scheme.
> Among the sequence of operations, I am:
> 1 - Creating a DataSet
> 2 - Using it as an initial workset in a DeltaIteration
> 2.1 - Joining the workset on each iteration with the edges of a graph
> 3 - Using the final solution set resulting from the DeltaIteration to build a graph and execute an algorithm over it (.run method).
> - The graph is not prohibitively big, and I have a very low limit on the number of iterations (at most 4 or 5).
> I will add more information as soon as it is available.
> It seems, however, that there is some sort of lack of synchronization occurring, and perhaps the operators _become isolated_?
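Steps 1 to 3 above describe a delta iteration. The following plain-Java sketch models only its semantics. It is not Flink's `DeltaIteration` API and not the reporter's code. The solution set holds vertices already reached, each superstep joins the workset with the graph's edges on the source vertex, and newly reached vertices form both the delta and the next workset. The update rule (reachability), the class `DeltaIterationModel`, its `run` method, and the `maxIterations` parameter are all hypothetical. The loop stops when the workset is empty or the iteration cap is hit, mirroring the limit of 4 or 5 iterations mentioned above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded model of delta-iteration semantics.
// NOT Flink's DeltaIteration API; reachability is an assumed update rule.
public class DeltaIterationModel {

    /** Edges are (source, target) pairs; returns the final solution set. */
    public static Set<Long> run(Set<Long> initialWorkset, List<long[]> edges, int maxIterations) {
        // Index edges by source vertex, similar to the build side of a hash join.
        Map<Long, List<Long>> adjacency = new HashMap<>();
        for (long[] edge : edges) {
            adjacency.computeIfAbsent(edge[0], k -> new ArrayList<>()).add(edge[1]);
        }

        Set<Long> solutionSet = new HashSet<>(initialWorkset); // step 1: initial data
        Set<Long> workset = new HashSet<>(initialWorkset);     // step 2: initial workset

        // Stop when the workset is empty or the iteration cap is reached.
        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            Set<Long> delta = new HashSet<>();
            // Step 2.1: join the workset with the edges (probe the index).
            for (Long vertex : workset) {
                for (Long target : adjacency.getOrDefault(vertex, Collections.emptyList())) {
                    if (!solutionSet.contains(target)) {
                        delta.add(target);
                    }
                }
            }
            solutionSet.addAll(delta); // merge the delta into the solution set
            workset = delta;           // only newly reached vertices continue
        }
        return solutionSet;            // step 3 would build a graph from this result
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(
                new long[] {1, 2}, new long[] {2, 3}, new long[] {3, 4}, new long[] {4, 5});
        Set<Long> start = new HashSet<>(Collections.singletonList(1L));
        System.out.println(run(start, edges, 2));
    }
}
```

With the cap of 2 used in `main`, the result contains only vertices 1, 2 and 3: propagation stops before vertices 4 and 5 are reached, just as a low iteration limit bounds the amount of work in the real job.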