Hi Amit Bhatia

> What is the correct way to start three jobmanager replicas with zk? Is
> there any link which explains this deployment scenario and configuration?
Please see my previous mail for more information. Unfortunately, we do not
yet have documentation that guides users through this setup.

> How we'll identify that out of three replicas, which Job Manager replica
> is the leader?
As I said before, using a K8s service as the JobManager rpc address is not
good practice here, because the TaskManager/Client cannot know which replica
behind the service is the leader. Instead, we should bind the rpc address
to the pod IP. After that, the TaskManager/Client can find the leader
address (pod IP) via ZooKeeper.
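
A minimal sketch of what that could look like for a standalone session
cluster, assuming the official Flink image, whose entrypoint passes the
argument after "jobmanager" on as the host. The Deployment name, labels,
image tag and volume name below are assumptions based on the names in this
thread, so please adapt them to your setup:

```yaml
# Sketch only: JobManager Deployment with three replicas and ZooKeeper HA.
# Names, labels and the image tag are assumptions, not taken from your files.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager-ha-zk-1
  namespace: amit
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.12   # assumed image tag
        env:
        # Expose the pod's own IP to the container via the downward API.
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        # The second argument is the host. It overrides jobmanager.rpc.address
        # from flink-conf.yaml, so each replica advertises its own pod IP.
        args: ["jobmanager", "$(POD_IP)"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config-ha-zk-1
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
```

With this, the elected leader should write its pod IP (instead of the
service name flink-jobmanager-ha-zk1) to ZooKeeper, and the internal service
should no longer be needed for rpc.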

Could you please update your yaml files and deploy again? I think you will
have different results then.

Best,
Yang

On Thu, Jan 21, 2021 at 11:59 AM Yang Wang <danrtsey...@gmail.com> wrote:

> Hi Chirag Dewan,
>
> Yes, we could have multiple replicas with ZK HA in K8s as well. Multiple
> JobManagers will contend for leadership, and the elected leader then
> writes its rpc address to the ZooKeeper nodes. You can find more
> information on how the HA service works here[1]. It is about the
> KubernetesHAService, but the ZooKeeperHAService uses the same
> mechanism.
>
> In such a case, I strongly suggest not using the service as the JobManager
> rpc address. Otherwise, we
> will have the issue you have mentioned. There are 3 replicas behind the
> same service endpoint and only
> one of them is the leader. TaskManager/Client do not know how to contact
> the leader.
>
> Instead, I suggest not creating the internal service and binding the pod IP
> to the JobManager rpc address.
> After that, the TaskManager/Client will retrieve the leader address (pod IP +
> port) and contact the leader via that address.
>
> Please find more information and the example here[2].
>
> [1].
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
> [2].
> https://issues.apache.org/jira/browse/FLINK-20982?focusedCommentId=17265715&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17265715
>
> Best,
> Yang
>
>
> On Wed, Jan 20, 2021 at 12:27 PM Amit Bhatia <bhatia.amit1...@gmail.com> wrote:
>
>>   Hi Yang,
>>
>> I tried deploying Flink with three replicas of the JobManager to test
>> a faster job recovery scenario. Below is my deployment:
>>
>>  $ kubectl get po -namit | grep zk
>> eric-data-coordinator-zk-0                 1/1   Running   0   6d21h
>> eric-data-coordinator-zk-1                 1/1   Running   0   6d21h
>> eric-data-coordinator-zk-2                 1/1   Running   0   6d21h
>> flink-jobmanager-ha-zk-1-5d58dc469-8bjpb   1/1   Running   0   19h
>> flink-jobmanager-ha-zk-1-5d58dc469-klg5p   1/1   Running   0   19h
>> flink-jobmanager-ha-zk-1-5d58dc469-kvwzk   1/1   Running   0   19h
>>
>>
>>  $ kubectl get svc -namit | grep zk
>> flink-jobmanager-ha-rest-zk1                NodePort    10.100.118.186   <none>   8081:32115/TCP               21h
>> flink-jobmanager-ha-zk1                     ClusterIP   10.111.135.174   <none>   6123/TCP,6124/TCP,8081/TCP   21h
>> eric-data-coordinator-zk                    ClusterIP   10.105.139.167   <none>   2181/TCP,8080/TCP,21007/TCP  7d20h
>> eric-data-coordinator-zk-ensemble-service   ClusterIP   None             <none>   2888/TCP,3888/TCP            7d20h
>>
>> Flink Configmap:
>> ====================
>> apiVersion: v1
>> kind: ConfigMap
>> metadata:
>>   name: flink-config-ha-zk-1
>>   namespace: amit
>>   labels:
>>     app: flink
>> data:
>>   flink-conf.yaml: |+
>>     jobmanager.rpc.address: flink-jobmanager-ha-zk1
>>     taskmanager.numberOfTaskSlots: 2
>>     blob.server.port: 6124
>>     jobmanager.rpc.port: 6123
>>     taskmanager.rpc.port: 6122
>>     queryable-state.proxy.ports: 6125
>>     jobmanager.memory.process.size: 1600m
>>     taskmanager.memory.process.size: 1728m
>>     parallelism.default: 2
>>     # High Availability parameters
>>     high-availability: zookeeper
>>     high-availability.cluster-id: /haclusterzk1
>>     high-availability.storageDir: file:///opt/flink/recovery/
>>     high-availability.zookeeper.path.root: /flinkhazk
>>     high-availability.zookeeper.quorum: eric-data-coordinator-zk:2181
>>     high-availability.jobmanager.port: 6123
>> ===============================================================
>>
>> In one of the three JobManager pods I am getting this error:
>>
>> 2021-01-19 08:18:33,982 INFO
>>  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService []
>> - Starting ZooKeeperLeaderElectionService
>> ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
>> 2021-01-19 08:21:39,381 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>> - Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>> 2021-01-19 08:21:42,521 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>> - Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>> 2021-01-19 08:21:45,508 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>> - Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>> 2021-01-19 08:21:46,369 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>> - Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>> 2021-01-19 08:22:13,658 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>> - Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>> 2021-01-20 04:10:39,836 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>> - Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>>
>>
>> And when trying to access the GUI, I get the error below:
>>
>> [image: image.png]
>>
>> In ZooKeeper I can see that all three IDs are there:
>>
>> [zk: localhost:2181(CONNECTED) 5] ls /flinkhazk/haclusterzk1/leaderlatch/dispatcher_lock
>> [_c_1d5fc8b1-063f-4a1c-ad0f-ec46b6f10f36-latch-0000000020,
>> _c_229d0739-8854-4a5a-ace7-377d9edc575f-latch-0000000018,
>> _c_4eac3aaf-3f0f-4297-ac7f-086821548697-latch-0000000019]
>> [zk: localhost:2181(CONNECTED) 6]
>>
>> So I have the following questions:
>>
>> 1) what is the correct way to start three jobmanager replicas with zk ?
>> Is there any link which explains this deployment scenario and configuration
>> ?
>>
>> 2) How we'll identify that out of three replicas, which Job Manager
>> replica is the leader ?
>>
>> Regards,
>> Amit Bhatia
>>
>>
>> On Wed, Jan 20, 2021 at 9:44 AM Chirag Dewan <chirag.dewa...@yahoo.in>
>> wrote:
>>
>>> Hi,
>>>
>>> Can we have multiple replicas with ZK HA in K8 as well?
>>> In this case, how do Task Managers and clients recover the Job Manager
>>> RPC address? Is it updated in ZK?
>>> Also, since there are 3 replicas behind the same service endpoint and
>>> only one of them is the leader, how should clients reach the leader Job
>>> Manager?
>>>
>>> On Wednesday, 20 January, 2021, 07:41:20 am IST, Yang Wang <
>>> danrtsey...@gmail.com> wrote:
>>>
>>>
>>> If you do not want to run multiple JobManagers simultaneously, then I
>>> think the "Job" for the application cluster
>>> with HA enabled is enough.
>>> K8s will also launch a new pod/container when the old one terminates
>>> abnormally.
>>>
>>> Best,
>>> Yang
>>>
>>> On Wed, Jan 20, 2021 at 10:08 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>>
>>> Yes. Using a "Deployment" instead of "Job" for the application cluster
>>> also makes sense.
>>> Actually, in the native K8s integration, we always use the deployment
>>> for JobManager.
>>>
>>> But please note that the deployment may relaunch the JobManager pod even
>>> though you cancel
>>> the Flink job.
>>>
>>> Best,
>>> Yang
>>>
>>> On Wed, Jan 20, 2021 at 5:29 AM Ashish Nigam <ashnigamt...@gmail.com> wrote:
>>>
>>> Yang,
>>> For Application clusters, does it make sense to deploy JobManager as
>>> "Deployment" rather than as a "Job", as suggested in docs?
>>> I am asking this because I am thinking of deploying a job manager in HA
>>> mode even for application clusters.
>>>
>>> Thanks
>>> Ashish
>>>
>>>
>>> On Tue, Jan 19, 2021 at 6:16 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>>
>>> Usually, you do not need to start multiple JobManagers simultaneously.
>>> The JobManager is a deployment.
>>> A new pod/container will be launched once the old one terminates
>>> abnormally.
>>>
>>> If you still want to start multiple JobManagers to get a faster
>>> recovery, you could set the replica count greater than 1
>>> for a standalone cluster on K8s[1]. For the native integration[2], we do
>>> not yet support such a configuration[3].
>>>
>>> Please note that the key point of enabling HA is not starting multiple
>>> JobManagers, whether simultaneously or sequentially.
>>> You need to set the ZooKeeperHAService[4] or KubernetesHAService[5] to
>>> ensure the Flink job can recover
>>> from the latest successful checkpoint.
>>>
>>> [1].
>>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/standalone/kubernetes.html#session-cluster-resource-definitions
>>> [2].
>>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/native_kubernetes.html
>>> [3]. https://issues.apache.org/jira/browse/FLINK-17707
>>> [4].
>>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/ha/zookeeper_ha.html
>>> [5].
>>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/ha/kubernetes_ha.html
>>>
>>> Best,
>>> Yang
>>>
>>> On Tue, Jan 19, 2021 at 8:45 PM Amit Bhatia <bhatia.amit1...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am deploying Flink 1.12 on K8s. Can anyone confirm if we can deploy
>>> multiple job manager pods in K8s for HA or it should always be only a
>>> single job manager pod ?
>>>
>>> Regards,
>>> Amit Bhatia
>>>
>>>
