Hello Flink team,

We run Flink on DCOS and are having trouble submitting a Flink job from within
a container to the Flink cluster. Both the container and the Flink cluster
run inside DCOS, on different nodes.

Our setup is as follows: Flink was installed on DCOS using the package from
the catalog. According to the Flink UI ([DCOS-URL]/service/flink/), the job
manager settings are:

    jobmanager.rpc.address        ip-10-0-1-95.eu-central-1.compute.internal
    jobmanager.rpc.port           14503
    jobmanager.web.port           14502
    mesos.artifact-server.port    14505

where "ip-10-0-1-95.eu-central-1.compute.internal" is the host name of the
DCOS node (IP 10.0.1.95) on which the job manager container is running.

In addition, a VIP is configured for both the job manager RPC port and the
job manager web port:

job manager RPC port: flink.marathon.l4lb.thisdcos.directory:6123
job manager web port: flink.marathon.l4lb.thisdcos.directory:8081

Now, if we submit a Flink job to the job manager with the Flink CLI using the
following steps:

1) Log into the DCOS master node:

    dcos node ssh --leader --master-proxy

2) Start an interactive session inside a Docker container using the
Mesosphere Flink image:

    docker run --rm -it mesosphere/dcos-flink:1.4.2-1.0 /bin/bash

3) Submit a Flink job to the Flink job manager:

    cd /flink-1.4.2

    ./bin/flink run -m ip-10-0-1-95.eu-central-1.compute.internal:14503 \
        examples/streaming/WordCount.jar

everything works fine: the job appears in the Flink UI and we get the
expected results.

However, if we submit the same job via the job manager's VIP,
flink.marathon.l4lb.thisdcos.directory:6123:

    ./bin/flink run -m flink.marathon.l4lb.thisdcos.directory:6123 \
        examples/streaming/WordCount.jar

or via the IP address of the DCOS node instead of its host name:

    ./bin/flink run -m 10.0.1.95:14503 examples/streaming/WordCount.jar

the job cannot be submitted. Apparently the connection to the job manager
cannot be established, and nothing appears in the Flink UI. You can find the
CLI output in the attachment (reproduced below).

Submitting to the job manager using its address from Mesos DNS does not work
either.
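
To make the comparison easy to reproduce, the three attempts above can be run
back to back with a small script. This is only a sketch: it assumes it is run
from /flink-1.4.2 inside the container started in step 2, and it leaves out
the Mesos DNS address, which is not listed here.

```shell
#!/bin/sh
# Run the three submission attempts described above, one after another.
# Assumption: executed from /flink-1.4.2 inside the
# mesosphere/dcos-flink:1.4.2-1.0 container from step 2.
JAR=examples/streaming/WordCount.jar

# Job manager addresses from this mail (space-separated host:port pairs):
#   node host name (works), VIP (fails), node IP (fails)
TARGETS="ip-10-0-1-95.eu-central-1.compute.internal:14503 flink.marathon.l4lb.thisdcos.directory:6123 10.0.1.95:14503"

for target in $TARGETS; do
    echo "=== Submitting ${JAR} to ${target} ==="
    # Report success or failure per address based on the CLI exit status.
    if ./bin/flink run -m "${target}" "${JAR}"; then
        echo "OK: ${target}"
    else
        echo "FAILED: ${target}"
    fi
done
```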

Why does this not work? In other words, why can we only submit jobs using the
job manager's host name (ip-10-0-1-95.eu-central-1.compute.internal) and not
its IP or the VIP?

Thank you!

Best regards

Wei

Cluster configuration: Standalone cluster with JobManager at flink.marathon.l4lb.thisdcos.directory/***.***.***.***:6123
Using address flink.marathon.l4lb.thisdcos.directory:6123 to connect to JobManager.
JobManager web interface address http://flink.marathon.l4lb.thisdcos.directory:8081
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: 22d5a45ee33edf71792c6bde8fc75211. Waiting for job completion.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
        at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
        at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
        at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
        at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
        at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
        ... 21 more
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
        at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:219)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:104)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)