IIUC, "state.checkpoints.dir" is specifying an external checkpoint path,
which will not be cleaned up unless
the users configured it explicitly[1].

However, for "high-availability.storageDir", it will be cleaned up
automatically when all the jobs in the application
reaches to the terminal state. Moreover, not only the checkpoints, but also
the generated job graphs, user jars/artifacts
are stored in this storage. You could check the content of this directory.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention

Best,
Yang

Boris Lublinsky <boris.lublin...@lightbend.com> 于2020年12月21日周一 上午10:18写道:

> I understand this.
> State storage Is defined defined by state.checkpointing.dir, for example
>
> state.checkpoints.dir: file:///mnt/flink/storage/checkpoints
>
>
> I am talking about reference defined in 2 places
>
>
> On Dec 20, 2020, at 8:05 PM, Yang Wang <danrtsey...@gmail.com> wrote:
>
> I am afraid only the state handle is stored in the ConfigMap. The real
> state data is stored in
> the distributed storage configured via "high-availability.storageDir". I
> believe you could find
> more information in this class KubernetesStateHandleStore[1].
>
> How could you find that the checkpointing information is stored twice? It
> should not happen.
>
> [1].
> https://github.com/apache/flink/blob/5f7e0dc96547fdb2f82f903ee48bf43b47ca4ae0/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java#L53
>
>
> Best,
> Yang
>
> Boris Lublinsky <boris.lublin...@lightbend.com> 于2020年12月20日周日 上午12:49写道:
>
>> Thanks Yang,
>> This is still confusing.
>> I did more experiments and see that checkpointing information is stored
>> twice - in config map and in high-availability.storageDir
>> Do we need this duplication?
>> Do we need to specify high-availability.storageDir as defined in
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration
>> Or just specifying
>>
>> high-availability: 
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>
>> Is sufficient?
>>
>>
>>
>> On Dec 17, 2020, at 10:09 PM, Yang Wang <danrtsey...@gmail.com> wrote:
>>
>> The latest successful checkpoint pointer is stored in the ConfigMap, as
>> well as the JobGraph pointer.
>> They could help us recover the running jobs before you delete the K8s
>> deployment. If the HA ConfigMaps
>> are deleted, then when you create a Flink cluster with the same
>> cluster-id, it could not recover from the latest
>> successful checkpoint automatically.
>>
>> Best,
>> Yang
>>
>>
>>
>>
>> Boris Lublinsky <boris.lublin...@lightbend.com> 于2020年12月18日周五 上午11:42写道:
>>
>>> Also re reading
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#high-availability-data-clean-up
>>>
>>> This does not seem right:
>>> To keep HA data while restarting the Flink cluster, simply delete the
>>> deployment (via kubectl delete deploy <cluster-id>). All the Flink
>>> cluster related resources will be deleted (e.g. JobManager Deployment,
>>> TaskManager pods, services, Flink conf ConfigMap). HA related ConfigMaps
>>> will be retained because they do not set the owner reference. When
>>> restarting the cluster, all previously running jobs will be recovered and
>>> restarted from the latest successful checkpoint.
>>>
>>> Last successful checkpoint is not in the config maps, but rather on
>>> persistent volume. Config map can be safely deleted. If you restart JM, it
>>> will create a new leader anyways., So I would suggest to add owner
>>> reference there
>>>
>>>
>>> On Dec 17, 2020, at 8:49 PM, Yang Wang <danrtsey...@gmail.com> wrote:
>>>
>>> Hi Boris,
>>>
>>> Thanks for your follow up response and trying the new
>>> KubernetesHAService.
>>>
>>> 1. It is a valid bug. We are not setting the service account for
>>> TaskManager pod. Before the KubernetesHAService is introduced, it works
>>> fine because the TaskManager does not need to access the K8s resource(e.g.
>>> ConfigMap) directly. I have created a ticket[1] to support setting service
>>> account for TaskManager.
>>> 2. If you directly delete the JobManager deployment, then the HA related
>>> ConfigMap will be retained. It is a by-design behavior. Because the job
>>> does not go to a terminal state(SUCCEED, FAILED, CANCELED), we need this
>>> cluster could recover in the future. If all the jobs in the application
>>> reach to the terminal state, all the HA related ConfigMaps will be cleaned
>>> up automatically. You could cancel the job and verify that. Refer here[2]
>>> for more information.
>>>
>>> For the PVC based storage, if it could support multiple read-write then
>>> the KubernetesHAService should work. Actually, it feels like a distributed
>>> storage.
>>>
>>> [1]. https://issues.apache.org/jira/browse/FLINK-20664
>>> [2].
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html
>>>
>>> Best,
>>> Yang
>>>
>>> Boris Lublinsky <boris.lublin...@lightbend.com> 于2020年12月18日周五 上午7:16写道:
>>>
>>>> And K8 native HA works,
>>>> But there are 2 bugs in this implementation.
>>>>
>>>> 1. Task manager pods are running as default user account, which fails
>>>> because it does not have access to config maps to get endpoint’s
>>>> information. I had to add permissions to default service account to make it
>>>> work. Ideally both JM and TM pods should run under the same service
>>>> account.
>>>> 2. When a Flink application is deleted, it clears the main config map,
>>>> but not the ones used for leader election
>>>>
>>>>
>>>> And finally it works fine with PVC based storage, as long as it is
>>>> read-write many
>>>>
>>>>
>>>> On Dec 15, 2020, at 8:40 PM, Yang Wang <danrtsey...@gmail.com> wrote:
>>>>
>>>> Hi Boris,
>>>>
>>>> What is -p 10?
>>>>
>>>> It is same to --parallelism 10. Set the default parallelism to 10.
>>>>
>>>> does it require a special container build?
>>>>
>>>> No, the official flink docker image could be used
>>>> directly. Unfortunately, we do not have the image now. And we are trying to
>>>> figure out.
>>>> You could follow the instructions below to have your own image.
>>>>
>>>>
>>>> git clone https://github.com/apache/flink-docker.git
>>>> git checkout dev-master./add-custom.sh -u 
>>>> https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz
>>>>  -n flink-1.12.0cd dev/flink-1.12.0-debiandocker build . -t 
>>>> flink:flink-1.12.0docker push flink:flink-1.12.0
>>>>
>>>>
>>>> This is if I use HDFS for save pointing, right? I can instead use PVC -
>>>>> based save pointing, correct?
>>>>
>>>> It is an example to storing the HA related data to OSS(Alibaba Cloud
>>>> Object Storage, similar to S3). Since we require a distributed storage, I
>>>> am afraid you could not use a PVC here. Instead, you could using a minio.
>>>>
>>>> Can I control the amount of standby JMs?
>>>>
>>>> Currently, you could not control the number of JobManagers. This is
>>>> only because we have not introduce a config option for it. But you could do
>>>> it manually via `kubectl edit deploy <clusterID>`. It should also work.
>>>>
>>>> Finally, what is behavior on the rolling restart of JM deployment?
>>>>
>>>> Once a JobManager terminated, it will lose the leadership and a standby
>>>> one will take over. So on the rolling restart of JM deployment, you will
>>>> find that the leader switches multiple times and your job also restarts
>>>> multiple times. I am not sure why you need to roll the JobManager
>>>> deployment. We are using deployment for JobManager in Flink just because we
>>>> want the JobManager to be launched once it crashed. Another reason for
>>>> multiple JobManagers is to get a faster recovery.
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>>
>>>> Boris Lublinsky <boris.lublin...@lightbend.com> 于2020年12月16日周三
>>>> 上午9:09写道:
>>>>
>>>>> Thanks Chesney for your quick response,
>>>>> I read documentation
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s
>>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-144:+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s>
>>>>> More carefully and found the sample, I was looking for:
>>>>>
>>>>> ./bin/flink run-application -p 10 -t kubernetes-application
>>>>> -Dkubernetes.cluster-id=k8s-ha-app1 \
>>>>> -Dkubernetes.container.image=flink:k8s-ha \
>>>>> -Dkubernetes.container.image.pull-policy=Always \
>>>>> -Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
>>>>> -Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2
>>>>> -Dtaskmanager.numberOfTaskSlots=4 \
>>>>> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>> \
>>>>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>>>>> -Drestart-strategy=fixed-delay
>>>>> -Drestart-strategy.fixed-delay.attempts=10 \
>>>>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>>>> \
>>>>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>>>> \
>>>>> local:///opt/flink/examples/streaming/StateMachineExample.jar
>>>>>
>>>>> A couple of questions about it:
>>>>>
>>>>> ./bin/flink run-application -p 10 -t used to be ./bin/flink 
>>>>> run-application
>>>>> -t. What is -p 10?
>>>>> -Dkubernetes.container.image=flink:k8s-ha does it require a special
>>>>> container build?
>>>>>
>>>>> -Dhigh-availability.storageDir=oss://flink/flink-ha \
>>>>> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>>>> \
>>>>> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
>>>>> \
>>>>>
>>>>> This is if I use HDFS for save pointing, right? I can instead use PVC
>>>>> - based save pointing, correct?
>>>>>
>>>>> Also I was trying to understand, how it works, and from the
>>>>> documentation it sounds like there is one active and one or
>>>>> more standby JMs. Can I control the amount of standby JMs?
>>>>>
>>>>> Finally, what is behavior on the rolling restart of JM deployment?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler <ches...@apache.org>
>>>>> wrote:
>>>>>
>>>>> Unfortunately no; there are some discussions going on in the 
>>>>> docker-library/official-images
>>>>> PR <https://github.com/docker-library/official-images/pull/9249> that
>>>>> have to be resolved first, but currently these would require changes on 
>>>>> the
>>>>> Flink side that we cannot do (because it is already released!). We are not
>>>>> sure yet whether we can get the PR accepted and defer further changes to
>>>>> 1.12.1 .
>>>>>
>>>>> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>>>>>
>>>>> Thanks.
>>>>> Do you have ETA for docker images?
>>>>>
>>>>>
>>>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler <ches...@apache.org>
>>>>> wrote:
>>>>>
>>>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>>>> 2) Docker images are not yet published.
>>>>> 3) It is mentioned at the top of the Kubernetes HA Services
>>>>> documentation that it also works for the native Kubernetes integration.
>>>>>
>>>>> *Kubernetes high availability services can only be used when deploying
>>>>> to Kubernetes. Consequently, they can be configured when using 
>>>>> **standalone
>>>>> Flink on Kubernetes
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html>**
>>>>> or the *
>>>>> *native Kubernetes integration
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html>
>>>>> *
>>>>>
>>>>> From what I understand you only need to configure the 3 listed
>>>>> options; the documentation also contains an example configuration
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration>
>>>>> .
>>>>>
>>>>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>>>>>
>>>>> It is great that Flink 1.12 is out. Several questions:
>>>>>
>>>>> 1. Is official Flink 1.12 distribution
>>>>> https://flink.apache.org/downloads.html specifies Scala versions, but
>>>>> not Java versions. Is it Java 8?
>>>>> 2. I do not see any 1.12 docker images here
>>>>> https://hub.docker.com/_/flink. Are they somewhere else?
>>>>> 3 Flink 1.12 introduces Kubernetes HA support
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html,
>>>>> but Flink native Kubernetes support
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
>>>>>  has
>>>>> no mentioning of HA. Are the 2 integrated? DO you have any examples of
>>>>> starting HA cluster using Flink native Kubernetes?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to