Hi Miguel,

Actually, a lot has changed since 1.4. Flink 1.5 will feature a completely new (cluster) setup and deployment model. The dev effort is known as FLIP-6 [1]. So it is not unlikely that you discovered a regression.
Would you mind opening a JIRA ticket for the issue? Thank you very much.

Best, Fabian

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

2018-04-21 20:08 GMT+02:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:

> Hello,
>
> Just to provide a brief update: I got this working by moving to the stable version 1.4.2.
> I previously tested under 1.5-SNAPSHOT and 1.6-SNAPSHOT and the problem occurred in both.
>
> If I'm not mistaken, LocalEnvironment is primarily targeted at debugging scenarios.
> In my case, I explicitly want to use it on a complex series of jobs for now.
> However, it seems some sort of bug was introduced after 1.4.2?
>
> I ask this because the same code leads to operators stuck in java.lang.Thread.State: WAITING on the snapshot versions, but it works fine in 1.4.2.
> Was there any specific design change after 1.4.2 regarding the way the Flink cluster is simulated (LocalFlinkMiniCluster, if I'm not mistaken?) when using LocalEnvironment?
>
> I would like to explore this issue and perhaps contribute to fixing it, or at least understand it.
>
> Thank you very much.
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>
> On 17 April 2018 at 22:52, Miguel Coimbra <miguel.e.coim...@gmail.com> wrote:
>
>> Hello James,
>>
>> Thanks for the information.
>> I noticed something suspicious as well: I have chains of operators where the first operator will ingest the expected amount of records but will not emit any, leaving the following operator empty in a "RUNNING" state.
>> For example:
>>
>> I will get back if I find out more.
>>
>> Best regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>
>> On 17 April 2018 at 20:59, James Yu <cyu...@gmail.com> wrote:
>>
>>> Miguel, my colleague and I ran into the same problem yesterday.
>>> We were expecting Flink to get 4 inputs from Kafka and write the inputs to Cassandra, but the operators got stuck after the 1st input was written into Cassandra.
>>> This is how the DAG looks:
>>> Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
>>> After we disabled the auto chaining (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups), all 4 inputs are read from Kafka and written into Cassandra.
>>> We are still figuring out why the chaining causes the blocking.
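>>> For reference, the chaining can be disabled either globally or for a single operator. A minimal sketch of the change (kafkaSource here is a stand-in for our actual Kafka consumer stream, not the real code):
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.disableOperatorChaining(); // global: no two operators share a task
>>>
>>> // Alternatively, scoped to one operator in the pipeline:
>>> DataStream<String> mapped = kafkaSource
>>>         .map(String::toUpperCase)
>>>         .disableChaining(); // only this map is excluded from chaining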
>>>
>>> This is a UTF-8 formatted mail
>>> -----------------------------------------------
>>> James C.-C.Yu
>>> +886988713275
>>>
>>> 2018-04-18 6:57 GMT+08:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>>>
>>>> Chesnay, following your suggestions I got access to the web interface and also took a closer look at the debugging logs.
>>>> I have noticed one problem regarding the web interface port - it keeps changing now and then during my Java program's execution.
>>>>
>>>> Not sure if that is due to my program launching several job executions sequentially, but the fact is that it happened.
>>>> Since I am accessing the web interface via tunneling, it becomes rather cumbersome to keep adapting it.
>>>>
>>>> Another particular problem I'm noticing is that this exception frequently pops up (debugging with log4j):
>>>>
>>>> 00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
>>>> org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
>>>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
>>>>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
>>>>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>>>>     at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
>>>>     at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
>>>>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>>>>     at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
>>>>     at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
>>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
>>>>     at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>>>>     at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> Don't know if the internals of Flink are explicitly using an exception for control flow, but there are several occurrences of this as time goes by.
>>>>
>>>> Regarding my program itself, I've achieved some progress.
>>>> In my program I need to run a sequence of Flink jobs, and I need extra care to make sure no DataSet instance from job i is being used in an operator in job i + 1.
>>>> I believe this was generating the waiting scenarios I described in an earlier email.
>>>> The bottom line is to be extra careful about when job executions are actually triggered, and to make sure that a DataSet which will be used in different Flink jobs is available, for example, as a file in secondary storage (possibly masked as a memory mapping) and is exclusively read from that source.
>>>> This means ensuring the job that originally produces a DataSet (for reuse in a later job) assigns a DataSink for secondary storage to it.
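>>>> In code, the pattern looks roughly like this (a sketch; the path and the Tuple2 element type are placeholders, result is the DataSet produced by job i, and env is the shared ExecutionEnvironment):
>>>>
>>>> // Job i: materialize the intermediate result instead of holding on to the DataSet instance
>>>> result.writeAsCsv("file:///tmp/intermediate", FileSystem.WriteMode.OVERWRITE);
>>>> env.execute("job i");
>>>>
>>>> // Job i + 1: read it back from disk, so no DataSet instance crosses the job boundary
>>>> DataSet<Tuple2<Long, Long>> reloaded = env
>>>>         .readCsvFile("file:///tmp/intermediate")
>>>>         .types(Long.class, Long.class);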
>>>> I'm going to keep digging with this in mind; I will report back if I manage to fix everything or find a new problem.
>>>>
>>>> Thanks again,
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>
>>>> On 16 April 2018 at 10:26, Chesnay Schepler <ches...@apache.org> wrote:
>>>>
>>>>> ah yes, currently when you use that method the UI is started on a random port. I'm currently fixing that in this PR <https://github.com/apache/flink/pull/5814>, which will be merged today. For now you will have to enable logging and search for something along the lines of "http://<host>:<port> was granted leadership".
>>>>>
>>>>> Sorry for the inconvenience.
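>>>>> (A sketch of what pinning the port should then look like; that the rest.port key is what the local web UI honors here is my assumption:)
>>>>>
>>>>> Configuration conf = new Configuration();
>>>>> conf.setInteger("rest.port", 8081); // pin the UI port instead of letting it pick a random one
>>>>> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);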
>>>>> On 16.04.2018 15:04, Miguel Coimbra wrote:
>>>>>
>>>>> Thanks for the suggestions Chesnay, I will try them out.
>>>>>
>>>>> However, I have already tried your suggestion with the flink-runtime-web dependency and nothing happened.
>>>>> If I understood you correctly, adding that dependency to the pom.xml should make it so the web front-end is running when I call the following line?
>>>>>
>>>>> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>
>>>>> I added flink-runtime-web to my pom.xml, recompiled and launched the program, but I simply got "Unable to connect" in my browser (Firefox) on localhost:8081.
>>>>> Performing wget on localhost:8081 resulted in this:
>>>>>
>>>>> $ wget localhost:8081
>>>>> --2018-04-16 12:47:26-- http://localhost:8081/
>>>>> Resolving localhost (localhost)... ::1, 127.0.0.1
>>>>> Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
>>>>> Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.
>>>>>
>>>>> The connection being refused suggests nothing was actually bound to localhost:8081.
>>>>> I probably am skipping some important detail.
>>>>> These are some of my dependencies:
>>>>>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-java</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-core</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-gelly_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
>>>>> <dependency>
>>>>>     <groupId>org.apache.flink</groupId>
>>>>>     <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>>>>>     <version>${flink.version}</version>
>>>>> </dependency>
>>>>>
>>>>> Have you managed to get the web front-end in local mode?
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Miguel E. Coimbra
>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>
>>>>> On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org> wrote:
>>>>>
>>>>>> The thing with createLocalEnvironmentWithWebUI is that it requires flink-runtime-web to be on the classpath, which is rarely the case when running things in the IDE.
>>>>>> It should work fine in the IDE if you add it as a dependency to your project. This should've been logged as a warning.
>>>>>>
>>>>>> Chaining is unrelated to this issue, as join operators are never chained to one another.
>>>>>> Lambda functions are also not the issue; if they were, the job would fail much earlier.
>>>>>>
>>>>>> It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input and hence produces no output, which now also blocks T3.
>>>>>>
>>>>>> There are multiple possible explanations I can come up with:
>>>>>> * the preceding operators are blocked on something or really slow
>>>>>> * the preceding operators are actually finished, but aren't shutting down due to an implementation error
>>>>>> * a deadlock in Flink's join logic
>>>>>> * a deadlock in Flink's network stack
>>>>>>
>>>>>> For the first two we will have to consult the UI or logs. You said you were dumping the input DataSets into files, but were they actually complete?
>>>>>>
>>>>>> A deadlock in the network stack should appear as all existing operator threads being blocked.
>>>>>> We can probably rule out a problem with the join logic by removing the second join and trying again.
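>>>>>> (For illustration, a diagnostic variant of the selectEdges method quoted further below, with the second join and the distinct dropped; a sketch for testing only, not a fix:)
>>>>>>
>>>>>> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdgesFirstJoinOnly(
>>>>>>         final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
>>>>>>     // Keep only the first join; if this job completes, the second join is the suspect.
>>>>>>     return vertices
>>>>>>             .joinWithHuge(originalGraph.getEdges())
>>>>>>             .where(0).equalTo(0)
>>>>>>             .with((source, edge) -> edge)
>>>>>>             .returns(originalGraph.getEdges().getType());
>>>>>> }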
>>>>>>
>>>>>> On 16.04.2018 03:10, Miguel Coimbra wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> It would seem that the function which is supposed to launch local mode with the web front-end doesn't launch the front-end at all; it is not doing what it is supposed to do, if I'm not mistaken:
>>>>>>
>>>>>> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>>
>>>>>> Regarding the preceding operators, the thread dumps I got were pointing to a specific set of operations over DataSet instances that were passed into my function.
>>>>>> Below I show the code segment and mark the lines where threads are waiting:
>>>>>>
>>>>>> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(
>>>>>>         final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
>>>>>>     return vertices
>>>>>>             .joinWithHuge(originalGraph.getEdges())
>>>>>>             .where(0).equalTo(0)
>>>>>>             .with((source, edge) -> edge)          // Thread 1 is blocked here
>>>>>>             .returns(originalGraph.getEdges().getType())
>>>>>>             .join(vertices)
>>>>>>             .where(1).equalTo(0)
>>>>>>             .with((e, v) -> e)                     // Thread 3 is blocked here
>>>>>>             .returns(originalGraph.getEdges().getType())
>>>>>>             .distinct(0, 1);
>>>>>> }
>>>>>>
>>>>>> Note: the edges in the originalGraph edge DataSet are much greater in number than the elements of the vertices DataSet, so I believe that function is being used correctly.
>>>>>>
>>>>>> I will try testing with remote (cluster) mode to have access to the web front-end, but I have some questions for now:
>>>>>>
>>>>>> - The fact that they are blocked in different JoinOperator instances that are chained: is this a result of Flink's default pipeline mechanism?
>>>>>> - Could there be a problem stemming from the fact they are both waiting on lambdas?
>>>>>> - I have tried dumping both DataSet variables originalGraph and vertices into files (the ones being used in this code), and they produced correct values (non-empty files), so I don't have a clue what the threads inside Flink's runtime are waiting on.
>>>>>>
>>>>>> Thanks for the help so far Chesnay.
>>>>>>
>>>>>> Miguel E. Coimbra
>>>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>>>
>>>>>> ---------- Forwarded message ----------
>>>>>>
>>>>>>> From: Chesnay Schepler <ches...@apache.org>
>>>>>>> To: user@flink.apache.org
>>>>>>> Cc:
>>>>>>> Bcc:
>>>>>>> Date: Sun, 15 Apr 2018 18:54:33 +0200
>>>>>>> Subject: Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Threads #1-3 are waiting for input, Thread #4 is waiting for the job to finish.
>>>>>>>
>>>>>>> To further debug this I would look into what the preceding operators are doing, whether they are blocked on something or are emitting records (which you can check in the UI/metrics).
>>>>>>>
>>>>>>> On 15.04.2018 18:40, Miguel Coimbra wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I am running into a situation where the Flink threads responsible for my operator execution are all stuck in the WAITING state.
>>>>>>> Before anything else, this is my machine's spec:
>>>>>>>
>>>>>>> Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830 @ 2.13GHz GenuineIntel GNU/Linux
>>>>>>> 256 GB RAM
>>>>>>>
>>>>>>> I am running in local mode on a machine with a considerable amount of memory, so perhaps that may be triggering some execution edge case?
>>>>>>>
>>>>>>> Moving on, this is my Java:
>>>>>>>
>>>>>>> openjdk version "1.8.0_151"
>>>>>>> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
>>>>>>> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>>>>>>>
>>>>>>> Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT with LocalEnvironment on this large-memory machine, with parallelism set to one:
>>>>>>>
>>>>>>> Configuration conf = new Configuration();
>>>>>>> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>>>> ExecutionEnvironment env = lenv;
>>>>>>> env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
>>>>>>> env.setParallelism(1);
>>>>>>>
>>>>>>> This initializes the execution environment for a series of sequential jobs (any data dependency between jobs is flushed to disk on job i and read back from disk into a DataSet in job i + 1).
>>>>>>> To reiterate, I am not launching a Flink cluster; I am just executing in local mode from a code base compiled with Maven.
>>>>>>>
>>>>>>> I have tested this program via mvn exec:exec with different amounts of memory (from -Xmx20000m to -Xmx120000m, that is, 20 GB to 120 GB) and the result is always the same: the process' memory fills up completely and then the process' CPU usage drops to 0%.
>>>>>>> This is strange because if it were a lack of memory, I would expect an OutOfMemoryError.
>>>>>>>
>>>>>>> I have debugged with IntelliJ IDEA and obtained thread dumps from different executions, and realized quite a few operator threads are stuck in
>>>>>>>
>>>>>>> java.lang.Thread.State: WAITING
>>>>>>>
>>>>>>> There are four major threads that I find to be in this waiting state.
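>>>>>>> (The dumps below came from the IDE, but the same information can also be printed from inside the JVM with plain java.lang.Thread; a minimal sketch, not part of my program:)
>>>>>>>
>>>>>>> for (java.util.Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
>>>>>>>     // every live thread, its state, and its current stack
>>>>>>>     System.err.println(entry.getKey().getName() + ": " + entry.getKey().getState());
>>>>>>>     for (StackTraceElement frame : entry.getValue()) {
>>>>>>>         System.err.println("\tat " + frame);
>>>>>>>     }
>>>>>>> }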
>>>>>>> The thread dumps I obtained show me where the wait calls originated:
>>>>>>>
>>>>>>> Number 1:
>>>>>>>
>>>>>>> "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>>>>>>>   java.lang.Thread.State: WAITING
>>>>>>>     at java.lang.Object.wait(Object.java:-1)
>>>>>>>     at java.lang.Object.wait(Object.java:502)
>>>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>>>>     at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>>>>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> Number 2:
>>>>>>>
>>>>>>> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>>>>>>>   java.lang.Thread.State: WAITING
>>>>>>>     at java.lang.Object.wait(Object.java:-1)
>>>>>>>     at java.lang.Object.wait(Object.java:502)
>>>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>>>>     at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>>>>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> Number 3:
>>>>>>>
>>>>>>> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
>>>>>>>   java.lang.Thread.State: WAITING
>>>>>>>     at java.lang.Object.wait(Object.java:-1)
>>>>>>>     at java.lang.Object.wait(Object.java:502)
>>>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>>>>     at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>>>>>>>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> Number 4:
>>>>>>>
>>>>>>> "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
>>>>>>>   java.lang.Thread.State: WAITING
>>>>>>>     at sun.misc.Unsafe.park(Unsafe.java:-1)
>>>>>>>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>>>>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>>>>>>>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>>>>>>>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>>>>>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>>>>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
>>>>>>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
>>>>>>>     - locked <0x23eb> (a java.lang.Object)
>>>>>>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>>>>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
>>>>>>>     at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>>>>>>     at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
>>>>>>>     at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
>>>>>>>     at my.package.algorithm.Sample.run(Sample.java:291)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>> While I realize these dumps on their own may not be helpful, they at least (as far as I know) indicate that the threads are all waiting on something.
>>>>>>> But if it were resource scarcity, I believe the program would terminate with an exception.
>>>>>>> And if it were garbage collection activity, I believe the JVM process would not be at 0% CPU usage.
>>>>>>>
>>>>>>> Note: I realize I didn't provide the user code that generates the execution plan for Flink, which led to the contexts in which the threads are waiting, but I hope it may not be necessary.
>>>>>>>
>>>>>>> My problem now is that I am unsure how to proceed to further debug this issue:
>>>>>>> - The assigned memory is fully used, but there are no exceptions about lack of memory.
>>>>>>> - The CPU usage is at 0% and all threads are in a waiting state, but I don't understand what signal they're waiting for exactly.
>>>>>>>
>>>>>>> Hoping anyone might be able to give me a hint.
>>>>>>>
>>>>>>> Thank you very much for your time.
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> Miguel E. Coimbra