Makes sense, thank you!

On Tue, Jan 31, 2023 at 10:48 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Thanks @Anton Ippolitov <anton.ippoli...@datadoghq.com>
> At this stage I would highly recommend the native mode if you have the
> liberty to try that.
> I think that has better production characteristics and will work out of
> the box with the autoscaler. (the standalone mode won't)
>
> Gyula
>
> On Tue, Jan 31, 2023 at 10:41 AM Anton Ippolitov <anton.ippoli...@datadoghq.com> wrote:
>
>> I am using the Standalone Mode indeed, should've mentioned it right away.
>> This fix looks exactly like what I need, thank you!!
>>
>> On Tue, Jan 31, 2023 at 9:16 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> There is also a pending fix for the standalone + k8s HA case:
>>> https://github.com/apache/flink-kubernetes-operator/pull/518
>>>
>>> You could maybe try and review the fix :)
>>>
>>> Gyula
>>>
>>> On Tue, Jan 31, 2023 at 8:36 AM Yang Wang <wangyang0...@apache.org> wrote:
>>>
>>>> I assume you are using the standalone mode. Right?
>>>>
>>>> For the native K8s mode, the leader address should be
>>>> *akka.tcp://flink@JM_POD_IP:6123/user/rpc/dispatcher_1*
>>>> when HA is enabled.
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> On Tue, Jan 31, 2023 at 12:21 AM Anton Ippolitov via user <user@flink.apache.org> wrote:
>>>>
>>>>> This is actually what I'm already doing, I'm only setting
>>>>> high-availability: kubernetes myself.
>>>>> The other values are either defaults or set by the Operator:
>>>>> - jobmanager.rpc.port: 6123 is the default value (docs
>>>>>   <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#hosts-and-ports>)
>>>>> - high-availability.jobmanager.port: 6123 is set by the Operator here
>>>>>   <https://github.com/apache/flink-kubernetes-operator/blob/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java#L141-L144>
>>>>> - jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE is set by
>>>>>   the Operator here
>>>>>   <https://github.com/apache/flink-kubernetes-operator/blob/261fed2076efe385ede148152c946eb7c5f1f48d/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java#L80>
>>>>>   (the actual code which gets executed is here
>>>>>   <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66>)
>>>>>
>>>>> Looking at what the Lyft Operator is doing here
>>>>> <https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L215>,
>>>>> I thought this would be a common issue but since you've never seen
>>>>> this error before, not sure what to do 🤔
>>>>>
>>>>> On Fri, Jan 27, 2023 at 10:59 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>
>>>>>> We never encountered this problem before but also we don't configure
>>>>>> those settings.
>>>>>> Can you simply try:
>>>>>>
>>>>>> high-availability: kubernetes
>>>>>>
>>>>>> And remove the other configs?
>>>>>> I think that can only cause problems
>>>>>> and should not achieve anything :)
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <user@flink.apache.org> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I've been experimenting with Kubernetes HA and the Kubernetes
>>>>>>> Operator and ran into the following issue which is happening
>>>>>>> regularly on TaskManagers with Flink 1.16:
>>>>>>>
>>>>>>> Error while retrieving the leader gateway. Retrying to connect to
>>>>>>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
>>>>>>> org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not
>>>>>>> complete the operation. Number of retries has been exhausted.
>>>>>>>
>>>>>>> (The whole stacktrace is quite long, I put it in a Github Gist here
>>>>>>> <https://gist.github.com/antonipp/41b4cb732522a91799e0f57ea96805a3>.
>>>>>>> Note that I put placeholder values for the Kubernetes Service name and
>>>>>>> the Namespace name)
>>>>>>>
>>>>>>> The job configuration has the following values which should be
>>>>>>> relevant:
>>>>>>> high-availability: kubernetes
>>>>>>> high-availability.jobmanager.port: 6123
>>>>>>> jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
>>>>>>> jobmanager.rpc.port: 6123
>>>>>>>
>>>>>>> Looking a bit more into the logs, I can see that the Akka Actor
>>>>>>> System is started with an external address pointing to the Kubernetes
>>>>>>> Service defined by jobmanager.rpc.address:
>>>>>>> Trying to start actor system, external
>>>>>>> address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address
>>>>>>> 0.0.0.0:6123.
>>>>>>> Actor system started at
>>>>>>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123
>>>>>>>
>>>>>>> (I believe the external address for the Akka Actor System is set to
>>>>>>> jobmanager.rpc.address from this place
>>>>>>> <https://github.com/apache/flink/blob/0141f13ca801d5db45435d101a9c3ef83889bbc0/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L367>
>>>>>>> in the code but I might be wrong)
>>>>>>>
>>>>>>> I can also see these logs for the Dispatcher RPC endpoint:
>>>>>>> Starting RPC endpoint for
>>>>>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
>>>>>>> akka://flink/user/rpc/dispatcher_1 .
>>>>>>> Successfully wrote leader information
>>>>>>> LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18',
>>>>>>> leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1}
>>>>>>> for leader dispatcher into the config map
>>>>>>> JOB-NAME-HERE-cluster-config-map.
>>>>>>>
>>>>>>> I confirmed that the HA ConfigMap contains an address which also
>>>>>>> uses the Kubernetes Service defined by jobmanager.rpc.address:
>>>>>>> $ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r '.data["org.apache.flink.k8s.leader.dispatcher"]'
>>>>>>>
>>>>>>> ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1
>>>>>>>
>>>>>>> When looking at the code of the Operator and Flink itself, I can see
>>>>>>> that jobmanager.rpc.address is set automatically by the
>>>>>>> InternalServiceDecorator
>>>>>>> <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66>
>>>>>>> and it points to the Kubernetes Service.
>>>>>>> However, the comment
>>>>>>> <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L34-L38>
>>>>>>> above clearly says that "only the non-HA scenario relies on this Service
>>>>>>> for internal communication, since in the HA mode, the TaskManager(s)
>>>>>>> directly connects to the JobManager via IP address." According to the
>>>>>>> docs
>>>>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#basic-setup>,
>>>>>>> jobmanager.rpc.address "is ignored on setups with high-availability
>>>>>>> where the leader election mechanism is used to discover this
>>>>>>> automatically."
>>>>>>>
>>>>>>> This is not what I'm observing as it seems that despite enabling HA,
>>>>>>> the TaskManagers don't use IP addresses but still use this Kubernetes
>>>>>>> Service for JM communication.
>>>>>>>
>>>>>>> Moreover, I've used the Lyft Kubernetes Operator before and it has
>>>>>>> these interesting lines in the code:
>>>>>>> https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L212-L216
>>>>>>> It explicitly sets jobmanager.rpc.address to the host IPs.
>>>>>>>
>>>>>>> Am I misconfiguring or misunderstanding something? Is there any way
>>>>>>> to fix these errors?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Anton
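
For anyone hitting the same symptom, here is a minimal sketch of how one might check whether the leader address stored in the HA ConfigMap points at a pod IP or at a Service name. It assumes the value always has the `<leaderSessionID>,<leaderAddress>` shape seen in the kubectl output quoted in this thread. The function names (`parse_leader_entry`, `leader_host`, `host_is_ip`) are made up for illustration and are not part of Flink or the Operator.

```python
import ipaddress
from urllib.parse import urlsplit


def parse_leader_entry(entry: str) -> tuple[str, str]:
    """Split a ConfigMap value of the assumed form '<sessionID>,<address>'."""
    session_id, _, address = entry.strip().partition(",")
    return session_id, address


def leader_host(address: str) -> str:
    """Extract the host from an address like akka.tcp://flink@HOST:6123/...

    Note: urlsplit lowercases the hostname it returns.
    """
    return urlsplit(address).hostname or ""


def host_is_ip(host: str) -> bool:
    """Return True if the host is a literal IPv4/IPv6 address."""
    try:
        ipaddress.ip_address(host)
        return True
    except ValueError:
        return False


if __name__ == "__main__":
    # Value copied from the kubectl output in the original report.
    entry = (
        "ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,"
        "akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1"
    )
    session_id, address = parse_leader_entry(entry)
    host = leader_host(address)
    kind = "pod IP" if host_is_ip(host) else "Service/DNS name"
    print(f"session={session_id} host={host} ({kind})")
```

Feeding it the output of the `kubectl ... | jq -r ...` command from the report would show at a glance whether the JobManager published its Service name (as in the standalone case described here) or a pod IP (as Yang describes for native mode).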