Hello James,

Thanks for the information. I noticed something suspicious as well: I have chains of operators where the first operator ingests the expected number of records but does not emit any, leaving the following operator empty in a "RUNNING" state. For example:
I will get back if I find out more.

Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>

On 17 April 2018 at 20:59, James Yu <cyu...@gmail.com> wrote:

> Miguel, my colleague and I ran into the same problem yesterday.
> We were expecting Flink to read 4 inputs from Kafka and write them to
> Cassandra, but the operators got stuck after the first input was written into
> Cassandra.
> This is what the DAG looks like:
> Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
> After we disabled auto chaining (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups),
> all 4 inputs were read from Kafka and written into Cassandra.
> We are still figuring out why the chaining causes the blocking.
>
> James C.-C.Yu
> +886988713275
>
> 2018-04-18 6:57 GMT+08:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Chesnay, following your suggestions I got access to the web interface and
>> also took a closer look at the debugging logs.
>> I have noticed one problem regarding the web interface port: it keeps
>> changing every now and then during my Java program's execution.
>>
>> I am not sure if that is due to my program launching several job executions
>> sequentially, but the fact is that it happened.
>> Since I am accessing the web interface via tunneling, it becomes rather
>> cumbersome to keep adapting it.
>>
>> Another particular problem I'm noticing is that this exception frequently
>> pops up (debugging with log4j):
>>
>> 00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
>> org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
>>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
>>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>>     at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
>>     at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>>     at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
>>     at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
>>     at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>>     at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> I don't know if Flink's internals explicitly use an exception for
>> control flow, but there are several occurrences of this as time goes by.
>>
>> Regarding my program itself, I've made some progress.
>> In my program I need to run a series of Flink jobs in sequence, and I need to take
>> extra care to make sure no DataSet instance from job *i* is being used
>> in an operator in job *i + 1*.
>> I believe this was generating the waiting scenarios I described in an
>> earlier email.
>> The bottom line is to be extra careful about when job executions are
>> actually triggered, and to make sure that a DataSet which will be
>> used in different Flink jobs is available, for example, as a file in
>> secondary storage (possibly masked as a memory-mapping) and is exclusively
>> read from that source.
>> This means ensuring that the job which originally produces a DataSet (for
>> reuse in a later job) assigns it a DataSink for secondary storage.
>>
>> I'm going to keep digging, taking this into account; I will report back if
>> I manage to fix everything or find a new problem.
>>
>> Thanks again,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>
>> On 16 April 2018 at 10:26, Chesnay Schepler <ches...@apache.org> wrote:
>>
>>> Ah yes, currently when you use that method the UI is started on a random
>>> port. I'm currently fixing that in this PR
>>> <https://github.com/apache/flink/pull/5814> that will be merged today.
>>> For now you will have to enable logging and search for something along the lines of
>>> "http://<host>:<port> was granted leadership".
>>>
>>> Sorry for the inconvenience.
>>>
>>> On 16.04.2018 15:04, Miguel Coimbra wrote:
>>>
>>> Thanks for the suggestions Chesnay, I will try them out.
>>>
>>> However, I have already tried your suggestion with the dependency
>>> flink-runtime-web and nothing happened.
>>> If I understood you correctly, adding that dependency to the pom.xml
>>> would make the web front-end run when I call the following
>>> line?
>>>
>>> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>
>>> I added flink-runtime-web to my pom.xml, recompiled and launched the
>>> program, but I simply got "Unable to connect" in my browser (Firefox) on
>>> localhost:8081.
>>> Performing wget on localhost:8081 resulted in this:
>>>
>>> $ wget localhost:8081
>>> --2018-04-16 12:47:26--  http://localhost:8081/
>>> Resolving localhost (localhost)... ::1, 127.0.0.1
>>> Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
>>> Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.
>>>
>>> It seems something was bound to localhost:8081 but the connection is not
>>> working for some reason.
>>> I am probably skipping some important detail.
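Chesnay's workaround (search the log for a line like "http://<host>:<port> was granted leadership") and the wget probe above can both be scripted, which helps when the port changes between job executions. The sketch below is plain Java, not a Flink API: the class name `WebUiLocator`, the regex, and the assumption that the log line contains exactly that phrase are illustrative, and the wording may differ between Flink versions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java helper (not a Flink API) for finding the local web UI.
final class WebUiLocator {

    // Assumed log wording, taken from Chesnay's hint; it may differ between Flink versions.
    private static final Pattern GRANTED_LEADERSHIP =
            Pattern.compile("(https?://[^\\s:/]+:\\d+) was granted leadership");

    /** Returns the first "http://host:port" announced as leader in the given log lines. */
    static Optional<String> findWebUiUrl(List<String> logLines) {
        for (String line : logLines) {
            Matcher m = GRANTED_LEADERSHIP.matcher(line);
            if (m.find()) {
                return Optional.of(m.group(1));
            }
        }
        return Optional.empty();
    }

    /** Same check as the wget above: true if a TCP connection to host:port succeeds. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // "Connection refused", unknown host and timeouts all land here.
            return false;
        }
    }
}
```

The log lines could come from the log4j output file (for example via `Files.readAllLines`); the extracted URL's port is then what should be tunneled, rather than assuming 8081.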
>>> These are some of my dependencies:
>>>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-java</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-core</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-gelly_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>>
>>> Have you managed to get the web front-end working in local mode?
>>>
>>> Best regards,
>>>
>>> Miguel E. Coimbra
>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>
>>> On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org> wrote:
>>>
>>>> The thing with createLocalEnvironmentWithWebUI is that it requires
>>>> flink-runtime-web to be on the classpath, which is rarely the case
>>>> when running things in the IDE.
>>>> It should work fine in the IDE if you add it as a dependency to your
>>>> project. This should've been logged as a warning.
>>>>
>>>> Chaining is unrelated to this issue, as join operators are never chained
>>>> to one another.
>>>> Lambda functions are also not the issue; if they were, the job would
>>>> fail much earlier.
>>>>
>>>> It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input and
>>>> hence produces no output, which now also blocks T3.
>>>>
>>>> There are multiple possible explanations I can come up with:
>>>> * the preceding operators are blocked on something or are *really* slow
>>>> * the preceding operators are actually finished, but aren't shutting
>>>>   down due to an implementation error
>>>> * a deadlock in Flink's join logic
>>>> * a deadlock in Flink's network stack
>>>>
>>>> For the first 2 we will have to consult the UI or logs. You said you
>>>> were dumping the input DataSets into files, but were they actually
>>>> complete?
>>>>
>>>> A deadlock in the network stack should appear as all existing operator
>>>> threads being blocked.
>>>> We can probably rule out a problem with the join logic by removing the
>>>> second join and trying again.
>>>>
>>>> On 16.04.2018 03:10, Miguel Coimbra wrote:
>>>>
>>>> Hello,
>>>>
>>>> It would seem that the function which is supposed to launch local mode
>>>> with the web front-end doesn't launch the front-end at all...
>>>> This function seems not to be doing what it is supposed to do, if I'm
>>>> not mistaken:
>>>>
>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>>         ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>
>>>> Regarding the preceding operators, the thread dumps I got were pointing
>>>> to a specific set of operations over DataSet instances that were
>>>> passed into my function.
>>>> Below I show the code segment, with comments marking the lines where
>>>> threads are waiting:
>>>>
>>>> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final Graph<K, VV, EV> originalGraph,
>>>>                                                            final DataSet<Vertex<K, VV>> vertices) {
>>>>     return vertices
>>>>         .joinWithHuge(originalGraph.getEdges())
>>>>         .where(0).equalTo(0)
>>>>         .with((source, edge) -> edge) // Thread 1 is blocked here
>>>>         .returns(originalGraph.getEdges().getType())
>>>>         .join(vertices)
>>>>         .where(1).equalTo(0)
>>>>         .with((e, v) -> e)            // Thread 3 is blocked here
>>>>         .returns(originalGraph.getEdges().getType())
>>>>         .distinct(0, 1);
>>>> }
>>>>
>>>> Note: the edges in originalGraph's edge DataSet far outnumber the
>>>> elements of the vertices DataSet, so I believe joinWithHuge is being
>>>> used correctly.
>>>>
>>>> I will try testing with remote (cluster) mode to have access to the web
>>>> front-end, but I have some questions for now:
>>>>
>>>> - The fact that they are blocked in different JoinOperator instances
>>>>   that are chained: is this a result of Flink's default pipeline mechanism?
>>>> - Could there be a problem stemming from the fact that they are both
>>>>   waiting on lambdas?
>>>> - I have tried dumping both DataSet variables, originalGraph and vertices
>>>>   (the ones being used in this code), into files, and they produced
>>>>   correct values (non-empty files), so I don't have a clue what the threads
>>>>   inside Flink's runtime are waiting on.
>>>>
>>>> Thanks for the help so far Chesnay.
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>
>>>> ---------- Forwarded message ----------
>>>>
>>>>> From: Chesnay Schepler <ches...@apache.org>
>>>>> To: user@flink.apache.org
>>>>> Cc:
>>>>> Bcc:
>>>>> Date: Sun, 15 Apr 2018 18:54:33 +0200
>>>>> Subject: Re: Unsure how to further debug - operator threads stuck on
>>>>> java.lang.Thread.State: WAITING
>>>>> Hello,
>>>>>
>>>>> Threads #1-3 are waiting for input; Thread #4 is waiting for the job to
>>>>> finish.
>>>>>
>>>>> To further debug this I would look into what the preceding operators
>>>>> are doing, whether they are blocked on something or are emitting records
>>>>> (which you can check in the UI/metrics).
>>>>>
>>>>> On 15.04.2018 18:40, Miguel Coimbra wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I am running into a situation where the Flink threads responsible for
>>>>> my operator execution are all stuck in the WAITING state.
>>>>> Before anything else, this is my machine's spec:
>>>>>
>>>>> Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830 @ 2.13GHz
>>>>> GenuineIntel GNU/Linux
>>>>> 256 GB RAM
>>>>>
>>>>> I am running in local mode on a machine with a considerable amount of
>>>>> memory, so perhaps that may be triggering some execution edge-case?
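Chesnay's diagnosis that threads #1-3 are simply waiting for input (and, in his later reply, that a blocked T1 starves T3) can be illustrated with a plain-Java analogy. This is not how Flink is implemented: a `BlockingQueue` stands in for an operator's input gate, and the class name `StalledInput` and the `END_OF_INPUT` marker are hypothetical. A downstream stage only finishes once upstream signals end of input; if upstream never does, the consumer just waits, at 0% CPU, exactly like the WAITING join threads. Unlike the Flink threads, this sketch gives up after an idle timeout so the stall becomes visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: a BlockingQueue stands in for an operator's input gate.
final class StalledInput {

    // Hypothetical marker playing the role of the "upstream finished" signal.
    static final String END_OF_INPUT = "<end>";

    /**
     * Consumes records until END_OF_INPUT arrives. The join threads in the dumps
     * instead wait indefinitely (in SingleInputGate.getNextBufferOrEvent); this
     * version returns Optional.empty() after idleTimeoutMillis without a record.
     */
    static Optional<List<String>> drain(BlockingQueue<String> input, long idleTimeoutMillis)
            throws InterruptedException {
        List<String> records = new ArrayList<>();
        while (true) {
            String next = input.poll(idleTimeoutMillis, TimeUnit.MILLISECONDS);
            if (next == null) {
                return Optional.empty(); // upstream stalled: no record and no end marker
            }
            if (END_OF_INPUT.equals(next)) {
                return Optional.of(records); // upstream finished cleanly
            }
            records.add(next);
        }
    }
}
```

In Flink terms, the missing end marker corresponds to a preceding operator that never emits or never finishes, which is why Chesnay suggests checking whether the preceding operators are emitting records.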
>>>>>
>>>>> Moving on, this is my Java version:
>>>>>
>>>>> openjdk version "1.8.0_151"
>>>>> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
>>>>> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>>>>>
>>>>> Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT
>>>>> with LocalEnvironment on this large-memory machine, with parallelism
>>>>> set to one:
>>>>>
>>>>> Configuration conf = new Configuration();
>>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>>>         ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>> ExecutionEnvironment env = lenv;
>>>>> env.getConfig().enableSysoutLogging().enableClosureCleaner()
>>>>>         .enableObjectReuse();
>>>>> env.setParallelism(1);
>>>>>
>>>>> This initializes the execution environment for a series of sequential
>>>>> jobs (any data dependency between jobs is flushed to disk in job *i* and
>>>>> read back from disk into a DataSet in job *i + 1*).
>>>>> To reiterate, I am not launching a Flink cluster; I am just executing
>>>>> in local mode from a code base compiled with Maven.
>>>>>
>>>>> I have tested this program via mvn exec:exec with different amounts of
>>>>> memory (from -Xmx20000m to -Xmx120000m, i.e. from 20 GB to 120 GB) and the
>>>>> result is always the same: the process's memory fills up completely and
>>>>> then the process's CPU usage drops to 0%.
>>>>> This is strange because if it were a lack of memory, I would expect an
>>>>> OutOfMemoryError.
>>>>>
>>>>> I have debugged with IntelliJ IDEA and obtained thread dumps from
>>>>> different executions, and realized quite a few operator threads are stuck
>>>>> in java.lang.Thread.State: WAITING.
>>>>>
>>>>> There are four major threads that I find to be in this waiting state.
>>>>> The thread dumps I obtained show me where the wait calls originated:
>>>>>
>>>>> *Number 1:*
>>>>>
>>>>> "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>>>>>   java.lang.Thread.State: WAITING
>>>>>     at java.lang.Object.wait(Object.java:-1)
>>>>>     at java.lang.Object.wait(Object.java:502)
>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>>     at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> *Number 2:*
>>>>>
>>>>> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>>>>>   java.lang.Thread.State: WAITING
>>>>>     at java.lang.Object.wait(Object.java:-1)
>>>>>     at java.lang.Object.wait(Object.java:502)
>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>>     at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> *Number 3:*
>>>>>
>>>>> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
>>>>>   java.lang.Thread.State: WAITING
>>>>>     at java.lang.Object.wait(Object.java:-1)
>>>>>     at java.lang.Object.wait(Object.java:502)
>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>>     at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>>>>>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> *Number 4:*
>>>>>
>>>>> "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
>>>>>   java.lang.Thread.State: WAITING
>>>>>     at sun.misc.Unsafe.park(Unsafe.java:-1)
>>>>>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>>>>>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>>>>>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>>>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
>>>>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
>>>>>     - locked <0x23eb> (a java.lang.Object)
>>>>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
>>>>>     at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>>>>     at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
>>>>>     at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
>>>>>     at my.package.algorithm.Sample.run(Sample.java:291)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> While I realize these dumps on their own may not be helpful, they at
>>>>> least (as far as I know) indicate that the threads are all waiting on
>>>>> something.
>>>>> But if it were resource scarcity, I believe the program would terminate
>>>>> with an exception.
>>>>> And if it were garbage collection activity, I believe the JVM process
>>>>> would not be at 0% CPU usage.
>>>>>
>>>>> *Note:* I realize I didn't provide the user code that generates the
>>>>> execution plan for Flink which led to the contexts in which the threads
>>>>> are waiting, but I hope it is not necessary.
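Miguel's reasoning about garbage collection (a GC-bound JVM would not sit at 0% CPU) can be checked directly rather than inferred. Because LocalEnvironment runs the job inside the same JVM as the user program, a monitoring thread in that program could sample the standard `java.lang.management` beans, as in this sketch. The class name `GcProbe` and the sampling window are illustrative choices; `jstat -gcutil <pid>` gives similar numbers from outside the process.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Sketch of a GC/heap probe built only on the standard management beans.
final class GcProbe {

    /** Accumulated GC time (ms) over all collectors; collectors reporting -1 (undefined) are skipped. */
    static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime();
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    /** Approximate fraction of a sampling window spent in GC (0.0 means no GC activity). */
    static double gcShare(long windowMillis) throws InterruptedException {
        long before = totalGcTimeMillis();
        Thread.sleep(windowMillis);
        long after = totalGcTimeMillis();
        return (double) (after - before) / windowMillis;
    }

    /** Heap usage in MB, to compare against the memory the OS reports for the process. */
    static String heapSummary() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return String.format("heap used=%d MB, committed=%d MB, max=%d MB",
                heap.getUsed() >> 20, heap.getCommitted() >> 20, heap.getMax() >> 20);
    }
}
```

A share near 0.0 while the job is stuck would support the conclusion that the threads are idle-waiting rather than collecting garbage; a share close to 1.0 would instead point to GC thrashing.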
>>>>> My problem now is that I am unsure how to proceed to further debug
>>>>> this issue:
>>>>> - The assigned memory is fully used, but there are no exceptions about
>>>>>   lack of memory.
>>>>> - The CPU usage is at 0% and all threads are in a waiting state,
>>>>>   but I don't understand what signal they're waiting for exactly.
>>>>>
>>>>> Hoping someone might be able to give me a hint.
>>>>>
>>>>> Thank you very much for your time.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Miguel E. Coimbra
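The thread dumps in this thread were taken from IntelliJ; the same information, plus a check for lock cycles, is available programmatically through the standard `ThreadMXBean`. This sketch lists WAITING threads together with the first non-JDK frame on their stack. For threads #1-3 that frame would be `SingleInputGate.getNextBufferOrEvent`, i.e. waiting for network input, which matches Chesnay's reading. The class name, the name filter and the "first non-JDK frame" heuristic are illustrative assumptions. Note that `findDeadlockedThreads()` only detects cycles of threads blocked on monitors or ownable synchronizers; threads sitting in `Object.wait()` for data that never arrives are not reported, so an empty result does not rule out a stall.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Sketch: summarize waiting threads and check for lock cycles from inside the JVM.
final class ThreadStateReport {

    /** One line per WAITING/TIMED_WAITING thread whose name contains nameFilter. */
    static List<String> waitingThreads(String nameFilter) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> lines = new ArrayList<>();
        // false, false: skip locked-monitor/synchronizer details; states and stacks suffice here
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            Thread.State state = info.getThreadState();
            boolean waiting = state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
            if (waiting && info.getThreadName().contains(nameFilter)) {
                lines.add(info.getThreadName() + " " + state + " at "
                        + firstNonJdkFrame(info.getStackTrace()));
            }
        }
        return lines;
    }

    /** First frame outside java.*, javax.*, sun.* and jdk.*, i.e. the code that asked to wait. */
    static String firstNonJdkFrame(StackTraceElement[] stack) {
        for (StackTraceElement frame : stack) {
            String cls = frame.getClassName();
            if (!(cls.startsWith("java.") || cls.startsWith("javax.")
                    || cls.startsWith("sun.") || cls.startsWith("jdk."))) {
                return frame.toString();
            }
        }
        return stack.length > 0 ? stack[0].toString() : "<no stack>";
    }

    /** True only for monitor/synchronizer lock cycles; Object.wait() stalls are not reported. */
    static boolean hasLockCycle() {
        long[] ids = ManagementFactory.getThreadMXBean().findDeadlockedThreads();
        return ids != null && ids.length > 0;
    }
}
```

Called with the filter "Join" from a background thread of the stuck local-mode program, this should list the join threads from the dumps without attaching a debugger.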