Hi everyone,

I've been experimenting with Kubernetes HA and the Flink Kubernetes Operator and ran into the following issue, which happens regularly on TaskManagers with Flink 1.16:
    Error while retrieving the leader gateway. Retrying to connect to akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
    org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.

(The whole stacktrace is quite long, so I put it in a GitHub Gist here: <https://gist.github.com/antonipp/41b4cb732522a91799e0f57ea96805a3>. Note that I used placeholder values for the Kubernetes Service name and the Namespace name.)

The job configuration has the following values which should be relevant:

    high-availability: kubernetes
    high-availability.jobmanager.port: 6123
    jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
    jobmanager.rpc.port: 6123

Looking a bit more into the logs, I can see that the Akka Actor System is started with an external address pointing to the Kubernetes Service defined by jobmanager.rpc.address:

    Trying to start actor system, external address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address 0.0.0.0:6123.
    Actor system started at akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123

(I believe the external address for the Akka Actor System is set to jobmanager.rpc.address in this place in the code <https://github.com/apache/flink/blob/0141f13ca801d5db45435d101a9c3ef83889bbc0/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L367>, but I might be wrong.)

I can also see these logs for the Dispatcher RPC endpoint:

    Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1.
    Successfully wrote leader information LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18', leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1} for leader dispatcher into the config map JOB-NAME-HERE-cluster-config-map.

I confirmed that the HA ConfigMap contains an address which also uses the Kubernetes Service defined by jobmanager.rpc.address:

    $ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r '.data["org.apache.flink.k8s.leader.dispatcher"]'
    ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1

When looking at the code of the Operator and Flink itself, I can see that jobmanager.rpc.address is set automatically by the InternalServiceDecorator <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66> and that it points to the Kubernetes Service. However, the comment above it <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L34-L38> clearly says that "only the non-HA scenario relies on this Service for internal communication, since in the HA mode, the TaskManager(s) directly connects to the JobManager via IP address."

According to the docs <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#basic-setup>, jobmanager.rpc.address "is ignored on setups with high-availability where the leader election mechanism is used to discover this automatically." This is not what I'm observing: despite HA being enabled, the TaskManagers don't use IP addresses but still go through this Kubernetes Service for JobManager communication.
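For what it's worth, the same kind of query can be used to inspect the other leader entries in that ConfigMap, e.g. something like the following (I'm assuming here that the other components are stored under keys analogous to the dispatcher one above, such as a resourcemanager entry; the exact key names are an assumption on my part):

    $ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json \
        | jq -r '.data["org.apache.flink.k8s.leader.resourcemanager"]'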
Moreover, I've used the Lyft Kubernetes Operator before, and it has these interesting lines in the code: <https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L212-L216>. It explicitly sets jobmanager.rpc.address to the host IPs (I've put a rough sketch of what I think this amounts to below, after my questions).

Am I misconfiguring or misunderstanding something? Is there any way to fix these errors?

Thanks!
Anton
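P.S. To illustrate the Lyft operator approach mentioned above: if I read those lines correctly, the net effect is roughly something like the snippet below. This is only a hypothetical sketch of the idea using the Kubernetes downward API, not the operator's actual code, and HOST_IP is just an illustrative name:

    # Expose the pod/host IP to the container via the downward API
    # (status.podIP or status.hostIP, depending on which address should be advertised):
    env:
      - name: HOST_IP
        valueFrom:
          fieldRef:
            fieldPath: status.podIP
    # ...and then advertise that IP instead of the Service name, e.g. by having the
    # container entrypoint substitute it into the Flink configuration:
    #   jobmanager.rpc.address: $HOST_IP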