Ah yes, currently when you use that method the UI is started on a random
port. I'm fixing that in this PR
<https://github.com/apache/flink/pull/5814>, which should be merged today.
For now you can enable logging and search for something along the lines
of "http://<host>:<port> was granted leadership".
Sorry for the inconvenience.
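If you want to try pinning the port yourself in the meantime, something along these lines might work (rough sketch only; I'm assuming the "rest.port" key is picked up by the local web UI on your snapshot, which may only be true once the PR above is in):

import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
conf.setInteger("rest.port", 8081); // assumption: pin the web UI / REST port instead of a random one
LocalEnvironment lenv = (LocalEnvironment)
        ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);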
On 16.04.2018 15:04, Miguel Coimbra wrote:
Thanks for the suggestions Chesnay, I will try them out.
However, I have already tried your suggestion of adding the
flink-runtime-web dependency, and nothing happened.
If I understood you correctly, adding that dependency to the pom.xml
should make the web front-end run when I call the following line?
LocalEnvironment lenv = (LocalEnvironment)
ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
I added flink-runtime-web to my pom.xml, recompiled, and launched the
program, but I simply got "Unable to connect" in my browser (Firefox)
on localhost:8081.
Performing wget on localhost:8081 resulted in this:
$ wget localhost:8081
--2018-04-16 12:47:26-- http://localhost:8081/
Resolving localhost (localhost)... ::1, 127.0.0.1
Connecting to localhost (localhost)|::1|:8081... failed: Connection
refused.
Connecting to localhost (localhost)|127.0.0.1|:8081... failed:
Connection refused.
The connection is refused on both addresses, so it seems nothing is
actually listening on localhost:8081.
I probably am skipping some important detail.
These are some of my dependencies:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly-examples_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
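In case it helps, I can also sanity-check at runtime whether flink-runtime-web really ended up on the classpath, along these lines (the class name is my guess at one of the classes shipped in that module, so take it with a grain of salt):

// Hypothetical check: if the class is missing, the module is not on the runtime classpath.
try {
    Class.forName("org.apache.flink.runtime.webmonitor.WebRuntimeMonitor");
    System.out.println("flink-runtime-web appears to be on the classpath");
} catch (ClassNotFoundException e) {
    System.out.println("flink-runtime-web is NOT on the runtime classpath");
}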
Have you managed to get the web front-end in local mode?
Best regards,
Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com
On 16 April 2018 at 05:12, Chesnay Schepler <ches...@apache.org> wrote:
The thing with createLocalEnvironmentWithWebUI is that it requires
flink-runtime-web to be on the classpath, which is rarely the
case when running things in the IDE.
It should work fine in the IDE if you add it as a dependency to
your project. This should've been logged as a warning.
Chaining is unrelated to this issue, as join operators are never
chained to one another.
Lambda functions are also not the issue; if they were, the job
would fail much earlier.
It is reasonable that T3 is blocked if T1 is blocked: T1 gets no
input and hence produces no output, which in turn blocks T3.
There are multiple possible explanations I can come up with:
* the preceding operators are blocked on something or really slow
* the preceding operators are actually finished, but aren't
shutting down due to an implementation error
* a deadlock in Flink's join logic
* a deadlock in Flink's network stack
For the first two we will have to consult the UI or logs. You said
you were dumping the input DataSets into files, but were they
actually complete?
A deadlock in the network stack should appear as all existing
operator threads being blocked.
We can probably rule out a problem with the join logic by removing
the second join and trying again.
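For instance, a variant of your selectEdges with only the first join kept (a sketch based on the snippet further down in this thread; the method name is made up and I haven't run it):

public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdgesFirstJoinOnly(
        final Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
    return vertices
            .joinWithHuge(originalGraph.getEdges())
            .where(0).equalTo(0)
            .with((source, edge) -> edge) // same first join as in your version
            .returns(originalGraph.getEdges().getType())
            .distinct(0, 1); // second join and its lambda removed
}

If this variant finishes, the second join (or its interaction with the first) becomes the prime suspect.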
On 16.04.2018 03:10, Miguel Coimbra wrote:
Hello,
It would seem that the function which is supposed to launch local
mode with the web front-end doesn't launch the front-end at all.
If I'm not mistaken, this call is not doing what it is supposed to do:
LocalEnvironment lenv = (LocalEnvironment)
ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
Regarding the preceding operators, the thread dumps I got were
pointing to a specific set of operations over DataSet instances
that were passed into my function.
Below I show the code segment; the lines where the threads are
blocked are marked with comments:
public static <K, VV, EV> DataSet<Edge<K, EV>> selectEdges(final
Graph<K, VV, EV> originalGraph, final DataSet<Vertex<K, VV>> vertices) {
    return vertices
        .joinWithHuge(originalGraph.getEdges())
        .where(0).equalTo(0)
        .with((source, edge) -> edge) // Thread 1 is blocked here
        .returns(originalGraph.getEdges().getType())
        .join(vertices)
        .where(1).equalTo(0)
        .with((e, v) -> e) // Thread 3 is blocked here
        .returns(originalGraph.getEdges().getType())
        .distinct(0, 1);
}
Note: the edge DataSet of originalGraph is much larger than the
vertices DataSet, so I believe joinWithHuge is being used correctly.
I will try testing with remote (cluster) mode to have access to
the web front-end, but I have some questions for now:
- The fact that they are blocked in different JoinOperator
instances that are chained together: is this a result of Flink's
default pipelining mechanism?
- Could there be a problem stemming from the fact that they are
both waiting on lambdas?
- I have tried dumping both DataSet variables used in this code
(originalGraph's edges and vertices) into files, roughly as
sketched below, and they produced correct values (non-empty
files), so I don't have a clue what the threads inside Flink's
runtime are waiting on.
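For reference, the dump was done roughly like this (sketch only; the real paths and names differ, and env is the ExecutionEnvironment from earlier):

import org.apache.flink.core.fs.FileSystem;

// Illustrative dump of the two join inputs for inspection.
vertices.writeAsText("/tmp/vertices-dump", FileSystem.WriteMode.OVERWRITE);
originalGraph.getEdges().writeAsText("/tmp/edges-dump", FileSystem.WriteMode.OVERWRITE);
env.execute("dump join inputs"); // both files came out non-empty and looked correct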
Thanks for the help so far, Chesnay.
Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com
---------- Forwarded message ----------
From: Chesnay Schepler <ches...@apache.org>
To: user@flink.apache.org
Date: Sun, 15 Apr 2018 18:54:33 +0200
Subject: Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING
Hello,
Threads #1-3 are waiting for input; Thread #4 is waiting for
the job to finish.
To further debug this I would look into what the preceding
operators are doing, whether they are blocked on something or
are emitting records (which you can check in the UI/metrics).
On 15.04.2018 18:40, Miguel Coimbra wrote:
Hello,
I am running into a situation where the Flink threads
responsible for my operator execution are all stuck in the
WAITING state.
Before anything else, this is my machine's spec:
Linux 4.4.88 #1 SMP x86_64 Intel(R) Xeon(R) CPU E7- 4830 @
2.13GHz GenuineIntel GNU/Linux
256 GB RAM
I am running in local mode on a machine with a considerable
amount of memory, so perhaps that may be triggering some
execution edge-case?
Moving on, this is my Java:
openjdk version "1.8.0_151"
OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
Getting back to the problem: I am currently using Flink
1.5-SNAPSHOT with LocalEnvironment on this large-memory
machine, with parallelism set to one:
Configuration conf = new Configuration();
LocalEnvironment lenv = (LocalEnvironment)
ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
ExecutionEnvironment env = lenv;
env.getConfig().enableSysoutLogging().enableClosureCleaner().enableObjectReuse();
env.setParallelism(1);
This initializes the execution environment for a series of
sequential jobs (any data dependency between jobs is flushed
to disk by job i and read back from disk into a DataSet in
job i + 1).
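The handover between jobs looks roughly like this (simplified sketch; paths and tuple types are illustrative, not my actual code):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;

// job i: flush the intermediate result to disk
DataSet<Tuple2<Long, Long>> resultOfJobI = env.fromElements(Tuple2.of(1L, 2L)); // stand-in for the real result
resultOfJobI.writeAsCsv("/tmp/job-i-output", FileSystem.WriteMode.OVERWRITE);
env.execute("job i");

// job i + 1: read it back as a DataSet
DataSet<Tuple2<Long, Long>> inputOfJobIPlusOne = env
        .readCsvFile("/tmp/job-i-output")
        .types(Long.class, Long.class);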
To reiterate, I am not launching a Flink cluster; I am just
executing in local mode from a code base compiled with Maven.
I have tested this program via mvn exec:exec with different
amounts of memory (from -Xmx20000m to -Xmx120000m, i.e. 20 GB
to 120 GB) and the result is always the same: the process's
memory fills up completely and then its CPU usage drops to 0%.
This is strange, because if it were a lack of memory I would
expect an OutOfMemoryError.
I have debugged with IntelliJ IDEA and obtained thread dumps
from different executions, and realized quite a few operator
threads are stuck on java.lang.Thread.State: WAITING.
There are four major threads that I find to be in this
waiting state.
The thread dumps I obtained show me where the wait calls
originated:
Number 1:
"CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)
Number 2:
"Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)
Number 3:
"Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
      at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
      at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
      at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
      at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
      at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
      at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
      at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
      at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
      at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
      at java.lang.Thread.run(Thread.java:748)
Number 4:
"Update Graph Thread@7013" prio=5 tid=0x5dc nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
      at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
      at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
      at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:519)
      at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:231)
      - locked <0x23eb> (a java.lang.Object)
      at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
      at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
      at org.apache.flink.api.java.DataSet.count(DataSet.java:398)
      at my.package.algorithm.Misc.SummaryGraphBuilder.summaryGraph(Misc.java:103)
      at my.package.algorithm.Sample.computeApproximateDeltaFast(Sample.java:492)
      at my.package.algorithm.Sample.run(Sample.java:291)
      at java.lang.Thread.run(Thread.java:748)
While I realize these dumps on their own may not be helpful,
they at least (as far as I know) indicate that the threads
are all waiting on something.
But if it were resource scarcity, I believe the program would
terminate with an exception.
And if it were garbage collection activity, I believe the JVM
process would not be sitting at 0% CPU usage.
Note: I realize I didn't provide the user code that generates
the execution plan which led to the contexts in which the
threads are waiting, but I hope it may not be necessary.
My problem now is that I am unsure how to proceed to further
debug this issue:
- The assigned memory is fully used, but there are no
exceptions about lack of memory.
- The CPU usage is at 0% and all threads are in a waiting
state, but I don't understand exactly what signal they're
waiting for.
Hoping anyone might be able to give me a hint.
Thank you very much for your time.
Best regards,
Miguel E. Coimbra