IIUC, "state.checkpoints.dir" specifies an external checkpoint path, which will not be cleaned up unless the user configures it explicitly [1].
However, "high-availability.storageDir" will be cleaned up automatically
once all the jobs in the application reach a terminal state. Moreover, not
only the checkpoints but also the generated job graphs and user
jars/artifacts are stored in this storage. You could check the content of
this directory.

[1]. https://ci.apache.org/projects/flink/flink-docs-master/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention

Best,
Yang

Boris Lublinsky <boris.lublin...@lightbend.com> 于2020年12月21日周一 上午10:18写道:

> I understand this.
> State storage is defined by state.checkpoints.dir, for example:
>
> state.checkpoints.dir: file:///mnt/flink/storage/checkpoints
>
> I am talking about the reference being defined in two places.
>
> On Dec 20, 2020, at 8:05 PM, Yang Wang <danrtsey...@gmail.com> wrote:
>
> I am afraid only the state handle is stored in the ConfigMap. The real
> state data is stored in the distributed storage configured via
> "high-availability.storageDir". I believe you could find more
> information in the class KubernetesStateHandleStore [1].
>
> How could you find that the checkpointing information is stored twice?
> It should not happen.
>
> [1]. https://github.com/apache/flink/blob/5f7e0dc96547fdb2f82f903ee48bf43b47ca4ae0/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java#L53
>
> Best,
> Yang
>
> Boris Lublinsky <boris.lublin...@lightbend.com> 于2020年12月20日周日 上午12:49写道:
>
>> Thanks Yang,
>> This is still confusing.
>> I did more experiments and see that the checkpointing information is
>> stored twice: in the ConfigMap and in high-availability.storageDir.
>> Do we need this duplication?
>> Do we need to specify high-availability.storageDir as defined in
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration
>> or is specifying just
>>
>> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>
>> sufficient?
>>
>> On Dec 17, 2020, at 10:09 PM, Yang Wang <danrtsey...@gmail.com> wrote:
>>
>> The latest successful checkpoint pointer is stored in the ConfigMap, as
>> well as the JobGraph pointer. They could help us recover the jobs that
>> were running before you deleted the K8s deployment. If the HA ConfigMaps
>> are deleted, then when you create a Flink cluster with the same
>> cluster-id, it could not recover from the latest successful checkpoint
>> automatically.
>>
>> Best,
>> Yang
>>
>> Boris Lublinsky <boris.lublin...@lightbend.com> 于2020年12月18日周五 上午11:42写道:
>>
>>> Also, re-reading
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#high-availability-data-clean-up
>>>
>>> this does not seem right:
>>>
>>> "To keep HA data while restarting the Flink cluster, simply delete the
>>> deployment (via kubectl delete deploy <cluster-id>). All the Flink
>>> cluster related resources will be deleted (e.g. JobManager Deployment,
>>> TaskManager pods, services, Flink conf ConfigMap). HA related
>>> ConfigMaps will be retained because they do not set the owner
>>> reference. When restarting the cluster, all previously running jobs
>>> will be recovered and restarted from the latest successful checkpoint."
>>>
>>> The last successful checkpoint is not in the ConfigMaps, but rather on
>>> a persistent volume, so the ConfigMaps can be safely deleted.
>>> If you restart the JM, it will create a new leader anyway, so I would
>>> suggest adding an owner reference there.
>>>
>>> On Dec 17, 2020, at 8:49 PM, Yang Wang <danrtsey...@gmail.com> wrote:
>>>
>>> Hi Boris,
>>>
>>> Thanks for your follow-up response and for trying the new
>>> KubernetesHAService.
>>>
>>> 1. It is a valid bug. We are not setting the service account for the
>>> TaskManager pod. Before the KubernetesHAService was introduced, this
>>> worked fine because the TaskManager does not need to access K8s
>>> resources (e.g. ConfigMaps) directly. I have created a ticket [1] to
>>> support setting the service account for the TaskManager.
>>> 2. If you directly delete the JobManager deployment, the HA related
>>> ConfigMaps will be retained. This is by-design behavior: because the
>>> jobs have not reached a terminal state (SUCCEED, FAILED, CANCELED), we
>>> need the cluster to be able to recover in the future. If all the jobs
>>> in the application reach a terminal state, all the HA related
>>> ConfigMaps will be cleaned up automatically. You could cancel the job
>>> and verify that. Refer here [2] for more information.
>>>
>>> For the PVC based storage, if it supports multiple readers and writers,
>>> the KubernetesHAService should work. Actually, it then behaves like a
>>> distributed storage.
>>>
>>> [1]. https://issues.apache.org/jira/browse/FLINK-20664
>>> [2]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html
>>>
>>> Best,
>>> Yang
>>>
>>> Boris Lublinsky <boris.lublin...@lightbend.com> 于2020年12月18日周五 上午7:16写道:
>>>
>>>> And K8s native HA works, but there are 2 bugs in this implementation.
>>>>
>>>> 1. TaskManager pods run under the default service account, which fails
>>>> because it does not have access to the ConfigMaps to get the endpoint
>>>> information. I had to add permissions to the default service account
>>>> to make it work. Ideally both JM and TM pods should run under the same
>>>> service account.
>>>> 2.
>>>> When a Flink application is deleted, it clears the main ConfigMap,
>>>> but not the ones used for leader election.
>>>>
>>>> And finally, it works fine with PVC based storage, as long as the PVC
>>>> is read-write-many.
>>>>
>>>> On Dec 15, 2020, at 8:40 PM, Yang Wang <danrtsey...@gmail.com> wrote:
>>>>
>>>> Hi Boris,
>>>>
>>>> What is -p 10?
>>>>
>>>> It is the same as --parallelism 10, i.e. it sets the default
>>>> parallelism to 10.
>>>>
>>>> does it require a special container build?
>>>>
>>>> No, the official Flink docker image could be used directly.
>>>> Unfortunately, we do not have the image yet, and we are trying to
>>>> figure that out. You could follow the instructions below to build your
>>>> own image:
>>>>
>>>> git clone https://github.com/apache/flink-docker.git
>>>> git checkout dev-master
>>>> ./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0
>>>> cd dev/flink-1.12.0-debian
>>>> docker build . -t flink:flink-1.12.0
>>>> docker push flink:flink-1.12.0
>>>>
>>>> This is if I use HDFS for savepointing, right? I can instead use
>>>> PVC-based savepointing, correct?
>>>>
>>>> It is an example of storing the HA related data in OSS (Alibaba Cloud
>>>> Object Storage, similar to S3). Since we require a distributed
>>>> storage, I am afraid you could not use a PVC here. Instead, you could
>>>> use MinIO.
>>>>
>>>> Can I control the amount of standby JMs?
>>>>
>>>> Currently, you could not control the number of JobManagers, only
>>>> because we have not introduced a config option for it yet. But you
>>>> could change it manually via `kubectl edit deploy <clusterID>`, which
>>>> should also work.
>>>>
>>>> Finally, what is the behavior on a rolling restart of the JM
>>>> deployment?
>>>>
>>>> Once a JobManager terminates, it will lose leadership and a standby
>>>> one will take over.
>>>> So on a rolling restart of the JM deployment, you will find that the
>>>> leader switches multiple times and your job also restarts multiple
>>>> times. I am not sure why you need to roll the JobManager deployment.
>>>> We are using a Deployment for the JobManager in Flink just because we
>>>> want the JobManager to be relaunched once it crashes. Another reason
>>>> for multiple JobManagers is to get a faster recovery.
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> Boris Lublinsky <boris.lublin...@lightbend.com> 于2020年12月16日周三 上午9:09写道:
>>>>
>>>>> Thanks Chesnay for your quick response.
>>>>> I read the documentation
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s
>>>>> more carefully and found the sample I was looking for:
>>>>>
>>>>> ./bin/flink run-application -p 10 -t kubernetes-application \
>>>>> -Dkubernetes.cluster-id=k8s-ha-app1 \
>>>>> -Dkubernetes.container.image=flink:k8s-ha \
>>>>> -Dkubernetes.container.image.pull-policy=Always \
>>>>> -Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
>>>>> -Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 \
>>>>> -Dtaskmanager.numberOfTaskSlots=4 \
>>>>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
>>>>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>>>>> -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
>>>>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
>>>>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
>>>>> local:///opt/flink/examples/streaming/StateMachineExample.jar
>>>>>
>>>>> A couple of questions about it:
>>>>>
>>>>> ./bin/flink run-application -p 10 -t used to be
>>>>> ./bin/flink run-application -t. What is -p 10?
>>>>>
>>>>> -Dkubernetes.container.image=flink:k8s-ha: does it require a special
>>>>> container build?
>>>>>
>>>>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>>>>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
>>>>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
>>>>>
>>>>> This is if I use HDFS for savepointing, right? I can instead use
>>>>> PVC-based savepointing, correct?
>>>>>
>>>>> Also, I was trying to understand how it works, and from the
>>>>> documentation it sounds like there is one active and one or more
>>>>> standby JMs. Can I control the amount of standby JMs?
>>>>>
>>>>> Finally, what is the behavior on a rolling restart of the JM
>>>>> deployment?
>>>>>
>>>>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler <ches...@apache.org> wrote:
>>>>>
>>>>> Unfortunately no; there are some discussions going on in the
>>>>> docker-library/official-images PR
>>>>> <https://github.com/docker-library/official-images/pull/9249> that
>>>>> have to be resolved first, but currently these would require changes
>>>>> on the Flink side that we cannot make (because it is already
>>>>> released!). We are not sure yet whether we can get the PR accepted or
>>>>> will have to defer further changes to 1.12.1.
>>>>>
>>>>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>>>>
>>>>> Thanks.
>>>>> Do you have an ETA for the docker images?
>>>>>
>>>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler <ches...@apache.org> wrote:
>>>>>
>>>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>>>> 2) Docker images are not yet published.
>>>>> 3) It is mentioned at the top of the Kubernetes HA Services
>>>>> documentation that it also works for the native Kubernetes
>>>>> integration:
>>>>>
>>>>> *Kubernetes high availability services can only be used when
>>>>> deploying to Kubernetes.
>>>>> Consequently, they can be configured when using standalone Flink on
>>>>> Kubernetes
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html>
>>>>> or the native Kubernetes integration
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>.*
>>>>>
>>>>> From what I understand, you only need to configure the 3 listed
>>>>> options; the documentation also contains an example configuration
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration>.
>>>>>
>>>>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>>>>>
>>>>> It is great that Flink 1.12 is out. Several questions:
>>>>>
>>>>> 1. The official Flink 1.12 distribution at
>>>>> https://flink.apache.org/downloads.html specifies Scala versions, but
>>>>> not Java versions. Is it Java 8?
>>>>> 2. I do not see any 1.12 docker images at
>>>>> https://hub.docker.com/_/flink. Are they somewhere else?
>>>>> 3. Flink 1.12 introduces Kubernetes HA support
>>>>> (https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html),
>>>>> but the Flink native Kubernetes documentation
>>>>> (https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html)
>>>>> has no mention of HA. Are the two integrated? Do you have any
>>>>> examples of starting an HA cluster using Flink native Kubernetes?
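[Editor's note] The thread mentions that, until FLINK-20664 is resolved, the TaskManager pods run under the default service account and need ConfigMap permissions granted manually. The sketch below generates one possible RBAC manifest for that workaround. The role name, namespace variable, and verb list are illustrative assumptions, not something prescribed in the thread; adapt them to your cluster before applying.

```shell
#!/bin/sh
# Generate an RBAC manifest that grants the default service account
# read/write access to ConfigMaps, which Flink's Kubernetes HA services
# use for leader election and for storing state handle pointers.
NAMESPACE="${NAMESPACE:-default}"   # hypothetical namespace

cat <<EOF > flink-ha-rbac.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmap-access
  namespace: ${NAMESPACE}
rules:
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmap-access
  namespace: ${NAMESPACE}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-ha-configmap-access
subjects:
- kind: ServiceAccount
  name: default
  namespace: ${NAMESPACE}
EOF

# Review the generated manifest, then apply it with:
#   kubectl apply -f flink-ha-rbac.yaml
cat flink-ha-rbac.yaml
```

As discussed above, a cleaner long-term fix is running both JM and TM pods under the same dedicated service account once Flink supports configuring it for TaskManagers.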