Hi Chirag Dewan,

Yes, you can have multiple replicas with ZK HA on K8s as well. Multiple
JobManagers will contend for leadership, and the elected leader then
writes its RPC address to the ZooKeeper nodes. You can find more
information about how the HA service works here[1]. It describes the
KubernetesHAService, but the ZooKeeperHAService uses the same
mechanism.

In such a case, I strongly suggest not using the service as the JobManager
RPC address. Otherwise, you will hit the issue you mentioned: there are
3 replicas behind the same service endpoint and only one of them is the
leader, so a connection through the service may land on a standby and
TaskManagers/clients cannot reliably reach the leader.

Instead, I suggest not creating the internal service and binding the pod IP
to the JobManager RPC address. After that, TaskManagers/clients will
retrieve the leader address (pod IP + port) from the HA service and contact
the leader directly via that address.
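
A minimal sketch of what binding the pod IP could look like in the JobManager
Deployment is shown below. It is not taken from this thread: the Deployment
name is inferred from the pod names in the quoted message, the image tag,
labels and volume name are assumptions, and it relies on the official Flink
image accepting the host as the argument after "jobmanager". The pod IP is
exposed through the Kubernetes downward API (status.podIP) and then overrides
the jobmanager.rpc.address value from the ConfigMap.

```yaml
# Hypothetical fragment; adjust names, image and ports to your setup.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager-ha-zk-1      # inferred from the pod names in the thread
  namespace: amit
spec:
  replicas: 3                          # one leader plus two standbys
  selector:
    matchLabels:
      app: flink
      component: jobmanager            # label is an assumption
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.12              # assumed image tag
        env:
        # Downward API: expose this pod's own IP inside the container.
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        # Kubernetes expands $(POD_IP); the value is passed as the host
        # argument and replaces jobmanager.rpc.address from flink-conf.yaml.
        args: ["jobmanager", "$(POD_IP)"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        volumeMounts:
        - name: flink-config-volume    # volume name is an assumption
          mountPath: /opt/flink/conf
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config-ha-zk-1   # ConfigMap from the quoted message
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
```

With this in place, each replica registers its own pod IP when it becomes
leader, so the address stored in ZooKeeper always points at the current
leader rather than at the shared service name.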

Please find more information and an example here[2].

[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
[2].
https://issues.apache.org/jira/browse/FLINK-20982?focusedCommentId=17265715&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17265715

Best,
Yang


Amit Bhatia <bhatia.amit1...@gmail.com> wrote on Wed, Jan 20, 2021 at 12:27 PM:

> Hi Yang,
>
> I tried deploying Flink with three replicas of the JobManager to test a
> faster job recovery scenario. Below is my deployment:
>
> $ kubectl get po -namit | grep zk
> eric-data-coordinator-zk-0                 1/1   Running   0   6d21h
> eric-data-coordinator-zk-1                 1/1   Running   0   6d21h
> eric-data-coordinator-zk-2                 1/1   Running   0   6d21h
> flink-jobmanager-ha-zk-1-5d58dc469-8bjpb   1/1   Running   0   19h
> flink-jobmanager-ha-zk-1-5d58dc469-klg5p   1/1   Running   0   19h
> flink-jobmanager-ha-zk-1-5d58dc469-kvwzk   1/1   Running   0   19h
>
>
> $ kubectl get svc -namit | grep zk
> flink-jobmanager-ha-rest-zk1                NodePort    10.100.118.186   <none>   8081:32115/TCP                21h
> flink-jobmanager-ha-zk1                     ClusterIP   10.111.135.174   <none>   6123/TCP,6124/TCP,8081/TCP    21h
> eric-data-coordinator-zk                    ClusterIP   10.105.139.167   <none>   2181/TCP,8080/TCP,21007/TCP   7d20h
> eric-data-coordinator-zk-ensemble-service   ClusterIP   None             <none>   2888/TCP,3888/TCP             7d20h
>
> Flink Configmap:
> ====================
> apiVersion: v1
> kind: ConfigMap
> metadata:
>   name: flink-config-ha-zk-1
>   namespace: amit
>   labels:
>     app: flink
> data:
>   flink-conf.yaml: |+
>     jobmanager.rpc.address: flink-jobmanager-ha-zk1
>     taskmanager.numberOfTaskSlots: 2
>     blob.server.port: 6124
>     jobmanager.rpc.port: 6123
>     taskmanager.rpc.port: 6122
>     queryable-state.proxy.ports: 6125
>     jobmanager.memory.process.size: 1600m
>     taskmanager.memory.process.size: 1728m
>     parallelism.default: 2
>     # High Availability parameters
>     high-availability: zookeeper
>     high-availability.cluster-id: /haclusterzk1
>     high-availability.storageDir: file:///opt/flink/recovery/
>     high-availability.zookeeper.path.root: /flinkhazk
>     high-availability.zookeeper.quorum: eric-data-coordinator-zk:2181
>     high-availability.jobmanager.port: 6123
> ===============================================================
>
> In one of the three JobManager pods I am getting this error:
>
> 2021-01-19 08:18:33,982 INFO
>  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService []
> - Starting ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
> 2021-01-19 08:21:39,381 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
> 2021-01-19 08:21:42,521 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
> 2021-01-19 08:21:45,508 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
> 2021-01-19 08:21:46,369 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
> 2021-01-19 08:22:13,658 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
> 2021-01-20 04:10:39,836 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager-ha-zk1:6123/user/rpc/dispatcher_1.
>
>
> And when trying to access the GUI, I get the error below:
>
> [image: image.png]
>
> In ZooKeeper I can see that all three IDs are present:
>
> [zk: localhost:2181(CONNECTED) 5] ls
> /flinkhazk/haclusterzk1/leaderlatch/dispatcher_lock
> [_c_1d5fc8b1-063f-4a1c-ad0f-ec46b6f10f36-latch-0000000020,
> _c_229d0739-8854-4a5a-ace7-377d9edc575f-latch-0000000018,
> _c_4eac3aaf-3f0f-4297-ac7f-086821548697-latch-0000000019]
> [zk: localhost:2181(CONNECTED) 6]
>
> So I have the following questions:
>
> 1) What is the correct way to start three JobManager replicas with ZK? Is
> there any link which explains this deployment scenario and configuration?
>
> 2) How do we identify which of the three JobManager replicas is the
> leader?
>
> Regards,
> Amit Bhatia
>
>
> On Wed, Jan 20, 2021 at 9:44 AM Chirag Dewan <chirag.dewa...@yahoo.in>
> wrote:
>
>> Hi,
>>
>> Can we have multiple replicas with ZK HA on K8s as well?
>> In this case, how do TaskManagers and clients recover the JobManager
>> RPC address? Is it updated in ZK?
>> Also, since there are 3 replicas behind the same service endpoint and
>> only one of them is the leader, how should clients reach the leader
>> JobManager?
>>
>> On Wednesday, 20 January, 2021, 07:41:20 am IST, Yang Wang <
>> danrtsey...@gmail.com> wrote:
>>
>>
>> If you do not want to run multiple JobManagers simultaneously, then I
>> think a "Job" for the application cluster
>> with HA enabled is enough.
>> K8s will also launch a new pod/container when the old one terminates
>> abnormally.
>>
>> Best,
>> Yang
>>
>> Yang Wang <danrtsey...@gmail.com> wrote on Wed, Jan 20, 2021 at 10:08 AM:
>>
>> Yes. Using a "Deployment" instead of "Job" for the application cluster
>> also makes sense.
>> Actually, in the native K8s integration, we always use the deployment for
>> JobManager.
>>
>> But please note that the deployment may relaunch the JobManager pod even
>> after you cancel
>> the Flink job.
>>
>> Best,
>> Yang
>>
>> Ashish Nigam <ashnigamt...@gmail.com> wrote on Wed, Jan 20, 2021 at 5:29 AM:
>>
>> Yang,
>> For Application clusters, does it make sense to deploy JobManager as
>> "Deployment" rather than as a "Job", as suggested in docs?
>> I am asking this because I am thinking of deploying a job manager in HA
>> mode even for application clusters.
>>
>> Thanks
>> Ashish
>>
>>
>> On Tue, Jan 19, 2021 at 6:16 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>
>> Usually, you do not need to start multiple JobManagers simultaneously. The
>> JobManager is a deployment.
>> A new pod/container will be launched once the old one terminates
>> abnormally.
>>
>> If you still want to start multiple JobManagers to get a faster recovery,
>> you could set the replica count greater than 1
>> for a standalone cluster on K8s[1]. For the native integration[2], we do
>> not yet support such a configuration[3].
>>
>> Please note that the key point for enabling HA is not starting multiple
>> JobManagers, whether simultaneously or sequentially.
>> You need to configure the ZooKeeperHAService[4] or KubernetesHAService[5] to
>> ensure the Flink job can recover
>> from the latest successful checkpoint.
>>
>> [1].
>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/standalone/kubernetes.html#session-cluster-resource-definitions
>> [2].
>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/native_kubernetes.html
>> [3]. https://issues.apache.org/jira/browse/FLINK-17707
>> [4].
>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/ha/zookeeper_ha.html
>> [5].
>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/ha/kubernetes_ha.html
>>
>> Best,
>> Yang
>>
>> Amit Bhatia <bhatia.amit1...@gmail.com> wrote on Tue, Jan 19, 2021 at 8:45 PM:
>>
>> Hi,
>>
>> I am deploying Flink 1.12 on K8s. Can anyone confirm if we can deploy
>> multiple job manager pods in K8s for HA or it should always be only a
>> single job manager pod ?
>>
>> Regards,
>> Amit Bhatia
>>
>>
