Thanks Yang,

This is still confusing. I did more experiments and see that checkpointing information is stored twice: in the ConfigMap and in high-availability.storageDir. Do we need this duplication? Do we need to specify high-availability.storageDir as defined in https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration? Or just specifying
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

is sufficient?

> On Dec 17, 2020, at 10:09 PM, Yang Wang <danrtsey...@gmail.com> wrote:
>
> The latest successful checkpoint pointer is stored in the ConfigMap, as well as the JobGraph pointer.
> They could help us recover the running jobs before you delete the K8s deployment. If the HA ConfigMaps
> are deleted, then when you create a Flink cluster with the same cluster-id, it could not recover from the latest
> successful checkpoint automatically.
>
> Best,
> Yang
>
> Boris Lublinsky <boris.lublin...@lightbend.com> wrote on Fri, Dec 18, 2020 at 11:42 AM:
>
> Also, re-reading https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#high-availability-data-clean-up
>
> This does not seem right:
>
> "To keep HA data while restarting the Flink cluster, simply delete the deployment (via kubectl delete deploy <cluster-id>). All the Flink cluster related resources will be deleted (e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap). HA related ConfigMaps will be retained because they do not set the owner reference. When restarting the cluster, all previously running jobs will be recovered and restarted from the latest successful checkpoint."
>
> The last successful checkpoint is not in the ConfigMaps, but rather on a persistent volume. The ConfigMaps can be safely deleted. If you restart the JM, it will create a new leader anyway, so I would suggest adding an owner reference there.
>
>> On Dec 17, 2020, at 8:49 PM, Yang Wang <danrtsey...@gmail.com> wrote:
>>
>> Hi Boris,
>>
>> Thanks for your follow-up response and trying the new KubernetesHAService.
>>
>> 1. It is a valid bug.
We are not setting the service account for the TaskManager pod. Before the KubernetesHAService was introduced, this worked fine because the TaskManager does not need to access K8s resources (e.g. ConfigMaps) directly. I have created a ticket[1] to support setting the service account for the TaskManager.
>>
>> 2. If you directly delete the JobManager deployment, then the HA related ConfigMaps will be retained. This is by-design behavior. Because the job has not reached a terminal state (SUCCEED, FAILED, CANCELED), we need this cluster to be recoverable in the future. If all the jobs in the application reach a terminal state, all the HA related ConfigMaps will be cleaned up automatically. You could cancel the job and verify that. Refer here[2] for more information.
>>
>> For the PVC based storage, if it supports multiple readers and writers, then the KubernetesHAService should work. Effectively, it then behaves like a distributed storage.
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-20664
>> [2]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html
>>
>> Best,
>> Yang
>>
>> Boris Lublinsky <boris.lublin...@lightbend.com> wrote on Fri, Dec 18, 2020 at 7:16 AM:
>>
>> And K8s native HA works, but there are 2 bugs in this implementation:
>>
>> 1. TaskManager pods are running under the default service account, which fails because it does not have access to the ConfigMaps to get endpoint information. I had to add permissions to the default service account to make it work. Ideally both JM and TM pods should run under the same service account.
>> 2.
When a Flink application is deleted, it clears the main ConfigMap, but not the ones used for leader election.
>>
>> And finally, it works fine with PVC based storage, as long as it is ReadWriteMany.
>>
>>> On Dec 15, 2020, at 8:40 PM, Yang Wang <danrtsey...@gmail.com> wrote:
>>>
>>> Hi Boris,
>>>
>>> What is -p 10?
>>> It is the same as --parallelism 10: it sets the default parallelism to 10.
>>>
>>> does it require a special container build?
>>> No, the official Flink docker image could be used directly. Unfortunately, we do not have the image yet, and we are trying to figure that out. You could follow the instructions below to build your own image:
>>>
>>> git clone https://github.com/apache/flink-docker.git
>>> git checkout dev-master
>>> ./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0
>>> cd dev/flink-1.12.0-debian
>>> docker build . -t flink:flink-1.12.0
>>> docker push flink:flink-1.12.0
>>>
>>> This is if I use HDFS for savepointing, right? I can instead use PVC-based savepointing, correct?
>>> It is an example of storing the HA related data in OSS (Alibaba Cloud Object Storage, similar to S3). Since we require a distributed storage, I am afraid you could not use a PVC here. Instead, you could use MinIO.
>>>
>>> Can I control the amount of standby JMs?
>>> Currently, you cannot control the number of JobManagers. This is only because we have not introduced a config option for it yet. But you could do it manually via `kubectl edit deploy <clusterID>`. It should also work.
>>>
>>> Finally, what is behavior on the rolling restart of JM deployment?
>>> Once a JobManager is terminated, it will lose the leadership and a standby one will take over.
>>> So on a rolling restart of the JM deployment, you will find that the leader switches multiple times and your job also restarts multiple times. I am not sure why you need to roll the JobManager deployment. We are using a Deployment for the JobManager in Flink just because we want the JobManager to be relaunched once it crashes. Another reason for multiple JobManagers is to get a faster recovery.
>>>
>>> Best,
>>> Yang
>>>
>>> Boris Lublinsky <boris.lublin...@lightbend.com> wrote on Wed, Dec 16, 2020 at 9:09 AM:
>>>
>>> Thanks Chesnay for your quick response.
>>> I read the documentation at https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s more carefully and found the sample I was looking for:
>>>
>>> ./bin/flink run-application -p 10 -t kubernetes-application \
>>> -Dkubernetes.cluster-id=k8s-ha-app1 \
>>> -Dkubernetes.container.image=flink:k8s-ha \
>>> -Dkubernetes.container.image.pull-policy=Always \
>>> -Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
>>> -Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4 \
>>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
>>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>>> -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
>>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
>>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
>>> local:///opt/flink/examples/streaming/StateMachineExample.jar
>>>
>>> A couple of questions about it:
>>>
>>> ./bin/flink run-application -p 10 -t used to be ./bin/flink run-application -t.
>>> What is -p 10?
>>>
>>> -Dkubernetes.container.image=flink:k8s-ha
>>> does it require a special container build?
>>>
>>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
>>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
>>>
>>> This is if I use HDFS for savepointing, right? I can instead use PVC-based savepointing, correct?
>>>
>>> Also I was trying to understand how it works, and from the documentation it sounds like there is one active and one or more standby JMs. Can I control the amount of standby JMs?
>>>
>>> Finally, what is the behavior on a rolling restart of the JM deployment?
>>>
>>>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler <ches...@apache.org> wrote:
>>>>
>>>> Unfortunately no; there are some discussions going on in the docker-library/official-images PR (https://github.com/docker-library/official-images/pull/9249) that have to be resolved first, but currently these would require changes on the Flink side that we cannot do (because it is already released!). We are not sure yet whether we can get the PR accepted and defer further changes to 1.12.1.
>>>>
>>>>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>>>>
>>>>> Thanks. Do you have an ETA for the docker images?
>>>>>
>>>>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler <ches...@apache.org> wrote:
>>>>>>
>>>>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>>>>> 2) Docker images are not yet published.
>>>>>> 3) It is mentioned at the top of the Kubernetes HA Services documentation that it also works for the native Kubernetes integration:
>>>>>> Kubernetes high availability services can only be used when deploying to Kubernetes.
>>>>>> Consequently, they can be configured when using standalone Flink on Kubernetes (https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html) or the native Kubernetes integration (https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html).
>>>>>>
>>>>>> From what I understand you only need to configure the 3 listed options; the documentation also contains an example configuration (https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration).
>>>>>>
>>>>>>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>>>>>>>
>>>>>>> It is great that Flink 1.12 is out. Several questions:
>>>>>>>
>>>>>>> 1. The official Flink 1.12 distribution at https://flink.apache.org/downloads.html specifies Scala versions, but not Java versions. Is it Java 8?
>>>>>>> 2. I do not see any 1.12 docker images at https://hub.docker.com/_/flink. Are they somewhere else?
>>>>>>> 3. Flink 1.12 introduces Kubernetes HA support (https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html), but the Flink native Kubernetes support page (https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html) has no mention of HA. Are the two integrated? Do you have any examples of starting an HA cluster using Flink native Kubernetes?
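
[Editor's note: pulling the thread's answer together, a minimal flink-conf.yaml fragment for native Kubernetes HA might look like the sketch below. It assumes the Flink 1.12 option names from the kubernetes_ha.html page linked above; the cluster-id and the s3:// path are illustrative placeholders, not values from this thread. Per Yang's explanation, both settings are needed: the ConfigMap holds only the pointers to the latest checkpoint and JobGraph, while storageDir holds the actual HA data, so it must point at a distributed filesystem (S3/OSS/HDFS/MinIO), not a single-node PVC.]

```
# Illustrative sketch, not a verified production config.
kubernetes.cluster-id: my-ha-cluster            # placeholder name
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://flink/recovery   # placeholder; any DFS reachable by JM/TM
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
```

So specifying only the KubernetesHaServicesFactory is not sufficient: without storageDir there is nowhere to persist the checkpoint and JobGraph data that the ConfigMap pointers refer to.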