Great to hear that you got it working :-) Looking forward to your blog post
to learn more about your experiences :-)

Cheers,
Till

On Fri, Nov 10, 2017 at 10:18 PM, Vergilio, Thalita <
t.vergilio4...@student.leedsbeckett.ac.uk> wrote:

> Hi Till,
>
>
> Thank you very much for that. And thanks for your help. I have finally
> managed to get the multi-cloud setup on Docker Swarm working by tweaking
> the Flink image slightly to set these configuration options to known
> values. I have also used the Weave Net Docker plugin to create my
> cross-cloud network.
>
>
> I am in the process of documenting my experience in a blog article, which
> I will share on this list so others can hopefully benefit from it.
>
>
> Thank you and the rest of the Flink team once again for all your help and
> support.
>
>
> Best wishes,
>
>
> Thalita
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* 10 November 2017 12:15:00
>
> *To:* Vergilio, Thalita
> *Cc:* Piotr Nowojski; user@flink.apache.org; Patrick Lucas
> *Subject:* Re: Docker-Flink Project: TaskManagers can't talk to
> JobManager if they are on different nodes
>
>
> Hi Thalita, yes you can use the mentioned configuration parameters to set
> the ports for the TaskManager and the BlobServer. However, you must make
> sure that there is at most one TM running on a host, otherwise you run into
> port collisions.
>
> For taskmanager.rpc.port and blob.server.port you can define a range.
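
A minimal sketch of what pinning these ports could look like, assuming the settings are appended to the flink-conf.yaml that gets baked into a custom image. The FLINK_CONF path is hypothetical, and the port values (6122, 6121, 6124) are illustrative assumptions borrowed from the defaults and port mappings mentioned elsewhere in this thread, not settings confirmed by either poster.

```shell
#!/bin/sh
# Sketch: pin Flink's otherwise OS-assigned ports by appending to flink-conf.yaml.
# FLINK_CONF is a hypothetical path; in a real image it would point at the
# conf/flink-conf.yaml of the Flink installation.
FLINK_CONF="${FLINK_CONF:-./flink-conf.yaml}"

cat >> "$FLINK_CONF" <<'EOF'
# TaskManager RPC port (assumed value; a range may be given instead).
taskmanager.rpc.port: 6122
# TaskManager data exchange port (assumed value; fixed, so one TM per host).
taskmanager.data.port: 6121
# Blob server port (assumed value; a list or range may be given instead).
blob.server.port: 6124
EOF
```

With fixed values like these, only the listed ports need to be opened between the cloud networks, at the cost of allowing at most one TaskManager per host.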
>
> Cheers,
> Till
>
> On Fri, Nov 10, 2017 at 11:47 AM, Vergilio, Thalita <
> t.vergilio4...@student.leedsbeckett.ac.uk> wrote:
>
> Hi All,
>
>
> I just wanted to let you know that I have finally managed to get the
> multi-cloud setup working!! I honestly can't believe my eyes. I used a
> Docker plugin called Weave to create the Swarm network, gave each node a
> public external IP address, and opened a range of ports. I can now get my
> Google Cloud machine to connect to the Azure machines.
>
>
> There are still some minor issues: for example, I don't know exactly which
> ports to open for TaskManager communication in Flink. They seem to be
> assigned randomly at runtime, so I had to open a wide range of ports to
> allow the communication to happen, which is far from ideal.
>
>
> Is there a way of finding out what these ports are and setting them to a
> constant value? Looking at the documentation, the suspects are:
>
>
>
>    - taskmanager.rpc.port: The task manager’s IPC port (DEFAULT: 0, which
>      lets the OS choose a free port).
>    - taskmanager.data.port: The task manager’s port used for data exchange
>      operations (DEFAULT: 0, which lets the OS choose a free port).
>    - blob.server.port: Port definition for the blob server (serving user
>      JARs) on the TaskManagers. By default the port is set to 0, which means
>      that the operating system is picking an ephemeral port. Flink also accepts
>      a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of
>      both. It is recommended to set a range of ports to avoid collisions when
>      multiple JobManagers are running on the same machine.
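
To make the list and range syntax quoted above concrete, here is a small sketch that expands such a port definition into individual port numbers, which can help when deciding which ports to open in a firewall. The `expand_ports` function is a hypothetical helper written for this illustration; it is not part of Flink or Docker, and it assumes the definition contains no spaces.

```shell
#!/bin/sh
# expand_ports is a hypothetical helper, not part of Flink or Docker.
# It expands a port definition such as "50100,50101" or "50100-50102"
# (or a mix of both) into one port number per line.
expand_ports() {
    # Split the definition on commas, then expand each item.
    printf '%s\n' "$1" | tr ',' '\n' | while IFS= read -r item; do
        case "$item" in
            *-*) seq "${item%-*}" "${item#*-}" ;;  # range: first-last
            *)   printf '%s\n' "$item" ;;          # single port
        esac
    done
}

# Example call: two single ports followed by a three-port range.
expand_ports "50100,50101,50200-50202"
```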
>
> Many thanks,
>
>
> Thalita
>
> ------------------------------
> *From:* Vergilio, Thalita
> *Sent:* 09 November 2017 22:04:24
> *To:* Till Rohrmann
>
> *Cc:* Piotr Nowojski; user@flink.apache.org; Patrick Lucas
> *Subject:* Re: Docker-Flink Project: TaskManagers can't talk to
> JobManager if they are on different nodes
>
>
> Hi Till,
>
>
> I have made some progress with the name resolution for machines that are
> not in the same subnet. The problem I am facing now is Flink-specific, so I
> wonder if you could help me.
>
>
> It is all running fine in a multi-cloud setup with the jobmanager in Azure
> and the taskmanager in the Google cloud. However, when I scale the
> taskmanager up and it starts running on Azure nodes as well, I get an Akka
> error which I presume means the taskmanagers can't talk to each other when
> parallelising the task.
>
>
> Do you know what the IP address and port below are? Are they assigned by
> Flink?
>
>
> Thank you very much.
>
>
> Thalita
>
>
> java.lang.Exception: Cannot deploy task Source: Read(UnboundedKafkaSource) -> Flat Map -> KafkaPuePipelineProcessor/Window.Into()/Window.Assign.out -> ParMultiDo(Anonymous) -> ToKeyedWorkItem (2/3) (b9f31626fb7d83d39e24e570e034f03e) - TaskManager (3a9c37463c88510a44097df0c99b5f90 @ 172.18.0.3 (dataPort=38963)) not responding after a timeout of 10000 ms
>       at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:437)
>       at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:429)
>       at org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
>       at akka.dispatch.Recover.internal(Future.scala:268)
>       at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
>       at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
>       at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
>       at scala.util.Try$.apply(Try.scala:161)
>       at scala.util.Failure.recover(Try.scala:185)
>       at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
>       at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
>       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@172.18.0.3:37959/user/taskmanager#364916492]] after [10000 ms]
>       at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>       at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>       at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>       at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>       at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>       at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>       at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>       at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>       ... 1 more
>
>
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* 06 November 2017 13:48:59
> *To:* Vergilio, Thalita
> *Cc:* Piotr Nowojski; user@flink.apache.org; Patrick Lucas
> *Subject:* Re: Docker-Flink Project: TaskManagers can't talk to
> JobManager if they are on different nodes
>
> I'm not entirely sure how Docker Swarm works, but from the Flink
> perspective there mustn't be two TaskManagers running on the same host
> (meaning an entity where you share the same address) if you set the
> TaskManager data port to a fixed value (otherwise only one of them can be
> started due to port conflicts). If you can ensure that this is the case,
> then it should be safe to specify a port for the data transmission.
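
One possible way to satisfy the "at most one TaskManager per host" condition in Docker Swarm is a global service, sketched below under stated assumptions. The `--mode global` and `--publish mode=host,...` options are standard `docker service create` options, but this combination has not been confirmed by anyone in the thread to work with the docker-flink image. DATA_PORT and the `taskmanager_service_cmd` function are hypothetical, and the Flink configuration inside the image would need `taskmanager.data.port` set to the same value. The function only prints the command so it can be reviewed before being run.

```shell
#!/bin/sh
# Sketch only: prints a `docker service create` command for review.
# DATA_PORT and the other service settings are illustrative assumptions.
DATA_PORT=6121

taskmanager_service_cmd() {
    # --mode global: one task on each node that matches the constraint,
    #   so no two TaskManagers share a host address.
    # mode=host: publish the port on the node itself, not via the routing mesh.
    printf '%s ' \
        docker service create --name taskmanager \
        --mode global \
        --env JOB_MANAGER_RPC_ADDRESS=jobmanager \
        --publish "mode=host,target=${DATA_PORT},published=${DATA_PORT}" \
        --network overlay \
        --constraint "'node.hostname != ubuntu-swarm-manager'" \
        flink taskmanager
    printf '\n'
}

taskmanager_service_cmd
```

Note that a global service is not scaled with `docker service scale`; it gains a TaskManager whenever a matching node joins the swarm.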
>
> Cheers,
> Till
>
> On Mon, Nov 6, 2017 at 2:37 PM, Vergilio, Thalita <
> t.vergilio4...@student.leedsbeckett.ac.uk> wrote:
>
> Hi Till,
>
>
> Thanks a lot for your answer.
>
>
> Is the taskmanager.data.port unique per TaskManager? The documentation
> says it is assigned at runtime by the OS. My thinking here is that you
> would need to know what that is at service creation time, which would go
> against the whole idea of how services are scaled in Docker Swarm.
>
>
> When you create a Swarm service using 'docker stack deploy' or 'docker
> service create', the configuration that is used at that point is the same
> that will be used by all instances of the service. If you then scale
> TaskManager to 8 or 10 containers, each of them gets the same service
> configuration (the one used to create the service).
>
>
> I have in fact tried to map specific ports in the TaskManager service
> configuration, but then I got "port already in use" when I tried to scale
> up the service.
>
>
> I wonder if there is a way around it.
>
>
> Maybe the people who developed the create-docker-swarm-service.sh script
> in the docker-flink project would be able to shed some light?
>
>
> ------------------------------
> *From:* Till Rohrmann <trohrm...@apache.org>
> *Sent:* 06 November 2017 12:40:33
> *To:* Piotr Nowojski
> *Cc:* Vergilio, Thalita; user@flink.apache.org; Patrick Lucas
>
> *Subject:* Re: Docker-Flink Project: TaskManagers can't talk to
> JobManager if they are on different nodes
>
> Hi Thalita,
>
> in order to make Flink work, I think you have to expose the JobManager RPC
> port and the Blob server port, and make sure that the TaskManagers can talk
> to each other by exposing the `taskmanager.data.port`. The query server port
> is only necessary if you want to use queryable state.
>
> I've pulled in Patrick who has more experience with running Flink on top
> of Docker. He'll definitely be able to provide more detailed
> recommendations.
>
> Cheers,
> Till
>
> On Mon, Nov 6, 2017 at 9:22 AM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
> Till, is there somewhere a list of ports that need to be exposed that’s more
> up to date compared to the docker-flink README?
>
> Piotrek
>
> On 3 Nov 2017, at 10:23, Vergilio, Thalita
> <t.vergilio4822@student.leedsbeckett.ac.uk> wrote:
>
> Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP
> of the JobManager and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I
> managed to get the TaskManagers from different nodes and even different
> subnets to talk to the JobManager.
>
> This is how I created the services:
>
> docker network create -d overlay overlay
>
> docker service create --name jobmanager \
>   --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}} \
>   -p 8081:8081 -p {{PUBLIC_IP}}:6123:6123 -p 48081:48081 \
>   -p 6124:6124 -p 6125:6125 \
>   --network overlay \
>   --constraint 'node.hostname == ubuntu-swarm-manager' \
>   flink jobmanager
>
> docker service create --name taskmanager \
>   --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}} \
>   -p 6121:6121 -p 6122:6122 \
>   --network overlay \
>   --constraint 'node.hostname != ubuntu-swarm-manager' \
>   flink taskmanager
>
> However, I am still encountering errors further down the line. When I
> submit a job using the Web UI, it fails because the JobManager can't talk
> to the TaskManager on port 35033. I presume this is the
> taskmanager.data.port, which needs to be set to a range and this range
> exposed when I create the service?
>
> Are there any other ports that I need to open at service creation time?
>
> Connecting the channel failed: Connecting to remote task manager + '/{{IP_ADDRESS_OF_MANAGER}}:35033' has failed. This might indicate that the remote task manager has been lost.
>       at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>       at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>       at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>       at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
>       at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
>       at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
>       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>       at java.lang.Thread.run(Thread.java:748)
>
>
>
> ------------------------------
> *From:* Piotr Nowojski <pi...@data-artisans.com>
> *Sent:* 02 November 2017 14:26:32
> *To:* Vergilio, Thalita
> *Cc:* user@flink.apache.org
> *Subject:* Re: Docker-Flink Project: TaskManagers can't talk to
> JobManager if they are on different nodes
>
> Did you try to expose the required ports that are listed in the README when
> starting the containers?
>
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink
>
> Ports:
> • The Web Client is on port 48081
> • JobManager RPC port 6123 (default, not exposed to host)
> • TaskManagers RPC port 6122 (default, not exposed to host)
> • TaskManagers Data port 6121 (default, not exposed to host)
>
> Piotrek
>
> On 2 Nov 2017, at 14:44, javalass
> <t.vergilio4822@student.leedsbeckett.ac.uk> wrote:
>
> I am using the Docker-Flink project in:
> https://github.com/apache/flink/tree/master/flink-contrib/docker-flink
>
> I am creating the services with the following commands:
> docker network create -d overlay overlay
> docker service create --name jobmanager \
>   --env JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay \
>   --constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
> docker service create --name taskmanager \
>   --env JOB_MANAGER_RPC_ADDRESS=jobmanager --network overlay \
>   --constraint 'node.hostname != ubuntu-swarm-manager' flink taskmanager
>
> I wonder if there's any configuration I'm missing. This is the error I get:
> - Trying to register at JobManager akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 4, timeout: 4000 milliseconds)
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
> To view the terms under which this email is distributed, please go to:-
> http://disclaimer.leedsbeckett.ac.uk/disclaimer/disclaimer.html