[ https://issues.apache.org/jira/browse/FLINK-6369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Patrick Lucas updated FLINK-6369:
---------------------------------
    Fix Version/s:     (was: 1.3.1)
                       1.4.0

> Better support for overlay networks
> -----------------------------------

                 Key: FLINK-6369
                 URL: https://issues.apache.org/jira/browse/FLINK-6369
             Project: Flink
          Issue Type: Improvement
          Components: Docker, Network
    Affects Versions: 1.2.0
            Reporter: Patrick Lucas
             Fix For: 1.4.0

Running Flink in an environment that uses an overlay network (containerized environments like Kubernetes or Docker Compose, or cloud platforms like AWS VPC) poses several networking challenges.

The core problem is that in these environments, applications are frequently addressed by a name different from the one by which they know themselves.

For instance, it is plausible that the Flink UI (served by the Jobmanager) is accessed via an ELB, which poses a problem in HA mode: the non-leader UI returns an HTTP redirect to the leader, but the user may not be able to connect to the leader directly.

Or, if a user is using [Docker Compose|https://github.com/apache/flink/blob/aa21f853ab0380ec1f68ae1d0b7c8d9268da4533/flink-contrib/docker-flink/docker-compose.yml], they cannot submit a job via the CLI, since there is a mismatch between the name used to address the Jobmanager and what the Jobmanager perceives as its own hostname. (See \[1] below for more detail.)

----

h3. Problems and proposed solutions

There are four instances of this issue that I've run into so far:

h4. Jobmanagers must be addressed by the same name they are configured with, due to limitations of Akka

Akka enforces that messages it receives are addressed with the hostname it is configured with. Newer versions of Akka (>= 2.4) than the one Flink uses (a custom 2.3) can accept messages addressed with the "wrong" hostname, but this support is limited to a single "external" hostname.

In many environments, it is likely that not all parties that want to connect to the Jobmanager have the same way of addressing it (e.g. the ELB example above). Other similarly-used protocols like HTTP generally don't have this restriction: if you connect on a socket and send a well-formed message, the system assumes it is the desired recipient.

One solution is to not use Akka at all when communicating with the cluster from the outside, perhaps using an HTTP API instead. This would be somewhat involved, and is probably best left as a longer-term goal.

A more immediate solution would be to override this behavior within Flakka, the custom fork of Akka currently used by Flink. I'm not sure how much effort this would take.

h4. The Jobmanager needs to be able to address the Taskmanagers, e.g. for metrics collection

Having the Taskmanagers register themselves by IP is probably the best solution here. It's a reasonable assumption that IPs can always be used for communication between the nodes of a single cluster, whereas asking that each Taskmanager container have a resolvable hostname is unreasonable.

h4. Jobmanagers in HA mode send HTTP redirects to URLs that aren't externally resolvable/routable

If multiple Jobmanagers are used in HA mode, HTTP requests to non-leaders (such as when a Kubernetes Service is put in front of all the Jobmanagers in a cluster) get redirected to the (supposed) hostname of the leader, but this name is potentially unresolvable or unroutable externally.

Enabling non-leader Jobmanagers to proxy API calls to the leader would solve this. The non-leaders could even serve static asset requests (e.g. for CSS or JS files) directly; a rough sketch of the proxying idea follows.
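To make the forwarding idea concrete, here is a minimal sketch using only the JDK's built-in HTTP server. This is not Flink's actual web frontend code: the class name, port, and leader address are hypothetical, and a real implementation would discover the leader via the HA leader retrieval service rather than hard-coding it.

{noformat}
// Illustrative sketch only: a non-leader Jobmanager answering REST calls by
// forwarding them to the leader instead of replying with an HTTP redirect.
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;

public class LeaderProxySketch {

    // Hypothetical placeholder; in practice this would be obtained from
    // leader election (e.g. ZooKeeper), not hard-coded.
    private static final String LEADER = "http://leader-jobmanager:8081";

    public static void main(String[] args) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(8081), 0);
        server.createContext("/", LeaderProxySketch::forwardToLeader);
        server.start();
    }

    // Re-issues the incoming request against the leader and relays the
    // response, so the caller never needs to resolve the leader's hostname.
    // (Request bodies, headers, and error handling are omitted for brevity.)
    private static void forwardToLeader(HttpExchange exchange) throws IOException {
        URL target = new URL(LEADER + exchange.getRequestURI());
        HttpURLConnection conn = (HttpURLConnection) target.openConnection();
        conn.setRequestMethod(exchange.getRequestMethod());

        exchange.sendResponseHeaders(conn.getResponseCode(), 0);
        try (InputStream in = conn.getInputStream();
             OutputStream out = exchange.getResponseBody()) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
    }
}
{noformat}

The point of the sketch is the principle, not the code: any Jobmanager can accept any request, so a single externally-reachable name suffices.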
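As an aside on the Akka limitation described in the first section above: Akka >= 2.4 can advertise a single external hostname that differs from the address it actually binds to, roughly as below. The hostnames are illustrative, and this is the upstream Akka setting, not something the 2.3-based Flakka fork currently supports.

{noformat}
# Akka >= 2.4 remoting configuration (illustrative hostnames)
akka.remote.netty.tcp {
  hostname      = "elb.example.com"  # the single external name Akka accepts
  port          = 6123
  bind-hostname = "0.0.0.0"          # the address the socket actually binds to
  bind-port     = 6123
}
{noformat}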
h4. Queryable state requests involve direct communication with Taskmanagers

Currently, queryable state requests involve communication between the client and the Jobmanager (for key partitioning lookups) and between the client and all Taskmanagers.

If the client is inside the network (as would be common in production use-cases where high-volume lookups are required) this is a non-issue, but problems crop up if the client is outside the network.

For the communication with the Jobmanager, a solution similar to the one above can be used: if all Jobmanagers can service all key partitioning lookup requests (e.g. by proxying), then a simple Service can be used.

The story is a bit different for the Taskmanagers. The partitioning lookup to the Jobmanager would return the name of the particular Taskmanager that owns the desired data, but that name (likely an IP, as proposed in the second section above) is not necessarily resolvable or routable from the client.

In the context of Kubernetes, where individual containers are generally not addressable, a very ugly solution would involve creating a Service for each Taskmanager, then cleverly configuring things such that the same name could be used to address a specific Taskmanager both inside and outside the network. \[2]

A much nicer solution would be, as in the previous section, to enable Taskmanagers to proxy any queryable state lookup to the appropriate member of the cluster. Once again, the principle is for every node to be able to fulfill every request.

This is of course less efficient than addressing the "correct" Taskmanager directly, but it greatly simplifies the situation for users who want to make queryable state requests from outside the network.

----

h3. Subtasks

Once there has been some discussion about the proposed solutions above, this issue can be used as an umbrella ticket for any relevant subtasks.

----

h3. Footnotes

\[1] In this example, the Jobmanager may be configured with {{jobmanager.rpc.address: jobmanager}}, and indeed, within the Docker network containing the nodes of the cluster, the name {{jobmanager}} is resolvable. But outside the Docker network, the port is mapped to {{localhost}}. When the user runs {{$ flink run -m localhost:6123 ...}}, the CLI connects to the Jobmanager using Akka, but Akka enforces that received messages are addressed with the same name it is configured with. The result is that the CLI hangs until a timeout is reached, and warning messages like this appear in the Jobmanager's log:

{noformat}
2017-04-24 09:47:52,560 WARN  akka.remote.ReliableDeliverySupervisor  - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@localhost:6123/]] arriving at [akka.tcp://flink@localhost:6123] inbound addresses are [akka.tcp://flink@jobmanager:6123]
{noformat}

\[2] Another option is to use a Kubernetes StatefulSet, which gives you per-pod addressability. The downside is that currently all scaling operations on a StatefulSet (including initial creation) create or delete pods in sequence rather than concurrently, making cluster launch time linear in the number of nodes in the cluster.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)