Hello Flink team,

We use Flink on DCOS and have problems submitting a Flink job from within a container to the Flink cluster. Both the container and the Flink cluster are running inside DCOS, on different nodes.
We have the following setup: Flink was installed on DCOS using the package from the catalog. According to the Flink UI ([DCOS-URL]/service/flink/) the Flink job manager settings are:

    jobmanager.rpc.address      ip-10-0-1-95.eu-central-1.compute.internal
    jobmanager.rpc.port         14503
    jobmanager.web.port         14502
    mesos.artifact-server.port  14505

where "ip-10-0-1-95.eu-central-1.compute.internal" is the host name of the DCOS node with IP 10.0.1.95 on which the container with the job manager is running. Furthermore, a VIP is configured for both the job manager RPC port and the job manager web port:

    job manager RPC port: flink.marathon.l4lb.thisdcos.directory:6123
    job manager web port: flink.marathon.l4lb.thisdcos.directory:8081

If we submit a Flink job to the job manager via the Flink CLI with the following steps:

1) Log into the DCOS master node:

    dcos node ssh --leader --master-proxy

2) Start an interactive session inside a Docker container using the Mesosphere Flink image:

    docker run --rm -it mesosphere/dcos-flink:1.4.2-1.0 /bin/bash

3) Submit a Flink job to the Flink job manager:

    cd /flink-1.4.2
    ./bin/flink run -m ip-10-0-1-95.eu-central-1.compute.internal:14503 examples/streaming/WordCount.jar

everything works fine. The job appears as an entry in the Flink UI and we get the results we expect.

But if we try to submit the same job using the VIP of the job manager (flink.marathon.l4lb.thisdcos.directory:6123):

    ./bin/flink run -m flink.marathon.l4lb.thisdcos.directory:6123 examples/streaming/WordCount.jar

or using the IP of the DCOS node instead of its host name:

    ./bin/flink run -m 10.0.1.95:14503 examples/streaming/WordCount.jar

the job cannot be submitted. Apparently the connection to the job manager cannot be established, and nothing appears in the Flink UI. You can find the output in the attachment. Submitting to the job manager using the URL from Mesos DNS does not work either.
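To separate plain network reachability from a Flink-level problem, a check along the following lines could be run from inside the same container. This is only a sketch: it assumes Python 3 is available in the container (we have not verified that for the mesosphere/dcos-flink image), and the names `can_connect` and `report` are made up for this illustration. It only tests whether a TCP connection can be opened to each of the three addresses used above; it says nothing about whether the job manager accepts the connection at the RPC level.

```python
import socket

# The three addresses used with "flink run -m" in the description above.
ENDPOINTS = [
    ("ip-10-0-1-95.eu-central-1.compute.internal", 14503),  # host name (works)
    ("10.0.1.95", 14503),                                   # node IP (fails)
    ("flink.marathon.l4lb.thisdcos.directory", 6123),       # VIP (fails)
]


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the name and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections and timeouts.
        return False


def report(endpoints, timeout=3.0):
    """Return one status line per (host, port) pair."""
    lines = []
    for host, port in endpoints:
        status = "reachable" if can_connect(host, port, timeout) else "NOT reachable"
        lines.append(f"{host}:{port} {status}")
    return lines
```

Calling `print("\n".join(report(ENDPOINTS)))` prints one line per address. If all three are reported as reachable, the problem is presumably not at the TCP level.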
Why is this not working? Why can we only submit jobs using the host name (ip-10-0-1-95.eu-central-1.compute.internal) of the job manager, and not the IP or the VIP?

Thank you!

Best regards
Wei
Cluster configuration: Standalone cluster with JobManager at flink.marathon.l4lb.thisdcos.directory/***.***.***.***:6123
Using address flink.marathon.l4lb.thisdcos.directory:6123 to connect to JobManager.
JobManager web interface address http://flink.marathon.l4lb.thisdcos.directory:8081
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: 22d5a45ee33edf71792c6bde8fc75211. Waiting for job completion.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
    at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
    at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
    ... 21 more
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:219)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:104)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)