Hi James,

it is unlikely that your issue is the same as the one Miguel is having. His issue, https://issues.apache.org/jira/browse/FLINK-9242, is probably the same as https://issues.apache.org/jira/browse/FLINK-9144 and happens only in batch programs that spill data, in Flink 1.5 and 1.6 versions from before last Friday.
From the information you provided, I suppose you are running a streaming job in Flink 1.4, are you? Your example looks like a simpler setup: can you try to minimise it so that you can share the code and we can have a look?

Regards
Nico

On 18/04/18 01:59, James Yu wrote:
> Miguel, I and my colleague ran into same problem yesterday.
> We were expecting Flink to get 4 inputs from Kafka and write the inputs
> to Cassandra, but the operators got stuck after the 1st input is written
> into Cassandra.
> This is how DAG looks like:
> Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
> After we disable the auto chaining
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups),
> all 4 inputs are read from Kafka and written into Cassandra.
> We are still figuring out why the chaining causes the blocking.
>
>
> This is a UTF-8 formatted mail
> -----------------------------------------------
> James C.-C.Yu
> +886988713275
>
> 2018-04-18 6:57 GMT+08:00 Miguel Coimbra <miguel.e.coim...@gmail.com <mailto:miguel.e.coim...@gmail.com>>:
>
> Chesnay, following your suggestions I got access to the web
> interface and also took a closer look at the debugging logs.
> I have noticed one problem regarding the web interface port - it
> keeps changing port now and then during my Java program's execution.
>
> Not sure if that is due to my program launching several job
> executions sequentially, but the fact is that it happened.
> Since I am accessing the web interface via tunneling, it becomes
> rather cumbersome to keep adapting it.
>
> Another particular problem I'm noticing is that this exception
> frequently pops up (debugging with log4j):
>
> 00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
> org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
>     at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>     at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
>     at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>     at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
>     at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
>     at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>     at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Don't know if the internals of Flink are explicitly using an
> exception for control flow, but there are several occurrences of
> this as time goes by.
>
> Regarding my program itself, I've achieved some progress.
> In my program I need to do a sequence of series of Flink jobs, and
> need extra care to make sure no DataSet instance from job /i/ is
> being used in an operator in job /i + 1/.
> I believe this was generating the waiting scenarios I describe in an
> earlier email.
> The bottom line is to be extra careful about when job executions are
> actually triggered and to make sure that a DataSet which will need
> to be used in different Flink jobs is available for example as a
> file in secondary storage (possibly masked as a memory-mapping) and
> is exclusively read from that source.
> This means ensuring the job that originally produces a DataSet (for
> reuse on a later job) assigns to it a DataSink for secondary storage.
>
> I'm going to keep digging taking this into account - I will report
> back if I manage to fix everything or find a new problem.
>
> Thanks again,
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <mailto:miguel.e.coim...@ist.utl.pt>
>
> On 16 April 2018 at 10:26, Chesnay Schepler <ches...@apache.org <mailto:ches...@apache.org>> wrote:
>
> ah yes, currently when you use that method the UI is started on
> a random port. I'm currently fixing that in this PR
> <https://github.com/apache/flink/pull/5814> that will be merged
> today.
> For now you will have to enable logging and search for something
> along the lines of "http://<host>:<port> was granted leadership"
>
> Sorry for the inconvenience.
>
> On 16.04.2018 15:04, Miguel Coimbra wrote:
>> Thanks for the suggestions Chesnay, I will try them out.
>>
>> However, I have already tried your suggestion with the
>> dependency flink-runtime-web and nothing happened.
>> If I understood you correctly, adding that dependency in the
>> pom.xml would make it so the web front-end is running when I
>> call the following line?
>>
>> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>
>> I added flink-runtime-web in my pom.xml, recompiled and
>> launched the program but I simply got "Unable to connect" in
>> my browser (Firefox) on localhost:8081.
>> Performing wget on localhost:8081 resulted in this:
>>
>> $ wget localhost:8081
>> --2018-04-16 12:47:26-- http://localhost:8081/
>> Resolving localhost (localhost)... ::1, 127.0.0.1
>> Connecting to localhost (localhost)|::1|:8081... failed: Connection refused.
>> Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection refused.
>>
>> It seems something was bound to localhost:8081 but the
>> connection is not working for some reason.
>> I probably am skipping some important detail.
>> These are some of my dependencies:
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-java</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-core</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-gelly_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>>
>> Have you managed to get the web front-end in local mode?
>>
>> Best regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <mailto:miguel.e.coim...@ist.utl.pt>
>>
>> On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org <mailto:ches...@apache.org>> wrote:
>>
>> The thing with createLocalEnvironmentWithWebUI is that it
>> requires flink-runtime-web to be on the classpath, which
>> is rarely the case when running things in the IDE.
>> It should work fine in the IDE if you add it as a
>> dependency to your project. This should've been logged as
>> a warning.
>>
>> Chaining is unrelated to this issue as join operators are
>> never chained to one another.
>> Lambda functions are also not the issue, if they were the
>> job would fail much earlier.
>>
>> It is reasonable that T3 is blocked if T1 is blocked. T1
>> gets no input hence produces no output, which now also
>> blocks T3.
>>
>> There are multiple possible explanations I can come up with:
>> * the preceding operators are blocked on something or /really/ slow
>> * the preceding operators are actually finished, but
>>   aren't shutting down due to an implementation error
>> * a deadlock in Flink's join logic
>> * a deadlock in Flink's network stack
>>
>> For the first 2 we will have to consult the UI or logs.
>> You said you were dumping the input DataSets into files,
>> but were they actually complete?
>>
>> A deadlock in the network stack should appear as all
>> existing operator threads being blocked.
>> We can probably rule out a problem with the join logic by
>> removing the second join and trying again.
>>
>> On 16.04.2018 03:10, Miguel Coimbra wrote:
>>> Hello,
>>>
>>> It would seem that the function which is supposed to
>>> launch local mode with the web front-end doesn't launch
>>> the front-end at all...
>>> This function seems not to be doing what it is supposed
>>> to do, if I'm not mistaken:
>>>
>>> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>
>>> Regarding the preceding operators, the thread dumps I got
>>> were pointing to a specific set of operations over
>>> DataSet instances that were passed into my function.
>>> Below I show the code segment and put the lines where
>>> threads are waiting in *bold*:
>>>
>>> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
>>>     return vertices
>>>         .joinWithHuge(originalGraph.getEdges())
>>>         .where(0).equalTo(0)
>>>         *.with((source, edge) -> edge) // Thread 1 is blocked here*
>>>         .returns(originalGraph.getEdges().getType())
>>>         .join(vertices)
>>>         .where(1).equalTo(0)
>>>         *.with((e, v) -> e) // Thread 3 is blocked here*
>>>         .returns(originalGraph.getEdges().getType())
>>>         .distinct(0, 1);
>>> }
>>>
>>> Note: the edges inside the graph originalGraph edge
>>> DataSet are much greater in number than the elements of
>>> the vertices DataSet, so I believe that function is being
>>> used correctly.
>>>
>>> I will try testing with remote (cluster) mode to have
>>> access to the web front-end, but I have some questions
>>> for now:
>>>
>>> - The fact that they are blocked in different
>>>   JoinOperator instances that are chained, is this a result
>>>   of Flink's default pipeline mechanism?
>>> - Could there be a problem stemming from the fact they
>>>   are both waiting on lambdas?
>>> - I have tried dumping both DataSet variables
>>>   originalGraph and vertices into files (the ones being
>>>   used in this code), and they produced correct values
>>>   (non-empty files), so I don't have a clue what the
>>>   threads inside Flink's runtime are waiting on.
>>>
>>> Thanks for the help so far Chesnay.
>>>
>>> Miguel E. Coimbra
>>> Email: miguel.e.coim...@gmail.com <mailto:miguel.e.coim...@ist.utl.pt>
>>>
>>> ---------- Forwarded message ----------
>>>
>>> From: Chesnay Schepler <ches...@apache.org <mailto:ches...@apache.org>>
>>> To: user@flink.apache.org <mailto:user@flink.apache.org>
>>> Cc:
>>> Bcc:
>>> Date: Sun, 15 Apr 2018 18:54:33 +0200
>>> Subject: Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING
>>> Hello,
>>>
>>> Thread #1-3 are waiting for input, Thread #4 is
>>> waiting for the job to finish.
>>>
>>> To further debug this I would look into what the
>>> preceding operators are doing, whether they are
>>> blocked on something or are emitting records (which
>>> you can check in the UI/metrics).
>>>
>>> On 15.04.2018 18:40, Miguel Coimbra wrote:
>>>> Hello,
>>>>
>>>> I am running into a situation where the Flink
>>>> threads responsible for my operator execution are
>>>> all stuck on WAITING mode.
>>>> Before anything else, this is my machine's spec:
>>>>
>>>> Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7-4830 @ 2.13GHz GenuineIntel GNU/Linux
>>>> 256 GB RAM
>>>>
>>>> I am running in local mode on a machine with a
>>>> considerable amount of memory, so perhaps that may
>>>> be triggering some execution edge-case?
>>>>
>>>> Moving on, this is my Java:
>>>>
>>>> openjdk version "1.8.0_151"
>>>> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
>>>> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>>>>
>>>> Getting back to the problem: I am currently using
>>>> Flink 1.5-SNAPSHOT with LocalEnvironment on this
>>>> large-memory machine, with parallelism set to one:
>>>>
>>>> Configuration conf = new Configuration();
>>>> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>> ExecutionEnvironment env = lenv;
>>>> env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
>>>> env.setParallelism(1);
>>>>
>>>> This initializes the execution environment for a
>>>> series of sequential jobs (any data dependency
>>>> between jobs is flushed to disk on job /i/ and read
>>>> back from disk into a DataSet in job /i + 1/).
>>>> To reiterate, I am not launching a Flink cluster, I
>>>> am just executing in local mode from a code base
>>>> compiled with Maven.
>>>>
>>>> I have tested this program via mvn exec:exec with
>>>> different values of memory (from -Xmx20000m to
>>>> -Xmx120000m, from 20GB to 120GB) and the result is
>>>> always the same: the process' memory fills up
>>>> completely and then the process' CPU usage drops to 0%.
>>>> This is strange because if it was lack of memory, I
>>>> would expect an OutOfMemoryError.
>>>>
>>>> I have debugged with IntelliJ IDEA and obtained
>>>> thread dumps from different executions, and realized
>>>> quite a few operator threads are stuck on
>>>> java.lang.Thread.State: WAITING.
>>>>
>>>> There are four major threads that I find to be in
>>>> this waiting state.
>>>> The thread dumps I obtained show me where the wait
>>>> calls originated:
>>>>
>>>> *Number 1:*
>>>>
>>>> "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>>>>   java.lang.Thread.State: WAITING
>>>>     at java.lang.Object.wait(Object.java:-1)
>>>>     at java.lang.Object.wait(Object.java:502)
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>     at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> *Number 2:*
>>>>
>>>> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>>>>   java.lang.Thread.State: WAITING
>>>>     at java.lang.Object.wait(Object.java:-1)
>>>>     at java.lang.Object.wait(Object.java:502)
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>     at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> *Number 3:*
>>>>
>>>> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
>>>>   java.lang.Thread.State: WAITING
>>>>     at java.lang.Object.wait(Object.java:-1)
>>>>     at java.lang.Object.wait(Object.java:502)
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>     at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>>>>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> *Number 4:*
>>>>
>>>> "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
>>>>   java.lang.Thread.State: WAITING
>>>>     at sun.misc.Unsafe.park(Unsafe.java:-1)
>>>>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>>>>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>>>>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
>>>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
>>>>     - locked <0x23eb> (a java.lang.Object)
>>>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
>>>>     at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>>>     at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
>>>>     at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
>>>>     at my.package.algorithm.Sample.run(Sample.java:291)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> While I realize these dumps on their own may not be
>>>> helpful, they at least (as far as I know) indicate
>>>> that the threads are all waiting on something.
>>>> But if it was resource scarcity I believe the
>>>> program would terminate with an exception.
>>>> And if it was garbage collection activity, I believe
>>>> the JVM process would not be at 0% CPU usage.
>>>>
>>>> *Note:* I realize I didn't provide the user-code
>>>> code that generates the execution plan for Flink
>>>> which led to the contexts in which the threads are
>>>> waiting, but I hope it may not be necessary.
>>>> My problem now is that I am unsure on how to proceed
>>>> to further debug this issue:
>>>> - The assigned memory is fully used, but there are
>>>>   no exceptions about lack of memory.
>>>> - The CPU usage is at 0% and all threads are all in
>>>>   a waiting state, but I don't understand what signal
>>>>   they're waiting for exactly.
>>>>
>>>> Hoping anyone might be able to give me a hint.
>>>>
>>>> Thank you very much for your time.
>>>>
>>>> Best regards,
>>>>
>>>> Miguel E. Coimbra
>>>
>>
>
>

--
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
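
A side note on reading the thread dumps quoted above: a thread parked in Object.wait() reports java.lang.Thread.State: WAITING, uses no CPU and raises no exception, which is consistent with the 0% CPU usage Miguel describes. The following is only a minimal sketch using the plain JDK, with no Flink involved; the class WaitingThreadReport and its methods are hypothetical names made up for illustration. It starts a thread that waits for input which never arrives and then lists every WAITING thread together with its top stack frame.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class WaitingThreadReport {

    // Hypothetical stand-in for an operator thread whose input never arrives.
    static Thread startStarvedConsumer(String name) {
        final Object inputAvailable = new Object();
        Thread t = new Thread(() -> {
            synchronized (inputAvailable) {
                try {
                    while (true) {
                        // Nobody ever calls notify(), so the thread stays in WAITING.
                        inputAvailable.wait();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, name);
        t.setDaemon(true); // do not keep the JVM alive for the demo thread
        t.start();
        return t;
    }

    // Polls until the thread reaches the given state or the timeout expires.
    static boolean awaitState(Thread t, Thread.State state, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (t.getState() != state && System.currentTimeMillis() < deadline) {
            Thread.yield();
        }
        return t.getState() == state;
    }

    // Returns "<thread name> <- <top stack frame>" for every WAITING thread.
    static List<String> describeWaitingThreads() {
        List<String> report = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            StackTraceElement[] stack = e.getValue();
            if (t.getState() == Thread.State.WAITING && stack.length > 0) {
                report.add(t.getName() + " <- " + stack[0]);
            }
        }
        return report;
    }

    public static void main(String[] args) {
        Thread consumer = startStarvedConsumer("demo-consumer");
        awaitState(consumer, Thread.State.WAITING, 5000);
        describeWaitingThreads().forEach(System.out::println);
    }
}
```

Running main prints one line per WAITING thread, including demo-consumer; the JVM's own housekeeping threads may show up as well. In a real job the interesting part is the frames below the wait call, since they show which input the thread is waiting for.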