Miguel, I and my colleague ran into same problem yesterday.
We were expecting Flink to get 4 inputs from Kafka and write the inputs to
Cassandra, but the operators got stuck after the 1st input is written into
Cassandra.
This is how DAG looks like:
Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
After we disable the auto chaining (
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups),
all 4 inputs are read from Kafka and written into Cassandra.
We are still figuring out why the chaining causes the blocking.


This is a UTF-8 formatted mail
-----------------------------------------------
James C.-C.Yu
+886988713275

2018-04-18 6:57 GMT+08:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:

> Chesnay, following your suggestions I got access to the web interface and
> also took a closer look at the debugging logs.
> I have noticed one problem regarding the web interface port - it keeps
> changing port now and then during my Java program's execution.
>
> Not sure if that is due to my program launching several job executions
> sequentially, but the fact is that it happened.
> Since I am accessing the web interface via tunneling, it becomes rather
> cumbersome to keep adapting it.
>
> Another particular problem I'm noticing is that this exception frequently
> pops up (debugging with log4j):
>
> 00:17:54,368 DEBUG org.apache.flink.runtime.jobma
> ster.slotpool.SlotPool          - Releasing slot with slot request id
> 9055ef473251505dac04c99727106dc9.
> org.apache.flink.util.FlinkException: Slot is being returned to the
> SlotPool.
>         at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$Provide
> rAndOwner.returnAllocatedSlot(SlotPool.java:1521)
>         at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlo
> t.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
>         at java.util.concurrent.CompletableFuture.uniHandle(Completable
> Future.java:822)
>         at java.util.concurrent.CompletableFuture.uniHandleStage(Comple
> tableFuture.java:834)
>         at java.util.concurrent.CompletableFuture.handle(CompletableFut
> ure.java:2155)
>         at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlo
> t.releaseSlot(SingleLogicalSlot.java:130)
>         at org.apache.flink.runtime.executiongraph.Execution.releaseAss
> ignedResource(Execution.java:1239)
>         at org.apache.flink.runtime.executiongraph.Execution.markFinish
> ed(Execution.java:946)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.updat
> eState(ExecutionGraph.java:1588)
>         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecu
> tionState(JobMaster.java:593)
>         at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
> thodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvo
> cation(AkkaRpcActor.java:210)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage
> (AkkaRpcActor.java:154)
>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleM
> essage(FencedAkkaRpcActor.java:66)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onRece
> ive$1(AkkaRpcActor.java:132)
>         at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell
> .scala:544)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.
> java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
> ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
> l.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
> orkerThread.java:107)
>
> Don't know if the internals of Flink are explicitly using an exception for
> control flow, but there are several occurrences of this as time goes by.
>
> Regarding my program itself, I've achieved some progress.
> In my program I need to do a sequence of series of Flink jobs, and need
> extra care to make sure no DataSet instance from job *i* is being used in
> an operator in job *i + 1*.
> I believe this was generating the waiting scenarios I describe in an
> earlier email.
> The bottom line is to be extra careful about when job executions are
> actually triggered and to make sure that a DataSet which will need to be
> used in different Flink jobs is available for example as a file in
> secondary storage (possibly masked as a memory-mapping) and is exclusively
> read from that source.
> This means ensuring the job that originally produces a DataSet (for reuse
> on a later job) assigns to it a DataSink for secondary storage.
>
> I'm going to keep digging taking this in account - if will report back if
> I manage to fix everything or find a new problem.
>
> Thanks again,
>
>
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>
> On 16 April 2018 at 10:26, Chesnay Schepler <ches...@apache.org> wrote:
>
>> ah yes, currently when you use that method the UI is started on a random
>> port. I'm currently fixing that in this PR
>> <https://github.com/apache/flink/pull/5814> that will be merged today.
>> For now you will enable logging and search for something along the lines of
>> "http://<host>:<port> was granted leadership"
>>
>> Sorry for the inconvenience.
>>
>> On 16.04.2018 15:04, Miguel Coimbra wrote:
>>
>> Thanks for the suggestions Chesnay, I will try them out.
>>
>> However, I have already tried your suggestion with the dependency
>> flink-runtime-web and nothing happened.
>> If I understood you correctly, adding that dependency in the pom.xml
>> would make it so the web front-end is running when I call the following
>> line?
>>
>> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLoc
>> alEnvironmentWithWebUI(conf);
>>
>> I added flink-runtime-web  in my pom.xml, recompiled and launched the
>> program but I simply got "Unable to connect" in my browser (Firefox) on
>> localhost:8081.
>> Performing wget on localhost:8081 resulted in this:
>>
>> $ wget localhost:8081
>> --2018-04-16 12:47:26--  http://localhost:8081/
>> Resolving localhost (localhost)... ::1, 127.0.0.1
>> Connecting to localhost (localhost)|::1|:8081... failed: Connection
>> refused.
>> Connecting to localhost (localhost)|127.0.0.1|:8081... failed: Connection
>> refused.
>>
>> It seems something was bound to localhost:8081 but the connection is not
>> working for some reason.
>> I probably am skipping some important detail.
>> These are some of my dependencies:
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-java</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-core</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-gelly_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>      <groupId>org.apache.flink</groupId>
>>      <artifactId>flink-streaming-scala_${scala.binary.version}</
>> artifactId>
>>      <version>${flink.version}</version>
>> </dependency>
>> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-
>> runtime-web -->
>>
>>
>>
>>
>> *<dependency>      <groupId>org.apache.flink</groupId>
>>  <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>>  <version>${flink.version}</version> </dependency>*
>>
>> Have you managed to get the web front-end in local mode?
>>
>>
>> Best regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>
>> On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org> wrote:
>>
>>> The thing with createLocalEnvironmentWithWebUI is that it requires
>>> flink-runtime-web to be on the classpath, which is rarely the class
>>> when running things in the IDE.
>>> It should work fine in the IDE if you add it as a dependency to your
>>> project. This should've been logged as a warning.
>>>
>>> Chaining is unrelated to this issue as join operators are never chained
>>> to one another.
>>> Lambda functions are also not the issue, if they were the job would fail
>>> much earlier.
>>>
>>> It is reasonable that T3 is blocked if T1 is blocked. T1 gets no input
>>> hence produces no output, which now also blocks T3.
>>>
>>> There are multiple possible explanations i can come up with:
>>> * the preceding operators are blocked on something or *really *slow
>>> * the preceding operators are actually finished, but aren't shutting
>>> down due to an implementation error
>>> * a deadlock in Flink's join logic
>>> * a deadlock in Flink's network stack
>>>
>>> For the first 2 we will have to consult the UI or logs. You said you
>>> were dumping the input DataSets into files, but were they actually complete?
>>>
>>> A deadlock in the network stack should appear as all existing operator
>>> threads being blocked.
>>> We can probably rule out a problem with the join logic by removing the
>>> second join and trying again.
>>>
>>>
>>>
>>> On 16.04.2018 03:10, Miguel Coimbra wrote:
>>>
>>> Hello,
>>>
>>> It would seem that the function which is supposed to launch local mode
>>> with the web front-end doesn't launch the front-end at all...
>>> This function seems not to be doing what it is supposed to do, if I'm
>>> not mistaken:
>>>
>>> LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLoc
>>> alEnvironmentWithWebUI(conf);
>>>
>>> Regarding the preceding operators, the thread dumps I got were pointing
>>> to a specific set of operations over DataSet instances that were passed
>>> into my function.
>>> Below I show the code segment and put the lines where threads are
>>> waiting in *bold*:
>>>
>>> public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final
>>> Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
>>>     return vertices
>>>             .joinWithHuge(originalGraph.getEdges())
>>>             .where(0).equalTo(0)
>>> *            .with((source, edge) -> edge)* *// Thread 1 is blocked
>>> here*
>>>             .returns(originalGraph.getEdges().getType())
>>>             .join(vertices)
>>>             .where(1).equalTo(0)
>>> *            .with((e, v) -> e) // Thread 3 is blocked here*
>>>             .returns(originalGraph.getEdges().getType())
>>>             .distinct(0, 1);
>>> }
>>>
>>> Note: the edges inside the graph originalGraph edge DataSet are much
>>> greater in number than the elements of the vertices DataSet, so I
>>> believe that function is being used correctly.
>>>
>>> I will try testing with remote (cluster) mode to have access to the web
>>> front-end, but I have some questions for now:
>>>
>>> - The fact that they are blocked in different ​JoinOperator instances
>>> that are chained, is this a result of Flink's default pipeline mechanism?
>>> - Could there be a problem stemming from the fact they are both waiting
>>> on lambdas?
>>> - I have tried dumping both DataSet variables originalGraph and vertices
>>> into files (the ones being used in this code), and they produced correct
>>> values (non-empty files), so I don't have a clue what the threads inside
>>> Flink's runtime are waiting on.
>>>
>>> ​Thanks for the help so far Chesnay.​
>>>
>>>
>>> Miguel E. Coimbra
>>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>>>
>>> ---------- Forwarded message ----------
>>>
>>>> From: Chesnay Schepler <ches...@apache.org>
>>>> To: user@flink.apache.org
>>>> Cc:
>>>> Bcc:
>>>> Date: Sun, 15 Apr 2018 18:54:33 +0200
>>>> Subject: Re: Unsure how to further debug - operator threads stuck on
>>>> java.lang.Thread.State: WAITING
>>>> Hello,
>>>>
>>>> Thread #1-3 are waiting for input, Thread #4 is waiting for the job to
>>>> finish.
>>>>
>>>> To further debug this I would look into what the preceding operators
>>>> are doing, whether they are blocked on something or are emitting records
>>>> (which you can check in the UI/metrics).
>>>>
>>>> On 15.04.2018 18:40, Miguel Coimbra wrote:
>>>>
>>>> ​Hello,
>>>>
>>>> I am running into a situation where the Flink threads responsible for
>>>> my operator execution are all stuck on WAITING mode.
>>>> Before anything else, this is my machine's spec:
>>>>
>>>> Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz
>>>> GenuineIntel GNU/Linux
>>>> 256 GB RAM
>>>>
>>>> I am running in local mode on a machine with a considerable amount of
>>>> memory, so perhaps that may be triggering some execution edge-case?
>>>>
>>>> Moving on, this is my Java:
>>>>
>>>> openjdk version "1.8.0_151"
>>>> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
>>>> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>>>>
>>>> Getting back to the problem: I am currently using Flink 1.5-SNAPSHOT
>>>> with LocalEnvironment on this large-memory machine, with parallelism
>>>> set to one:
>>>>
>>>> Configuration conf = new Configuration();
>>>> LocalEnvironment lenv = (LocalEnvironment)
>>>> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>>> ExecutionEnvironment env = lenv;
>>>> env.getConfig().enableSysoutLogging().enableClosureCleaner()
>>>> .enableObjectReuse();
>>>> env.setParallelism(1);
>>>>
>>>> This initializes the execution environment for a series of sequential
>>>> jobs (any data dependency between jobs is flushed to disk on job *i *and
>>>> read back from disk into a DataSet in job *i + 1*).
>>>> To reiterate, I am not launching a Flink cluster, I am just executing
>>>> in local mode from a code base compiled with Maven.
>>>>
>>>> I have tested this program via mvn exec:exec with different values of
>>>> memory (from -Xmx20000m to -Xmx120000m, from 20GB to 120GB) and the
>>>> result is always the same: the process' memory fills up completely and then
>>>> the process' CPU usage drops to 0%.
>>>> This is strange because if it was lack of memory, I would expect an
>>>> OutOfMemoryError.
>>>>
>>>> I have debugged with IntelliJ IDEA and obtained thread dumps from
>>>> different executions, and realized quite a few operator threads are stuck
>>>> on java.lang.Thread.State: WAITING.
>>>>
>>>> There are four major threads that I find to be in this waiting state.
>>>> The thread dumps I obtained show me where the wait calls originated:
>>>>
>>>>
>>>>
>>>> *Number 1: *"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) ->
>>>> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158"
>>>> prio=5 tid=0xd93 nid=NA waiting
>>>>   java.lang.Thread.State: WAITING
>>>>       at java.lang.Object.wait(Object.java:-1)
>>>>       at java.lang.Object.wait(Object.java:502)
>>>>       at org.apache.flink.runtime.io.network.partition.consumer.Singl
>>>> eInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>       at org.apache.flink.runtime.io.network.partition.consumer.Singl
>>>> eInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>> dReader.getNextRecord(AbstractRecordReader.java:86)
>>>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>> Reader.next(MutableRecordReader.java:47)
>>>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>> ReaderIterator.java:59)
>>>>       at org.apache.flink.runtime.operators.util.metrics.CountingMuta
>>>> bleObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$Pro
>>>> beIterator.next(MutableHashTable.java:1929)
>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.pro
>>>> cessProbeIter(MutableHashTable.java:505)
>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nex
>>>> tRecord(MutableHashTable.java:666)
>>>>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHa
>>>> shJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIte
>>>> rator.java:122)
>>>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver
>>>> .java:221)
>>>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>> ava:503)
>>>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>> k.java:368)
>>>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>
>>>> *Number 2:*
>>>>
>>>> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153"
>>>> prio=5 tid=0xd8e nid=NA waiting
>>>>   java.lang.Thread.State: WAITING
>>>>       at java.lang.Object.wait(Object.java:-1)
>>>>       at java.lang.Object.wait(Object.java:502)
>>>>       at org.apache.flink.runtime.io.network.partition.consumer.Singl
>>>> eInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>       at org.apache.flink.runtime.io.network.partition.consumer.Singl
>>>> eInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>> dReader.getNextRecord(AbstractRecordReader.java:86)
>>>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>> Reader.next(MutableRecordReader.java:47)
>>>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>> ReaderIterator.java:59)
>>>>       at org.apache.flink.runtime.operators.util.metrics.CountingMuta
>>>> bleObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$Pro
>>>> beIterator.next(MutableHashTable.java:1929)
>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.pro
>>>> cessProbeIter(MutableHashTable.java:505)
>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nex
>>>> tRecord(MutableHashTable.java:666)
>>>>       at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHa
>>>> shJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIte
>>>> rator.java:122)
>>>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver
>>>> .java:221)
>>>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>> ava:503)
>>>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>> k.java:368)
>>>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> *Number 3:*
>>>>
>>>> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5
>>>> tid=0xd75 nid=NA waiting
>>>>   java.lang.Thread.State: WAITING
>>>>       at java.lang.Object.wait(Object.java:-1)
>>>>       at java.lang.Object.wait(Object.java:502)
>>>>       at org.apache.flink.runtime.io.network.partition.consumer.Singl
>>>> eInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>>>>       at org.apache.flink.runtime.io.network.partition.consumer.Singl
>>>> eInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>>>>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>> dReader.getNextRecord(AbstractRecordReader.java:86)
>>>>       at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>> Reader.next(MutableRecordReader.java:47)
>>>>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>> ReaderIterator.java:59)
>>>>       at org.apache.flink.runtime.operators.util.metrics.CountingMuta
>>>> bleObjectIterator.next(CountingMutableObjectIterator.java:36)
>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable$Pro
>>>> beIterator.next(MutableHashTable.java:1929)
>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.pro
>>>> cessProbeIter(MutableHashTable.java:505)
>>>>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nex
>>>> tRecord(MutableHashTable.java:666)
>>>>       at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHas
>>>> hJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinItera
>>>> tor.java:123)
>>>>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver
>>>> .java:221)
>>>>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>> ava:503)
>>>>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>> k.java:368)
>>>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> *Number 4:*
>>>>
>>>> "Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
>>>>   java.lang.Thread.State: WAITING
>>>>       at sun.misc.Unsafe.park(Unsafe.java:-1)
>>>>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java
>>>> :175)
>>>>       at java.util.concurrent.CompletableFuture$Signaller.block(Compl
>>>> etableFuture.java:1693)
>>>>       at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.
>>>> java:3323)
>>>>       at java.util.concurrent.CompletableFuture.waitingGet(Completabl
>>>> eFuture.java:1729)
>>>>       at java.util.concurrent.CompletableFuture.get(CompletableFuture
>>>> .java:1895)
>>>>       at org.apache.flink.runtime.minicluster.MiniCluster.executeJobB
>>>> locking(MiniCluster.java:519)
>>>>       at org.apache.flink.client.LocalExecutor.executePlan(LocalExecu
>>>> tor.java:231)
>>>>       - locked <0x23eb> (a java.lang.Object)
>>>>       at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvi
>>>> ronment.java:91)
>>>>       at org.apache.flink.api.java.ExecutionEnvironment.execute(Execu
>>>> tionEnvironment.java:815)
>>>>       at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
>>>>       at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(M
>>>> isc.java:103)
>>>>       at my.package.algorithm.Sample.computeApproximateDeltaFast(Samp
>>>> le.java:492)
>>>>       at my.package.algorithm.Sample.run(Sample.java:291).
>>>>       at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> While I realize these dumps on their own may not be helpful, they at
>>>> least (as far as I know) indicate that the threads are all waiting on
>>>> something.
>>>> But if it was resource scarcity I believe the program would terminate
>>>> with an exception.
>>>> And if it was garbage collection activity, I believe the JVM process
>>>> would not be at 0% CPU usage.
>>>>
>>>> *Note: *I realize I didn't provide the user-code code that generates
>>>> the execution plan for Flink which led to the contexts in which the threads
>>>> are waiting, but I hope it may not be necessary.
>>>> My problem now is that I am unsure on how to proceed to further debug
>>>> this issue:
>>>> - The assigned memory is fully used, but there are no exceptions about
>>>> lack of memory.
>>>> - The CPU usage is at 0% and all threads are all in a waiting state,
>>>> but I don't understand what signal they're waiting for exactly.
>>>>
>>>> Hoping anyone might be able to give me a hint.
>>>>
>>>> Thank you very much for your time.
>>>>
>>>> Best regards,
>>>>
>>>> Miguel E. Coimbra
>>>>
>>>>
>>>
>>
>>
>

Reply via email to