Makes sense, thank you!

On Tue, Jan 31, 2023 at 10:48 AM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Thanks @Anton Ippolitov <anton.ippoli...@datadoghq.com>
> At this stage I would highly recommend the native mode if you have the
> liberty to try it.
> I think it has better production characteristics and will work out of
> the box with the autoscaler (the standalone mode won't).
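> For context, switching is (roughly) just a matter of dropping mode:
> standalone from the FlinkDeployment spec; native is the default. A
> minimal sketch, where the name, image and storage path are placeholders:

```yaml
# Hypothetical FlinkDeployment using native mode (the default when
# spec.mode is omitted). Names, image tag and storageDir are placeholders.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: JOB-NAME-HERE
spec:
  image: flink:1.16
  flinkVersion: v1_16
  serviceAccount: flink
  flinkConfiguration:
    high-availability: kubernetes
    # Kubernetes HA also needs a durable storage directory:
    high-availability.storageDir: s3://BUCKET-HERE/ha
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
```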
>
> Gyula
>
> On Tue, Jan 31, 2023 at 10:41 AM Anton Ippolitov <
> anton.ippoli...@datadoghq.com> wrote:
>
>> I am indeed using the Standalone Mode; I should've mentioned it right away.
>> This fix looks exactly like what I need, thank you!!
>>
>> On Tue, Jan 31, 2023 at 9:16 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> There is also a pending fix for the standalone + k8s HA case :
>>> https://github.com/apache/flink-kubernetes-operator/pull/518
>>>
>>> You could maybe try and review the fix :)
>>>
>>> Gyula
>>>
>>> On Tue, Jan 31, 2023 at 8:36 AM Yang Wang <wangyang0...@apache.org>
>>> wrote:
>>>
>>>> I assume you are using the standalone mode. Right?
>>>>
>>>> For the native K8s mode, the leader address should be
>>>> akka.tcp://flink@JM_POD_IP:6123/user/rpc/dispatcher_1 when HA is enabled.
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> Anton Ippolitov via user <user@flink.apache.org> 于2023年1月31日周二 00:21写道:
>>>>
>>>>> This is actually what I'm already doing: I'm only setting
>>>>> high-availability: kubernetes myself. The other values are either
>>>>> defaults or set by the Operator:
>>>>> - jobmanager.rpc.port: 6123 is the default value (docs
>>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#hosts-and-ports>)
>>>>> - high-availability.jobmanager.port: 6123 is set by the Operator here
>>>>> <https://github.com/apache/flink-kubernetes-operator/blob/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java#L141-L144>
>>>>> - jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE is set by
>>>>> the Operator here
>>>>> <https://github.com/apache/flink-kubernetes-operator/blob/261fed2076efe385ede148152c946eb7c5f1f48d/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java#L80>
>>>>> (the actual code which gets executed is here
>>>>> <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66>)
>>>>>
>>>>> Looking at what the Lyft Operator is doing here
>>>>> <https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L215>,
>>>>> I thought this would be a common issue, but since you've never seen
>>>>> this error before, I'm not sure what to do 🤔
>>>>>
>>>>> On Fri, Jan 27, 2023 at 10:59 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> We have never encountered this problem before, but we also don't
>>>>>> configure those settings.
>>>>>> Can you simply try:
>>>>>>
>>>>>> high-availability: kubernetes
>>>>>>
>>>>>> And remove the other configs? I think that can only cause problems
>>>>>> and should not achieve anything :)
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
>>>>>> user@flink.apache.org> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I've been experimenting with Kubernetes HA and the Kubernetes
>>>>>>> Operator and ran into the following issue, which happens regularly
>>>>>>> on TaskManagers with Flink 1.16:
>>>>>>>
>>>>>>> Error while retrieving the leader gateway. Retrying to connect to 
>>>>>>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
>>>>>>> org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not 
>>>>>>> complete the operation. Number of retries has been exhausted.
>>>>>>>
>>>>>>> (The whole stacktrace is quite long, so I put it in a GitHub Gist here
>>>>>>> <https://gist.github.com/antonipp/41b4cb732522a91799e0f57ea96805a3>.
>>>>>>> Note that I used placeholder values for the Kubernetes Service name
>>>>>>> and the Namespace name.)
>>>>>>>
>>>>>>> The job configuration has the following values which should be
>>>>>>> relevant:
>>>>>>> high-availability: kubernetes
>>>>>>> high-availability.jobmanager.port: 6123
>>>>>>> jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
>>>>>>> jobmanager.rpc.port: 6123
>>>>>>>
>>>>>>> Looking a bit more into the logs, I can see that the Akka Actor
>>>>>>> System is started with an external address pointing to the Kubernetes
>>>>>>> Service defined by jobmanager.rpc.address:
>>>>>>> Trying to start actor system, external
>>>>>>> address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address
>>>>>>> 0.0.0.0:6123.
>>>>>>> Actor system started at
>>>>>>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123
>>>>>>>
>>>>>>> (I believe the external address for the Akka Actor System is set
>>>>>>> from jobmanager.rpc.address in this place in the code
>>>>>>> <https://github.com/apache/flink/blob/0141f13ca801d5db45435d101a9c3ef83889bbc0/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L367>,
>>>>>>> but I might be wrong.)
>>>>>>>
>>>>>>> I can also see these logs for the Dispatcher RPC endpoint:
>>>>>>> Starting RPC endpoint for
>>>>>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
>>>>>>> akka://flink/user/rpc/dispatcher_1 .
>>>>>>> Successfully wrote leader information
>>>>>>> LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18',
>>>>>>> leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1}
>>>>>>> for leader dispatcher into the config map 
>>>>>>> JOB-NAME-HERE-cluster-config-map.
>>>>>>>
>>>>>>> I confirmed that the HA ConfigMap contains an address which also
>>>>>>> uses the Kubernetes Service defined by jobmanager.rpc.address:
>>>>>>> $ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r
>>>>>>> '.data["org.apache.flink.k8s.leader.dispatcher"]'
>>>>>>> ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1
>>>>>>>
>>>>>>> When looking at the code of the Operator and Flink itself, I can see
>>>>>>> that jobmanager.rpc.address is set automatically by the
>>>>>>> InternalServiceDecorator
>>>>>>> <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66>
>>>>>>> and it points to the Kubernetes Service.
>>>>>>> However, the comment
>>>>>>> <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L34-L38>
>>>>>>> above clearly says that "only the non-HA scenario relies on this Service
>>>>>>> for internal communication, since in the HA mode, the TaskManager(s)
>>>>>>> directly connects to the JobManager via IP address." According to the
>>>>>>> docs
>>>>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#basic-setup>,
>>>>>>> jobmanager.rpc.address "is ignored on setups with high-availability 
>>>>>>> where
>>>>>>> the leader election mechanism is used to discover this automatically."
>>>>>>>
>>>>>>> This is not what I'm observing: despite HA being enabled, the
>>>>>>> TaskManagers don't use IP addresses but still go through this
>>>>>>> Kubernetes Service for JobManager communication.
>>>>>>>
>>>>>>> Moreover, I've used the Lyft Kubernetes Operator before and it has
>>>>>>> these interesting lines in the code:
>>>>>>> https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L212-L216
>>>>>>> It explicitly sets jobmanager.rpc.address to the host IPs.
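>>>>>>> If I understand the Lyft approach correctly, the equivalent with
>>>>>>> plain Kubernetes manifests would be something like the fragment
>>>>>>> below (my own sketch, not what either operator actually generates;
>>>>>>> the env var names are hypothetical):

```yaml
# Hypothetical JobManager pod template fragment: expose the pod IP via the
# downward API and inject it into the Flink config, so TaskManagers would
# resolve the JobManager by IP instead of by Service name.
env:
  - name: POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  - name: FLINK_PROPERTIES
    value: "jobmanager.rpc.address: $(POD_IP)"
```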
>>>>>>>
>>>>>>> Am I misconfiguring or misunderstanding something? Is there any way
>>>>>>> to fix these errors?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Anton
>>>>>>>
>>>>>>