Hello James,

Thanks for the information.
I noticed something suspicious as well: I have chains of operators where
the first operator ingests the expected number of records but does not
emit any, leaving the following operator empty in a "RUNNING" state.
For example:



I will get back to you if I find out more.


Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>

On 17 April 2018 at 20:59, James Yu <cyu...@gmail.com> wrote:

> Miguel, my colleague and I ran into the same problem yesterday.
> We were expecting Flink to get 4 inputs from Kafka and write them to
> Cassandra, but the operators got stuck after the 1st input was written into
> Cassandra.
> This is what the DAG looks like:
> Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
> After we disabled auto chaining
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups),
> all 4 inputs were read from Kafka and written into Cassandra.
> We are still figuring out why the chaining causes the blocking.
>
>
> This is a UTF-8 formatted mail
> -----------------------------------------------
> James C.-C.Yu
> +886988713275
>
> 2018-04-18 6:57 GMT+08:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Chesnay, following your suggestions I got access to the web interface and
>> also took a closer look at the debugging logs.
>> I have noticed one problem regarding the web interface port - it keeps
>> changing port now and then during my Java program's execution.
>>
>> Not sure if that is due to my program launching several job executions
>> sequentially, but the fact is that it happened.
>> Since I am accessing the web interface via tunneling, it becomes rather
>> cumbersome to keep adapting it.
>>
>> Another particular problem I'm noticing is that this exception frequently
>> pops up (debugging with log4j):
>>
>> 00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
>> org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
>>         at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
>>         at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
>>         at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>>         at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
>>         at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
>>         at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
>>         at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
>>         at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
>>         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
>>         at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>>         at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Don't know if the internals of Flink are explicitly using an exception
>> for control flow, but there are several occurrences of this as time goes by.
>>
>> Regarding my program itself, I've achieved some progress.
>> In my program I need to run a sequence of Flink jobs, and I need to take
>> extra care that no DataSet instance from job *i* is used in an operator
>> in job *i + 1*.
>> I believe this was causing the waiting scenarios I described in an
>> earlier email.
>> The bottom line is to be extra careful about when job executions are
>> actually triggered and to make sure that a DataSet which will need to be
>> used in different Flink jobs is available for example as a file in
>> secondary storage (possibly masked as a memory-mapping) and is exclusively
>> read from that source.
>> This means ensuring the job that originally produces a DataSet (for
>> reuse on a later job) assigns to it a DataSink for secondary storage.
>>
>> I'm going to keep digging, taking this into account - I will report back
>> if I manage to fix everything or find a new problem.
>>
>> Thanks again,
>>
>>
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>
>> On 16 April 2018 at 10:26, Chesnay Schepler <ches...@apache.org> wrote:
>>
>>> Ah yes, currently when you use that method the UI is started on a random
>>> port. I'm currently fixing that in this PR
>>> <https://github.com/apache/flink/pull/5814> that will be merged today.
>>> For now you will have to enable logging and search for something along the
>>> lines of "http://<host>:<port> was granted leadership".
>>>
>>> Sorry for the inconvenience.
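
A minimal sketch of how the port could be pulled out of the logs, assuming the
line really does contain "http://<host>:<port> was granted leadership" (the
exact wording may differ between Flink versions). The class name
WebUiPortFinder and the sample log line are made up for illustration:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class WebUiPortFinder {
    // Assumed log fragment: "http://<host>:<port> was granted leadership".
    private static final Pattern LEADERSHIP =
            Pattern.compile("http://([^:/\\s]+):(\\d+) was granted leadership");

    /** Returns the port from a matching log line, or empty if the line does not match. */
    static Optional<Integer> findPort(String logLine) {
        Matcher m = LEADERSHIP.matcher(logLine);
        return m.find() ? Optional.of(Integer.parseInt(m.group(2))) : Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical log line, not copied from a real run.
        String line = "12:00:00,000 INFO  SomeLogger - http://localhost:43211 was granted leadership";
        System.out.println(findPort(line));
    }
}
```

Feeding each log line through findPort and re-pointing the SSH tunnel at the
last port found would at least avoid searching the logs by hand.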
>>>
>>> On 16.04.2018 15:04, Miguel Coimbra wrote:
>>>
>>> Thanks for the suggestions Chesnay, I will try them out.
>>>
>>> However, I have already tried your suggestion with the dependency
>>> flink-runtime-web and nothing happened.
>>> If I understood you correctly, adding that dependency in the pom.xml
>>> would make it so the web front-end is running when I call the following
>>> line?
>>>
>>> LocalEnvironment lenv = (LocalEnvironment)
>>>         ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>
>>> I added flink-runtime-web  in my pom.xml, recompiled and launched the
>>> program but I simply got "Unable to connect" in my browser (Firefox) on
>>> localhost:8081.
>>> Performing wget on localhost:8081 resulted in this:
>>>
>>> $ wget localhost:8081
>>> --2018-04-16 12:47:26--  http://localhost:8081/
>>> Resolving localhost (localhost)... ::1, 127.0.0.1
>>> Connecting to localhost (localhost)|::1|:8081... failed: Connection
>>> refused.
>>> Connecting to localhost (localhost)|127.0.0.1|:8081... failed:
>>> Connection refused.
>>>
>>> It seems something was bound to localhost:8081 but the connection is not
>>> working for some reason.
>>> I probably am skipping some important detail.
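
For what it is worth, "Connection refused" usually means that nothing is
accepting connections on that port at all. A small probe like the one below
(plain JDK sockets, nothing Flink-specific; PortProbe is a hypothetical
helper) could be used to check a candidate port from the same machine:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // 8081 is the port tried in this thread; the UI may be on another one.
        System.out.println(isListening("localhost", 8081, 1000));
    }
}
```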
>>> These are some of my dependencies:
>>>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-java</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-core</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-gelly_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>>
>>> Have you managed to get the web front-end in local mode?
>>>
>>>
>>> Best regards,
>>>
>>> Miguel E. Coimbra
>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>
>>> On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org> wrote:
>>>
>>>> The thing with createLocalEnvironmentWithWebUI is that it requires
>>>> flink-runtime-web to be on the classpath, which is rarely the case
>>>> when running things in the IDE.
>>>> It should work fine in the IDE if you add it as a dependency to your
>>>> project. This should've been logged as a warning.
>>>>
>>>> Chaining is unrelated to this issue as join operators are never chained
>>>> to one another.
>>>> Lambda functions are also not the issue, if they were the job would
>>>> fail much earlier.
>>>>
>>>> It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input
>>>> hence produces no output, which now also blocks T3.
>>>>
>>>> There are multiple possible explanations I can come up with:
>>>> * the preceding operators are blocked on something or *really* slow
>>>> * the preceding operators are actually finished, but aren't shutting
>>>> down due to an implementation error
>>>> * a deadlock in Flink's join logic
>>>> * a deadlock in Flink's network stack
>>>>
>>>> For the first 2 we will have to consult the UI or logs. You said you
>>>> were dumping the input DataSets into files, but were they actually 
>>>> complete?
>>>>
>>>> A deadlock in the network stack should appear as all existing operator
>>>> threads being blocked.
>>>> We can probably rule out a problem with the join logic by removing the
>>>> second join and trying again.
>>>>
>>>>
>>>>
>>>> On 16.04.2018 03:10, Miguel Coimbra wrote:
>>>>
>>>> Hello,
>>>>
>>>> It would seem that the function which is supposed to launch local mode
>>>> with the web front-end doesn't launch the front-end at all...
>>>> This function seems not to be doing what it is supposed to do, if I'm
>>>> not mistaken:
>>>>
>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>
>>>> Regarding the preceding operators, the thread dumps I got were pointing
>>>> to a specific set of operations over DataSet instances that were
>>>> passed into my function.
>>>> Below I show the code segment and put the lines where threads are
>>>> waiting in *bold*:
>>>>
>>>> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final
>>>> Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
>>>>     return vertices
>>>>             .joinWithHuge(originalGraph.getEdges())
>>>>             .where(0).equalTo(0)
>>>> *            .with((source, edge) -> edge) // Thread 1 is blocked here*
>>>>             .returns(originalGraph.getEdges().getType())
>>>>             .join(vertices)
>>>>             .where(1).equalTo(0)
>>>> *            .with((e, v) -> e) // Thread 3 is blocked here*
>>>>             .returns(originalGraph.getEdges().getType())
>>>>             .distinct(0, 1);
>>>> }
>>>>
>>>> Note: the edge DataSet of originalGraph has far more elements than the
>>>> vertices DataSet, so I believe joinWithHuge is being used correctly.
>>>>
>>>> I will try testing with remote (cluster) mode to have access to the web
>>>> front-end, but I have some questions for now:
>>>>
>>>> - The fact that they are blocked in different JoinOperator instances
>>>> that are chained, is this a result of Flink's default pipeline mechanism?
>>>> - Could there be a problem stemming from the fact they are both waiting
>>>> on lambdas?
>>>> - I have tried dumping both DataSet variables originalGraph and vertices
>>>> into files (the ones being used in this code), and they produced
>>>> correct values (non-empty files), so I don't have a clue what the threads
>>>> inside Flink's runtime are waiting on.
>>>>
>>>> Thanks for the help so far Chesnay.
>>>>
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>>
>>>> ---------- Forwarded message ----------
>>>>
>>>>> From: Chesnay Schepler <ches...@apache.org>
>>>>> To: user@flink.apache.org
>>>>> Cc:
>>>>> Bcc:
>>>>> Date: Sun, 15 Apr 2018 18:54:33 +0200
>>>>> Subject: Re: Unsure how to further debug - operator threads stuck on
>>>>> java.lang.Thread.State: WAITING
>>>>> Hello,
>>>>>
>>>>> Thread #1-3 are waiting for input, Thread #4 is waiting for the job to
>>>>> finish.
>>>>>
>>>>> To further debug this I would look into what the preceding operators
>>>>> are doing, whether they are blocked on something or are emitting records
>>>>> (which you can check in the UI/metrics).
>>>>>
>>>>> On 15.04.2018 18:40, Miguel Coimbra wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I am running into a situation where the Flink threads responsible for
>>>>> my operator execution are all stuck on WAITING mode.
>>>>> Before anything else, this is my machine's spec:
>>>>>
>>>>> Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz
>>>>> GenuineIntel GNU/Linux
>>>>> 256 GB RAM
>>>>>
>>>>> I am running in local mode on a machine with a considerable amount of
>>>>> memory, so perhaps that may be triggering some execution edge-case?
>>>>>
>>>>> Moving on, this is my Java:
>>>>>
>>>>> openjdk version "1.8.0_151"
>>>>> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
>>>>> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>>>>>
>>>>> Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT
>>>>> with LocalEnvironment on this large-memory machine, with parallelism
>>>>> set to one:
>>>>>
>>>>> Configuration conf = new Configuration();
>>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>>> ExecutionEnvironment env = lenv;
>>>>> env.getConfig().enableSysoutLogging().enableClosureCleaner()
>>>>> .enableObjectReuse();
>>>>> env.setParallelism(1);
>>>>>
>>>>> This initializes the execution environment for a series of sequential
>>>>> jobs (any data dependency between jobs is flushed to disk on job *i* and
>>>>> read back from disk into a DataSet in job *i + 1*).
>>>>> To reiterate, I am not launching a Flink cluster, I am just executing
>>>>> in local mode from a code base compiled with Maven.
>>>>>
>>>>> I have tested this program via mvn exec:exec with different heap sizes
>>>>> (from -Xmx20000m to -Xmx120000m, i.e. from 20 GB to 120 GB) and the
>>>>> result is always the same: the process's memory fills up completely and
>>>>> then the process's CPU usage drops to 0%.
>>>>> This is strange because if it were a lack of memory, I would expect an
>>>>> OutOfMemoryError.
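
One way to see whether the heap is really exhausted, as opposed to merely
reserved by the JVM, might be to sample the heap figures while the job runs.
A rough sketch using only the standard java.lang.management API (HeapSampler
is a hypothetical name, and this says nothing about off-heap memory):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

final class HeapSampler {
    /** Formats the current heap usage in megabytes. */
    static String describeHeap() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long mb = 1024L * 1024L;
        // getMax() returns -1 when the maximum is undefined; that shows up as 0 here.
        return String.format("used=%dMB committed=%dMB max=%dMB",
                heap.getUsed() / mb, heap.getCommitted() / mb, heap.getMax() / mb);
    }

    public static void main(String[] args) {
        System.out.println(describeHeap());
    }
}
```

Calling describeHeap() periodically from a separate thread and comparing
"used" with "max" shortly before the CPU drops to 0% would show whether the
heap is actually full at that point.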
>>>>>
>>>>> I have debugged with IntelliJ IDEA and obtained thread dumps from
>>>>> different executions, and realized quite a few operator threads are stuck
>>>>> on java.lang.Thread.State: WAITING.
>>>>>
>>>>> There are four major threads that I find to be in this waiting state.
>>>>> The thread dumps I obtained show me where the wait calls originated:
>>>>>
>>>>>
>>>>>
>>>>> *Number 1:*
>>>>>
>>>>> "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>>>>>   java.lang.Thread.State: WAITING
>>>>>       at java.lang.Object.wait(Object.java:-1)
>>>>>       at java.lang.Object.wait(Object.java:502)
>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>>
>>>>> *Number 2:*
>>>>>
>>>>> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>>>>>   java.lang.Thread.State: WAITING
>>>>>       at java.lang.Object.wait(Object.java:-1)
>>>>>       at java.lang.Object.wait(Object.java:502)
>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>>>>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> *Number 3:*
>>>>>
>>>>> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
>>>>>   java.lang.Thread.State: WAITING
>>>>>       at java.lang.Object.wait(Object.java:-1)
>>>>>       at java.lang.Object.wait(Object.java:502)
>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>>>>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>>>>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>       at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>>>>>       at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>>>>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>>>>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>>>>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>>>>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> *Number 4:*
>>>>>
>>>>> "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
>>>>>   java.lang.Thread.State: WAITING
>>>>>       at sun.misc.Unsafe.park(Unsafe.java:-1)
>>>>>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>>>>>       at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>>>>>       at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>>>>>       at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>>>>>       at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>>>       at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
>>>>>       at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
>>>>>       - locked <0x23eb> (a java.lang.Object)
>>>>>       at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>>>>       at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
>>>>>       at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>>>>       at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
>>>>>       at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
>>>>>       at my.package.algorithm.Sample.run(Sample.java:291)
>>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> While I realize these dumps on their own may not be helpful, they at
>>>>> least (as far as I know) indicate that the threads are all waiting on
>>>>> something.
>>>>> But if it was resource scarcity I believe the program would terminate
>>>>> with an exception.
>>>>> And if it was garbage collection activity, I believe the JVM process
>>>>> would not be at 0% CPU usage.
>>>>>
>>>>> *Note:* I realize I didn't provide the user code that generates
>>>>> the execution plan for Flink which led to the contexts in which the
>>>>> threads are waiting, but I hope it may not be necessary.
>>>>> My problem now is that I am unsure on how to proceed to further debug
>>>>> this issue:
>>>>> - The assigned memory is fully used, but there are no exceptions about
>>>>> lack of memory.
>>>>> - The CPU usage is at 0% and all threads are in a waiting state,
>>>>> but I don't understand what signal they're waiting for exactly.
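
The same kind of information as in the dumps above can also be collected from
inside the JVM with the standard ThreadMXBean, which might be handy for
logging it at intervals. A small sketch (WaitingThreads is a hypothetical
helper, not part of Flink). Note that findDeadlockedThreads() only reports
cycles on monitors and java.util.concurrent locks, so threads that sit in
Object.wait() for input that never arrives, as in the dumps above, would
not be flagged by it:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

final class WaitingThreads {
    /** Lists threads in WAITING state together with the object they wait on, if known. */
    static List<String> describeWaiting() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        List<String> result = new ArrayList<>();
        // dumpAllThreads(false, false): skip locked monitor/synchronizer details.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            if (info.getThreadState() == Thread.State.WAITING) {
                result.add(info.getThreadName() + " waits on " + info.getLockName());
            }
        }
        return result;
    }

    /** True if the JVM reports a cycle of threads blocked on monitors or ownable synchronizers. */
    static boolean hasDeadlock() {
        long[] ids = ManagementFactory.getThreadMXBean().findDeadlockedThreads();
        return ids != null && ids.length > 0;
    }

    public static void main(String[] args) {
        describeWaiting().forEach(System.out::println);
        System.out.println("deadlock detected: " + hasDeadlock());
    }
}
```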
>>>>>
>>>>> Hoping anyone might be able to give me a hint.
>>>>>
>>>>> Thank you very much for your time.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Miguel E. Coimbra
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
