Hi everyone,

I've been experimenting with Kubernetes HA and the Kubernetes Operator and
ran into the following issue, which happens regularly on TaskManagers
with Flink 1.16:

Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not
complete the operation. Number of retries has been exhausted.

(The full stack trace is quite long, so I put it in a GitHub Gist here
<https://gist.github.com/antonipp/41b4cb732522a91799e0f57ea96805a3>. Note
that I used placeholder values for the Kubernetes Service name and the
Namespace name.)

The job configuration has the following values which should be relevant:
high-availability: kubernetes
high-availability.jobmanager.port: 6123
jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
jobmanager.rpc.port: 6123

Looking a bit more into the logs, I can see that the Akka Actor System is
started with an external address pointing to the Kubernetes Service defined
by jobmanager.rpc.address:
Trying to start actor system, external
address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address 0.0.0.0:6123.
Actor system started at akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123

(I believe the external address of the Akka Actor System is set to
jobmanager.rpc.address at this place
<https://github.com/apache/flink/blob/0141f13ca801d5db45435d101a9c3ef83889bbc0/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L367>
in the code, but I might be wrong.)
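
If I read this right, the mapping is roughly the following (the bind-host
part is my assumption, I haven't checked which option actually controls it):

jobmanager.rpc.address -> "external address" in the log above (the Service)
jobmanager.bind-host   -> "bind address" in the log above (0.0.0.0 here)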

I can also see these logs for the Dispatcher RPC endpoint:
Starting RPC endpoint for
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/rpc/dispatcher_1 .
Successfully wrote leader information
LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18',
leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1}
for leader dispatcher into the config map JOB-NAME-HERE-cluster-config-map.

I confirmed that the HA ConfigMap contains an address which also uses the
Kubernetes Service defined by jobmanager.rpc.address:
$ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r '.data["org.apache.flink.k8s.leader.dispatcher"]'
ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1
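
(I assume the other leader entries can be inspected the same way to check
whether they point at the Service as well; the key name here is just
guessed by analogy with the dispatcher one:
$ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r '.data["org.apache.flink.k8s.leader.resourcemanager"]')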

When looking at the code of the Operator and Flink itself, I can see that
jobmanager.rpc.address is set automatically by the InternalServiceDecorator
<https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66>
and points to the Kubernetes Service.
However, the comment
<https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L34-L38>
above clearly says that "only the non-HA scenario relies on this Service
for internal communication, since in the HA mode, the TaskManager(s)
directly connects to the JobManager via IP address." According to the docs
<https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#basic-setup>,
jobmanager.rpc.address "is ignored on setups with high-availability where
the leader election mechanism is used to discover this automatically."
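
Based on that, my expectation would have been that the leader entry in the
ConfigMap contains the JobManager pod IP rather than the Service name,
i.e. something like this (placeholder IP, just to illustrate):
<session-id>,akka.tcp://flink@JM-POD-IP-HERE:6123/user/rpc/dispatcher_1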

This is not what I'm observing: despite HA being enabled, the TaskManagers
don't seem to use IP addresses but still go through this Kubernetes Service
for JobManager communication.

Moreover, I've used the Lyft Kubernetes Operator before and it has these
interesting lines in the code:
https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L212-L216
It explicitly sets jobmanager.rpc.address to the host IPs.
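
If I read that correctly, the idea is roughly the following (this is my own
sketch of the pattern using the Kubernetes downward API, not the operator's
exact code; HOST_IP is just a placeholder name):

env:
  - name: HOST_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP  # or status.hostIP, I haven't checked which field it reads

jobmanager.rpc.address is then set to that IP when the Flink configuration
is rendered, so each JobManager advertises its own IP instead of the
Service name.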

Am I misconfiguring or misunderstanding something? Is there any way to fix
these errors?

Thanks!
Anton
