Hi everybody,

I'm Ned, a young and passionte developer of apache technologies. I have
been playing with apache flink lastly.

This is what I wanted to do submit a flink topology to a remote flink
cluster. The following are the steps that I did.

- Install flink as a cluster indicated on the link
https://ci.apache.org/projects/flink/flink-docs-master/setup/cluster_setup.html
on three remotes VMs.
- Run the sample WordCountRemoteByClient
<https://github.com/apache/flink/blob/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java>
by
changing
conf.put(Config.NIMBUS_HOST, "localhost"); to
conf.put(Config.NIMBUS_HOST, "publicIpOfJobmanagerInMyRemoteCluster");

Unfortunately for me when I run that program, I have a the following
exception.

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.RuntimeException: Could not connect to Flink
JobManager with address publicIpOfJobmanagerInMyRemoteCluster:6123
at
org.apache.flink.storm.api.FlinkClient.getTopologyJobId(FlinkClient.java:305)
at
org.apache.flink.storm.api.FlinkClient.submitTopologyWithOpts(FlinkClient.java:177)
at
org.apache.flink.storm.api.FlinkClient.submitTopology(FlinkClient.java:167)
at
stormWorldCount.WordCountRemoteByClient.main(WordCountRemoteByClient.java:72)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
... 6 more
Caused by: java.io.IOException: Actor at akka.tcp://flink@
publicIpOfJobmanagerInMyRemoteCluster:6123/user/jobmanager not reachable.
Please make sure that the actor is running and its port is reachable.
at org.apache.flink.runtime.akka.AkkaUtils$.getActorRef(AkkaUtils.scala:384)
at
org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerActorRef(JobManager.scala:2380)
at
org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerActorRef(JobManager.scala:2400)
at
org.apache.flink.runtime.jobmanager.JobManager.getJobManagerActorRef(JobManager.scala)
at
org.apache.flink.storm.api.FlinkClient.getJobManager(FlinkClient.java:333)
at
org.apache.flink.storm.api.FlinkClient.getTopologyJobId(FlinkClient.java:279)
... 14 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.flink.runtime.akka.AkkaUtils$.getActorRef(AkkaUtils.scala:380)
... 19 more

I try ping my jobmanager with
curl publicIpOfJobmanagerInMyRemoteCluster:6123 I had the following as
responces.

curl: (52) Empty reply from server

Which is an indication that the job manager is reachable.

So I was wondering if I doing it the right way. Please any help will be
welcoming.

Thanks,
Ned

Reply via email to