[ 
https://issues.apache.org/jira/browse/FLINK-6369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Lucas updated FLINK-6369:
---------------------------------
    Fix Version/s:     (was: 1.3.1)
                   1.4.0

> Better support for overlay networks
> -----------------------------------
>
>                 Key: FLINK-6369
>                 URL: https://issues.apache.org/jira/browse/FLINK-6369
>             Project: Flink
>          Issue Type: Improvement
>          Components: Docker, Network
>    Affects Versions: 1.2.0
>            Reporter: Patrick Lucas
>             Fix For: 1.4.0
>
>
> Running Flink in an environment that utilizes an overlay network 
> (containerized environments like Kubernetes or Docker Compose, or cloud 
> platforms like AWS VPC) poses various challenges related to networking.
> The core problem is that in these environments, an application is frequently 
> addressed by a name different from the one it knows itself by.
> For instance, it is plausible that the Flink UI (served by the Jobmanager) is 
> accessed via an ELB, which poses a problem in HA mode when the non-leader UI 
> returns an HTTP redirect to the leader—but the user may not be able to 
> connect directly to the leader.
> Or, if a user is using [Docker 
> Compose|https://github.com/apache/flink/blob/aa21f853ab0380ec1f68ae1d0b7c8d9268da4533/flink-contrib/docker-flink/docker-compose.yml],
>  they cannot submit a job via the CLI since there is a mismatch between the 
> name used to address the Jobmanager and what the Jobmanager perceives as its 
> hostname. (see \[1] below for more detail)
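> To make the mismatch concrete, here is a condensed sketch of the kind of 
> Compose file linked above (the image name, service names, and published ports 
> are assumptions for the sketch, not a copy of the actual file): the Jobmanager 
> knows itself as {{jobmanager}}, but from the host it is only reachable as 
> {{localhost}} via the published port.
> {code:yaml}
> # Hedged sketch of a docker-compose setup similar to the one referenced above.
> version: "2"
> services:
>   jobmanager:
>     image: flink:1.2.0
>     command: jobmanager        # Jobmanager is configured with hostname "jobmanager"
>     ports:
>       - "6123:6123"            # but the CLI on the host addresses it as localhost:6123
>       - "8081:8081"
>   taskmanager:
>     image: flink:1.2.0
>     command: taskmanager
>     depends_on:
>       - jobmanager
> {code}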
> ----
> h3. Problems and proposed solutions
> There are four instances of this issue that I've run into so far:
> h4. Jobmanagers must be addressed by the same name they are configured with 
> due to limitations of Akka
> Akka enforces that messages it receives are addressed with the hostname it is 
> configured with. Newer versions of Akka (>= 2.4) than the one Flink uses (a 
> custom 2.3 fork) can accept messages addressed with the "wrong" hostname, but 
> this support is limited to a single "external" hostname.
> In many environments, it is likely that not all parties that want to connect 
> to the Jobmanager have the same way of addressing it (e.g. the ELB example 
> above). Other similarly-used protocols like HTTP generally don't have this 
> restriction: if you connect on a socket and send a well-formed message, the 
> system assumes that it is the desired recipient.
> One solution is to not use Akka at all when communicating with the cluster 
> from the outside, perhaps using an HTTP API instead. This would be somewhat 
> involved, and is probably best left as a longer-term goal.
> A more immediate solution would be to override this behavior within Flakka, 
> the custom fork of Akka currently in use by Flink. I'm not sure how much 
> effort this would take.
> h4. The Jobmanager needs to be able to address the Taskmanagers for e.g. 
> metrics collection
> Having the Taskmanagers register themselves by IP is probably the best 
> solution here. It's a reasonable assumption that IPs can always be used for 
> communication between the nodes of a single cluster. Asking that each 
> Taskmanager container have a resolvable hostname is unreasonable.
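> On Kubernetes, for example, one way to get there (a sketch only, not something 
> this ticket prescribes) is to inject the pod IP via the downward API and have 
> the Taskmanager advertise that address; how the value ends up in the Flink 
> configuration is an assumption here.
> {code:yaml}
> # Hedged sketch: expose the pod IP to the Taskmanager container so it can
> # register itself by IP. The FLINK_TASKMANAGER_HOSTNAME variable and the
> # entrypoint behavior that would map it into flink-conf.yaml are assumptions.
> containers:
>   - name: taskmanager
>     image: flink:1.2.0
>     args: ["taskmanager"]
>     env:
>       - name: POD_IP
>         valueFrom:
>           fieldRef:
>             fieldPath: status.podIP      # downward API: the pod's own IP
>       - name: FLINK_TASKMANAGER_HOSTNAME # hypothetical variable the entrypoint
>         value: "$(POD_IP)"               # would map to the registered address
> {code}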
> h4. Jobmanagers in HA mode send HTTP redirects to URLs that aren't externally 
> resolvable/routable
> If multiple Jobmanagers are used in HA mode, HTTP requests to non-leaders 
> (such as if you put a Kubernetes Service in front of all Jobmanagers in a 
> cluster) get redirected to the (supposed) hostname of the leader, but this is 
> potentially unresolvable/unroutable externally.
> Enabling non-leader Jobmanagers to proxy API calls to the leader would solve 
> this. The non-leaders could even serve static asset requests (e.g. for css or 
> js files) directly.
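> For reference, the kind of Service meant above would look roughly like the 
> following (names and the port are assumptions); it only becomes fully usable 
> once any Jobmanager behind it can answer a request, i.e. once non-leaders 
> proxy or serve requests themselves.
> {code:yaml}
> # Hedged sketch: one Service fronting all Jobmanager pods, leader and standbys alike.
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-jobmanager
> spec:
>   selector:
>     app: flink
>     component: jobmanager   # matches every Jobmanager pod, not just the leader
>   ports:
>     - name: ui
>       port: 8081            # Flink web UI / HTTP API
> {code}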
> h4. Queryable state requests involve direct communication with Taskmanagers
> Currently, queryable state requests involve communication between the client 
> and the Jobmanager (for key partitioning lookups) and between the client and 
> all Taskmanagers.
> If the client is inside the network (as would be common in production 
> use-cases where high-volume lookups are required) this is a non-issue, but 
> problems crop up if the client is outside the network.
> For the communication with the Jobmanager, a similar solution as above can be 
> used: if all Jobmanagers can service all key partitioning lookup requests 
> (e.g. by proxying) then a simple Service can be used.
> The story is a bit different for the Taskmanagers. The partitioning lookup to 
> the Jobmanager would return the name of the particular Taskmanager that owned 
> the desired data, but that name (likely an IP, as proposed in the second 
> section above) is not necessarily resolvable/routable from the client.
> In the context of Kubernetes, where individual containers are generally not 
> addressable, a very ugly solution would involve creating a Service for each 
> Taskmanager, then cleverly configuring things such that the same name could 
> be used to address a specific Taskmanager both inside and outside the 
> network. \[2]
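> For illustration, that workaround would amount to something like the following 
> for each Taskmanager (all names, labels, and the port are invented for the 
> sketch):
> {code:yaml}
> # Hedged sketch of the "ugly" workaround: one Service per Taskmanager, so that
> # a single DNS name can be made to resolve to that Taskmanager both inside and
> # outside the cluster (e.g. by mirroring the name externally).
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-taskmanager-0        # one such Service per Taskmanager
> spec:
>   selector:
>     taskmanager-id: "0"            # hypothetical per-pod label
>   ports:
>     - name: queryable-state
>       port: 6124                   # hypothetical queryable-state port
> {code}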
> A much nicer solution would be, like in the previous section, to enable 
> Taskmanagers to proxy any queryable state lookup to the appropriate member of 
> the cluster. Once again, the principle is for every node to be able to 
> fulfill every request.
> This is of course less efficient than addressing the "correct" Taskmanager 
> directly, but it greatly simplifies the situation for users that want to make 
> queryable state requests from outside the network.
> ----
> h3. Subtasks
> Once there has been some discussion about the proposed solutions above, this 
> issue can be used as umbrella ticket for any relevant subtasks.
> ----
> h3. Footnotes
> \[1] In this example, the Jobmanager may be configured with 
> {{jobmanager.rpc.address: jobmanager}} and indeed, within the Docker network 
> containing the nodes of the cluster, the name {{jobmanager}} is resolvable. 
> But outside the Docker network, the port is mapped to {{localhost}}. When the 
> user runs {{$ flink run -m localhost:6123 ...}}, the CLI connects to the 
> Jobmanager using Akka, but Akka enforces that received messages are addressed 
> with the same name it is configured with. The result is that the CLI hangs 
> until a timeout is reached, and warning messages appear in the Jobmanager's 
> log like:
> {noformat}2017-04-24 09:47:52,560 WARN  akka.remote.ReliableDeliverySupervisor 
> dropping message [class akka.actor.ActorSelectionMessage] for non-local 
> recipient [Actor[akka.tcp://flink@localhost:6123/]] arriving at 
> [akka.tcp://flink@localhost:6123] inbound addresses are 
> [akka.tcp://flink@jobmanager:6123]{noformat}
> \[2] Another option is to use a Kubernetes StatefulSet, which gives you 
> per-pod addressability. The downside is that currently all scaling operations 
> on a StatefulSet (including initial creation) always create or delete pods in 
> sequence instead of concurrently, making cluster launch time linear with the 
> number of nodes in the cluster.
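> Roughly, that alternative looks like the following (a sketch with invented 
> names): a headless Service plus a StatefulSet gives each pod a stable DNS name 
> such as {{flink-taskmanager-0.flink-taskmanager}}.
> {code:yaml}
> # Hedged sketch of the StatefulSet alternative from [2].
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-taskmanager
> spec:
>   clusterIP: None                  # headless: per-pod DNS records instead of one VIP
>   selector:
>     component: taskmanager
>   ports:
>     - port: 6121
> ---
> apiVersion: apps/v1
> kind: StatefulSet
> metadata:
>   name: flink-taskmanager
> spec:
>   serviceName: flink-taskmanager   # ties pod DNS names to the headless Service
>   replicas: 3
>   selector:
>     matchLabels:
>       component: taskmanager
>   template:
>     metadata:
>       labels:
>         component: taskmanager
>     spec:
>       containers:
>         - name: taskmanager
>           image: flink:1.2.0
>           args: ["taskmanager"]
> {code}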



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
