@Yang: I think this would be valuable to document. I think it's a natural 
question to ask whether you can have standby JMs with Kubernetes. What do you 
think? If you agree, we could create a JIRA ticket and work on the "official" 
docs for this.

On Thu, Jan 21, 2021, at 5:05 AM, Yang Wang wrote:
> Hi Amit Bhatia
> 
> > What is the correct way to start three jobmanager replicas with zk? Is 
> > there any link which explains this deployment scenario and configuration?
> Please find more information in the last mail. Unfortunately, we do not have 
> some documentation to guide the users how to achieve that.
> 
> > How we'll identify that out of three replicas, which Job Manager replica is 
> > the leader?
> Just like what I have said, using a K8s service for the jobmanager rpc 
> address is not a good practice.
> TaskManager/Client could not know which replica is the leader. Instead, we 
> should bind the rpc address
> to pod ip. After then, TaskManager/Client could find the leader address(pod 
> ip) via ZooKeeper.
> 
> Could you please update your yaml files and deploy again? I think you will 
> have different results then.
> 
> Best,
> Yang
> 
> Yang Wang <danrtsey...@gmail.com> 于2021年1月21日周四 上午11:59写道:
>> Hi Chirag Dewan,
>> 
>> Yes, we could have multiple replicas with ZK HA in K8 as well. Multiple 
>> JobManagers will contend for
>> a leader and then write its rpc address to the ZooKeeper nodes. You could 
>> find more information how the
>> HA service works here[1]. It is about the KubernetesHAService, but the 
>> ZooKeeperHAService has the same
>> mechanism.
>> 
>> In such a case, I strongly suggest not using the service as the JobManager 
>> rpc address. Otherwise, we
>> will have the issue you have mentioned. There are 3 replicas behind the same 
>> service endpoint and only
>> one of them is the leader. TaskManager/Client do not know how to contact the 
>> leader.
>> 
>> Instead, I suggest not creating the internal service and bind the pod ip to 
>> the JobManager rpc address.
>> After then, TaskManager/Client will retrieve the leader address(pod ip + 
>> port) and contact via such an address.
>> 
>> Please find more information and the example here[1].
>> 
>> [1]. 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>> [2]. 
>> https://issues.apache.org/jira/browse/FLINK-20982?focusedCommentId=17265715&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17265715
>> 
>> Best,
>> Yang
>> 
>> 
>> Amit Bhatia <bhatia.amit1...@gmail.com> 于2021年1月20日周三 下午12:27写道:
>>>   Hi Yang,
>>> 
>>> I tried the deployment of flink with three replicas of Jobmanger to test a 
>>> faster job recovery scenario.  Below is my deployment :
>>> 
>>>  $ kubectl get po -namit | grep zk
>>> eric-data-coordinator-zk-0                                        1/1     
>>> Running            0          6d21h
>>> eric-data-coordinator-zk-1                                        1/1     
>>> Running            0          6d21h
>>> eric-data-coordinator-zk-2                                        1/1     
>>> Running            0          6d21h
>>> flink-jobmanager-ha-zk-1-5d58dc469-8bjpb                          1/1     
>>> Running            0          19h
>>> flink-jobmanager-ha-zk-1-5d58dc469-klg5p                          1/1     
>>> Running            0          19h
>>> flink-jobmanager-ha-zk-1-5d58dc469-kvwzk                          1/1     
>>> Running            0          19h
>>> 
>>> 
>>>  $ kubectl get svc -namit | grep zk
>>> flink-jobmanager-ha-rest-zk1                NodePort       10.100.118.186   
>>> <none>        8081:32115/TCP                                 21h
>>> flink-jobmanager-ha-zk1                     ClusterIP      10.111.135.174   
>>> <none>        6123/TCP,6124/TCP,8081/TCP                     21h
>>> eric-data-coordinator-zk                    ClusterIP      10.105.139.167   
>>> <none>        2181/TCP,8080/TCP,21007/TCP                    7d20h
>>> eric-data-coordinator-zk-ensemble-service   ClusterIP      None             
>>> <none>        2888/TCP,3888/TCP                              7d20h
>>> 
>>> Flink Configmap:
>>> ====================
>>> apiVersion: v1
>>> kind: ConfigMap
>>> metadata:
>>>   name: flink-config-ha-zk-1
>>>   namespace: amit
>>>   labels:
>>>     app: flink
>>> data:
>>>   flink-conf.yaml: |+
>>>     jobmanager.rpc.address: flink-jobmanager-ha-zk1
>>>     taskmanager.numberOfTaskSlots: 2
>>>     blob.server.port: 6124
>>>     jobmanager.rpc.port: 6123
>>>     taskmanager.rpc.port: 6122
>>>     queryable-state.proxy.ports: 6125
>>>     jobmanager.memory.process.size: 1600m
>>>     taskmanager.memory.process.size: 1728m
>>>     parallelism.default: 2
>>>     # High Availability parameters
>>>     high-availability: zookeeper
>>>     high-availability.cluster-id: /haclusterzk1
>>>     high-availability.storageDir: file:///opt/flink/recovery/
>>>     high-availability.zookeeper.path.root: /flinkhazk
>>>     high-availability.zookeeper.quorum: eric-data-coordinator-zk:2181
>>>     high-availability.jobmanager.port: 6123
>>> ===============================================================
>>> 
>>> Out of the three replicas of Job manager pods in one of the pod i am 
>>> getting this error:
>>> 
>>> 2021-01-19 08:18:33,982 INFO  
>>> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - 
>>> Starting ZooKeeperLeaderElectionService 
>>> ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
>>> 2021-01-19 08:21:39,381 WARN  
>>> org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - 
>>> Error while retrieving the leader gateway. Retrying to connect to 
>>> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>>> 2021-01-19 08:21:42,521 WARN  
>>> org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - 
>>> Error while retrieving the leader gateway. Retrying to connect to 
>>> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>>> 2021-01-19 08:21:45,508 WARN  
>>> org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - 
>>> Error while retrieving the leader gateway. Retrying to connect to 
>>> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>>> 2021-01-19 08:21:46,369 WARN  
>>> org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - 
>>> Error while retrieving the leader gateway. Retrying to connect to 
>>> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>>> 2021-01-19 08:22:13,658 WARN  
>>> org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - 
>>> Error while retrieving the leader gateway. Retrying to connect to 
>>> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>>> 2021-01-20 04:10:39,836 WARN  
>>> org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever [] - 
>>> Error while retrieving the leader gateway. Retrying to connect to 
>>> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>>> 
>>> 
>>> And when trying to access the GUI getting below error:
>>> 
>>> image.png
>>> 
>>> In zookeeper i could see all the three id's are there
>>> 
>>> [zk: localhost:2181(CONNECTED) 5] ls 
>>> /flinkhazk/haclusterzk1/leaderlatch/dispatcher_lock
>>> [_c_1d5fc8b1-063f-4a1c-ad0f-ec46b6f10f36-latch-0000000020, 
>>> _c_229d0739-8854-4a5a-ace7-377d9edc575f-latch-0000000018, 
>>> _c_4eac3aaf-3f0f-4297-ac7f-086821548697-latch-0000000019]
>>> [zk: localhost:2181(CONNECTED) 6]
>>> 
>>> So i have below queries on this:
>>> 
>>> 1) what is the correct way to start three jobmanager replicas with zk ? Is 
>>> there any link which explains this deployment scenario and configuration ?
>>> 
>>> 2) How we'll identify that out of three replicas, which Job Manager replica 
>>> is the leader ?
>>> 
>>> Regards,
>>> Amit Bhatia
>>> 
>>> 
>>> 
>>> On Wed, Jan 20, 2021 at 9:44 AM Chirag Dewan <chirag.dewa...@yahoo.in> 
>>> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Can we have multiple replicas with ZK HA in K8 as well?
>>>> In this case, how does Task Managers and clients recover the Job Manager 
>>>> RPC address? Are they updated in ZK?
>>>> Also, since there are 3 replicas behind the same service endpoint and only 
>>>> one of them is the leader, how should clients reach the leader Job Manager?
>>>> 
>>>> On Wednesday, 20 January, 2021, 07:41:20 am IST, Yang Wang 
>>>> <danrtsey...@gmail.com> wrote:
>>>> 
>>>> 
>>>> If you do not want to run multiple JobManagers simultaneously, then I 
>>>> think the "Job" for application cluster
>>>> with HA enable is enough.
>>>> K8s will also launch a new pod/container when the old one terminated 
>>>> exceptionally.
>>>> 
>>>> Best,
>>>> Yang
>>>> 
>>>> Yang Wang <danrtsey...@gmail.com> 于2021年1月20日周三 上午10:08写道:
>>>>> Yes. Using a "Deployment" instead of "Job" for the application cluster 
>>>>> also makes sense.
>>>>> Actually, in the native K8s integration, we always use the deployment for 
>>>>> JobManager.
>>>>> 
>>>>> But please note that the deployment may relaunch the JobManager pod even 
>>>>> though you cancel
>>>>> the Flink job.
>>>>> 
>>>>> Best,
>>>>> Yang
>>>>> 
>>>>> Ashish Nigam <ashnigamt...@gmail.com> 于2021年1月20日周三 上午5:29写道:
>>>>>> Yang,
>>>>>> For Application clusters, does it make sense to deploy JobManager as 
>>>>>> "Deployment" rather than as a "Job", as suggested in docs?
>>>>>> I am asking this because I am thinking of deploying a job manager in HA 
>>>>>> mode even for application clusters.
>>>>>> 
>>>>>> Thanks
>>>>>> Ashish
>>>>>> 
>>>>>> 
>>>>>> On Tue, Jan 19, 2021 at 6:16 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>>>>>> Usually, you do not need to start multiple JobManager simultaneously. 
>>>>>>> The JobManager is a deployment.
>>>>>>> A new one pod/container will be launched once it terminated 
>>>>>>> exceptionally. 
>>>>>>> 
>>>>>>> If you still want to start multiple JobManagers to get a faster 
>>>>>>> recovery, you could set the replica greater than 1
>>>>>>> for standalone cluster on K8s[1]. For native integration[2], we still 
>>>>>>> have not supported such configuration[2].
>>>>>>> 
>>>>>>> Please note that the key point to enable HA is not start multiple 
>>>>>>> JobManagers simultaneously or sequently.
>>>>>>> You need to set the ZooKeeperHAService[4] or KubernetesHAService[5] to 
>>>>>>> ensure the Flink job could recover
>>>>>>> from latest successful checkpoint.
>>>>>>> 
>>>>>>> [1]. 
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/standalone/kubernetes.html#session-cluster-resource-definitions
>>>>>>> [2]. 
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/native_kubernetes.html
>>>>>>> [3]. https://issues.apache.org/jira/browse/FLINK-17707
>>>>>>> [4]. 
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/ha/zookeeper_ha.html
>>>>>>> [5]. 
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/ha/kubernetes_ha.html
>>>>>>> 
>>>>>>> Best,
>>>>>>> Yang
>>>>>>> 
>>>>>>> Amit Bhatia <bhatia.amit1...@gmail.com> 于2021年1月19日周二 下午8:45写道:
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> I am deploying Flink 1.12 on K8s. Can anyone confirm if we can deploy 
>>>>>>>> multiple job manager pods in K8s for HA or it should always be only a 
>>>>>>>> single job manager pod ?
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Amit Bhatia

Reply via email to