3. We could avoid force deletions from within Flink. If the user does it, then we don't give guarantees.
I am fine with your current proposal. +1 for moving forward with it.

Cheers,
Till

On Thu, Oct 1, 2020 at 2:32 AM Yang Wang <danrtsey...@gmail.com> wrote:

2. Yes. This is exactly what I mean: storing the HA information relevant to a specific component in a single ConfigMap and ensuring that "Get(check the leader)-and-Update(write back to the ConfigMap)" is a transactional operation. Since we only store the job graph state handle (not the real data) in the ConfigMap, I think 1 MB is big enough for the dispatcher-leader ConfigMap (the biggest one, since it covers multiple jobs). I roughly calculated that we could have more than 1000 Flink jobs in a Flink session cluster.

3. Actually, K8s has a stronger guarantee than YARN here. The StatefulSet can provide at-most-one semantics as long as no manual force-deletion happens[1]. Based on the previous discussion, we have successfully avoided the "lock-and-release" in the implementation, so I still prefer the current Deployment.

[1]. https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion

Best,
Yang

On Wed, Sep 30, 2020 at 11:57 PM Till Rohrmann <trohrm...@apache.org> wrote:

Thanks for the clarifications Yang Wang.

2. Keeping the HA information relevant for a component (Dispatcher, JobManager, ResourceManager) in a single ConfigMap sounds good. We should check that we don't exceed the 1 MB size limit with this approach, though. The Dispatcher's ConfigMap would then contain the current leader, the running jobs and the pointers to the persisted JobGraphs. The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints and the checkpoint ID counter, for example.

3. Ah ok, I somehow thought that K8s would give us stronger guarantees than Yarn in this regard. That's a pity.

Cheers,
Till
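To make the "single ConfigMap per component" and "Get(check the leader)-and-Update" ideas concrete, here is a rough sketch (not Flink's actual implementation) of such a check-then-write with the fabric8 client. The ConfigMap name, the data keys and the lockResourceVersion(...) call are assumptions made only for this sketch:

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;

public class GetCheckAndUpdateSketch {

    public static void main(String[] args) {
        final String namespace = "default";
        final String configMapName = "app1-dispatcher-leader"; // hypothetical name
        final String selfIdentity = "jobmanager-pod-a";        // hypothetical identity

        try (KubernetesClient client = new DefaultKubernetesClient()) {
            // Get: fetch the ConfigMap and remember the resourceVersion we observed.
            ConfigMap cm = client.configMaps()
                    .inNamespace(namespace).withName(configMapName).get();
            String observedVersion = cm.getMetadata().getResourceVersion();

            // Check the leader: only the current leader is allowed to write.
            if (!selfIdentity.equals(cm.getData().get("leader"))) {
                return; // we are not the leader, so we do not touch the ConfigMap
            }

            // Update: write back, pinned to the observed resourceVersion. The API server
            // rejects the write with 409 Conflict if anyone else updated the ConfigMap in
            // between, which is what makes the Get-and-Update effectively transactional.
            cm.getData().put("jobGraph-0001", "hdfs:///flink/ha/submittedJobGraph-0001");
            try {
                client.configMaps().inNamespace(namespace).withName(configMapName)
                        .lockResourceVersion(observedVersion) // assumed fabric8 DSL call
                        .replace(cm);
            } catch (KubernetesClientException e) {
                // Lost the race: re-read the ConfigMap and retry, or give up leadership.
            }
        }
    }
}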
On Wed, Sep 30, 2020 at 10:03 AM tison <wander4...@gmail.com> wrote:

Thanks for your explanation. It would be fine as long as checking the leadership and actually writing the information is atomic.

Best,
tison.

On Wed, Sep 30, 2020 at 3:57 PM Yang Wang <danrtsey...@gmail.com> wrote:

Thanks Till and tison for your comments.

@Till Rohrmann <trohrm...@apache.org>
1. I am afraid we could not do this if we are going to use the fabric8 Kubernetes client SDK for the leader election. The official Kubernetes Java client[1] does not support it either. We could implement a new LeaderElector in Flink based on the very basic Kubernetes API, but it seems we would not gain much from that.

2. Yes, the implementation will be a little complicated if we want to completely eliminate residual job graphs or checkpoints. Inspired by your suggestion, another solution has come to my mind: we could use the same ConfigMap for storing the JobManager leader, the job graph, the checkpoint counter and the checkpoints. Each job would have a dedicated ConfigMap for its HA meta storage. Then it becomes easier to guarantee that only the leader can write the ConfigMap, since "Get(check the leader)-and-Update(write back to the ConfigMap)" is a transactional operation.

3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However, we would still have the chance that two JobManagers are running and trying to get/delete a key in the same ConfigMap concurrently. Imagine that the kubelet (like the NodeManager in YARN) is down: the old JobManager pod cannot be deleted, yet a new JobManager pod will be launched. We would then be in a similar situation as with Deployment(1) + ConfigMap + HDFS/S3. The only benefit is that we would not need to implement a leader election/retrieval service.

@tison
Actually, I do not think we will have such an issue with the Kubernetes HA service. In the Kubernetes LeaderElector[2], the leader information is stored in an annotation of the leader ConfigMap, so it cannot happen that an old leader wrongly overrides the leader information. Whenever a JobManager wants to write its leader information to the ConfigMap, it first checks whether it is currently the leader. If not, nothing will happen. Moreover, the Kubernetes resource version[3] ensures that no one else has snuck in and written a different update while the client was in the process of performing its own update.

[1]. https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
[2]. https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
[3]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion

Best,
Yang
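For reference, wiring up the fabric8 LeaderElector from [2] against a ConfigMap lock looks roughly like the sketch below; the namespace, names and durations are illustrative, and the leader address would be published from the "started leading" callback:

import java.time.Duration;
import java.util.UUID;

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;

public class LeaderElectionSketch {

    public static void main(String[] args) {
        // Identity of this contender, e.g. the pod name plus a random suffix.
        final String lockIdentity = UUID.randomUUID().toString();

        try (NamespacedKubernetesClient client = new DefaultKubernetesClient()) {
            client.leaderElector()
                    .withConfig(new LeaderElectionConfigBuilder()
                            .withName("flink-restserver-leader-election") // illustrative
                            // The lock lives in the annotations of this ConfigMap.
                            .withLock(new ConfigMapLock("default", "k8s-ha-app1-restserver", lockIdentity))
                            .withLeaseDuration(Duration.ofSeconds(15))
                            .withRenewDeadline(Duration.ofSeconds(10))
                            .withRetryPeriod(Duration.ofSeconds(2))
                            .withLeaderCallbacks(new LeaderCallbacks(
                                    () -> System.out.println("Became leader: publish leader address here"),
                                    () -> System.out.println("Lost leadership: revoke leader address here"),
                                    newLeader -> System.out.println("New leader observed: " + newLeader)))
                            .build())
                    .build()
                    .run(); // blocks, acquiring and periodically renewing the lock annotation
        }
    }
}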
On Wed, Sep 30, 2020 at 3:21 PM tison <wander4...@gmail.com> wrote:

Hi,

Generally +1 for a native K8s HA service.

For leader election & publishing the leader information, a previous discussion[1] pointed out that, since these two actions are NOT atomic, there will always be edge cases where a previous leader overwrites the leader information, even with versioned writes. A versioned write only helps you read again when the version mismatches, so if we want versioned writes to work, the information in the key-value pair should let a contender tell whether it is still the current leader.

The idea of writing the leader information on the contender node, or something equivalent, makes sense, but the details depend on how it is implemented. The general problems are:

1. A TM might pick up the correct leader information a bit late, but as long as the leader election process is short and leadership is stable most of the time, this won't be a serious issue.
2. The process by which a TM extracts the leader information might be a bit more complex than directly watching a fixed key.

The atomicity issue can be addressed by leveraging low-level APIs such as lease & txn, but that requires more development effort. The ConfigMap and the encapsulated interface, though, provide only a self-consistent mechanism which doesn't promise more consistency for extensions.

Best,
tison.

[1] https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E

On Tue, Sep 29, 2020 at 9:25 PM Till Rohrmann <trohrm...@apache.org> wrote:

For 1., I was wondering whether we couldn't write the leader connection information directly when trying to obtain the leadership (trying to update the leader key with one's own value)? This might be a little detail, though.

2. Alright, so we would have a similar mechanism as in ZooKeeper with the ephemeral lock nodes. I guess this complicates the implementation a bit, unfortunately.

3. Wouldn't the StatefulSet solution also work without a PV? One could configure a different persistent storage like HDFS or S3 for storing the checkpoints and job blobs, like in the ZooKeeper case. The current benefit I see is that we avoid having to implement this multi-locking mechanism in the ConfigMaps using the annotations, because we can be sure that there is only a single leader at a time, if I understood the guarantees of K8s correctly.

Cheers,
Till

On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <danrtsey...@gmail.com> wrote:

Hi Till, thanks for your valuable feedback.

1. Yes, leader election and storing the leader information will use the same ConfigMap. When a contender successfully performs a versioned annotation update on the ConfigMap, it has been elected as the leader, and it will write the leader information in the callback of the leader elector[1]. The Kubernetes resource version helps us avoid the leader ConfigMap being wrongly updated.

2. The lock and release is indeed a valid concern. Actually, in the current design we cannot guarantee that the node that tries to write its ownership is the real leader: whoever writes last becomes the owner. To address this issue, we need to store all the owners of a key; only when the owner list is empty can the specific key (meaning a checkpoint or job graph) be deleted. However, we may still end up with a residual checkpoint or job graph when an old JobManager crashes exceptionally and does not release the lock. To solve this problem completely, we need a timestamp renewal mechanism for the CompletedCheckpointStore and JobGraphStore, which lets us detect a JobManager timeout and then clean up the residual keys.

3. Frankly speaking, I am not against this solution. However, in my opinion it is more of a temporary proposal. We could use a StatefulSet to avoid leader election and leader retrieval, but I am not sure whether the TaskManager could properly handle the same hostname resolving to different IPs after the JobManager fails and is relaunched. Also, we may still have two JobManagers running in some corner cases (e.g. the kubelet is down but the pod is still running). Another concern is the strong dependency on a PersistentVolume (aka PV) in the FileSystemHAService, which is not always available, especially in self-built Kubernetes clusters. Moreover, the PV provider has to guarantee that each PV can only be mounted once. Since the native HA proposal covers all the functionality of the StatefulSet proposal, I prefer the former.

[1]. https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70

Best,
Yang
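The timestamp-renewal cleanup described in point 2 boils down to a precondition check like the one below before a residual key may be deleted. The owner, renew time and lease duration mirror what this thread proposes to keep in the ConfigMap annotation; the identities and values are made up for the sketch:

import java.time.Duration;
import java.time.Instant;

/** Sketch: when may a locked job graph / checkpoint key be deleted? */
public final class LockedKeyCleanup {

    static boolean canDelete(
            String owner, Instant renewTime, Duration leaseDuration,
            String selfIdentity, Instant now) {

        if (owner == null || owner.isEmpty()) {
            return true;                                   // lock has already been released
        }
        if (owner.equals(selfIdentity)) {
            return true;                                   // we own the key ourselves
        }
        // The owner stopped renewing its lease long enough ago, so assume it died.
        return now.isAfter(renewTime.plus(leaseDuration));
    }

    public static void main(String[] args) {
        boolean ok = canDelete(
                "jobmanager-pod-a",                        // previous owner
                Instant.now().minusSeconds(120),           // last renew two minutes ago
                Duration.ofSeconds(30),                    // lease duration
                "jobmanager-pod-b",                        // we are a different JobManager
                Instant.now());
        System.out.println(ok);                            // true: the residual key may be cleaned up
    }
}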
On Mon, Sep 28, 2020 at 9:29 PM Till Rohrmann <trohrm...@apache.org> wrote:

Thanks for creating this FLIP Yang Wang. I believe that many of our users will like a ZooKeeper-less HA setup.

+1 for not separating the leader information and the leader election if possible. Maybe it is even possible that the contender writes its leader information directly when trying to obtain the leadership, by performing a versioned write operation.

Concerning the lock and release operation I have a question: Can there be multiple owners for a given key-value pair in a ConfigMap? If not, how can we ensure that the node which writes its ownership is actually the leader w/o transactional support from K8s? In ZooKeeper we had the same problem (we should probably change it at some point to simply use a transaction which checks whether the writer is still the leader) and therefore introduced the ephemeral lock nodes. What they allow is that there can be multiple owners of a given ZNode at a time. The last owner will then be responsible for the cleanup of the node.

I see the benefit of your proposal over the StatefulSet proposal because it can support multiple standby JMs. Given the problem of locking key-value pairs, it might be simpler to start with the approach where we only have a single JM. This might already add a lot of benefits for our users. Was there a specific reason why you discarded this proposal (other than generality)?

@Uce it would be great to hear your feedback on the proposal since you already implemented a K8s based HA service.

Cheers,
Till

On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <danrtsey...@gmail.com> wrote:

Hi Xintong and Stephan,

Thanks a lot for your attention to this FLIP. I will address the comments inline.

# Architecture -> One or two ConfigMaps

Both of you are right. One ConfigMap will make the design and implementation easier. Actually, in my POC code, I am using just one ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server component) for both the leader election and the storage. Once a JobManager wins the election, it updates the ConfigMap with the leader address and periodically renews the lock annotation to remain the active leader. I will update the FLIP document, including the architecture diagram, to avoid the misunderstanding.
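For illustration, a retrieval-side sketch of how a process could read the current leader back from such a ConfigMap. The annotation key follows the client-go leader-election convention and, like the "address" data key, is an assumption of this sketch:

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

public class LeaderRetrievalSketch {

    public static void main(String[] args) {
        try (KubernetesClient client = new DefaultKubernetesClient()) {
            ConfigMap cm = client.configMaps()
                    .inNamespace("default")
                    .withName("k8s-ha-app1-restserver") // name taken from the POC description
                    .get();

            // The elector keeps a JSON leader record (holderIdentity, leaseDurationSeconds,
            // renewTime, ...) in an annotation of the lock ConfigMap; the key below is the
            // client-go convention and is assumed here.
            String leaderRecord = cm.getMetadata().getAnnotations()
                    .get("control-plane.alpha.kubernetes.io/leader");
            System.out.println("Leader election record: " + leaderRecord);

            // The leader RPC address itself would be plain data in the same ConfigMap,
            // e.g. under an "address" key (illustrative only).
            System.out.println("Leader address: " + cm.getData().get("address"));
        }
    }
}

A real leader retrieval service would watch the ConfigMap instead of polling it, but the payload it reads is the same.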
# HA storage > Lock and release

This is a valid concern. ZooKeeper ephemeral nodes are deleted by the ZK server automatically when the client session times out, which can happen in a bad network environment or when the ZK client crashes exceptionally. For Kubernetes, we need to implement a similar mechanism. First, when we want to lock a specific key in a ConfigMap, we put the owner identity, the lease duration and the renew time in the ConfigMap annotation. The annotation is cleaned up when the lock is released. When we want to remove a job graph or checkpoint, one of the following conditions has to be satisfied; otherwise the delete operation cannot be done.
* The current instance is the owner of the key.
* The owner annotation is empty, which means the owner has released the lock.
* The owner annotation timed out, which usually indicates the owner died.

# HA storage > HA data clean up

Sorry that I did not clearly describe how the HA-related ConfigMaps are retained. Benefiting from the Kubernetes OwnerReference[1], we set the owner of the flink-conf ConfigMap, the service and the TaskManager pods to the JobManager Deployment. So when we want to destroy a Flink cluster, we just need to delete the Deployment[2]. For the HA-related ConfigMaps, we do not set an owner, so that they are retained even when we delete the whole Flink cluster.

[1]. https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
[2]. https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session

Best,
Yang
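A small sketch of how such an owner reference can be attached with the fabric8 model builders, so that a resource is garbage-collected together with the JobManager Deployment. The Deployment and ConfigMap names are illustrative; the HA ConfigMaps would simply omit the owner reference:

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

public class OwnerReferenceSketch {

    public static void main(String[] args) {
        try (KubernetesClient client = new DefaultKubernetesClient()) {
            // The JobManager Deployment that should own the non-HA resources.
            Deployment jmDeployment = client.apps().deployments()
                    .inNamespace("default").withName("app1-jobmanager").get();

            OwnerReference owner = new OwnerReferenceBuilder()
                    .withApiVersion("apps/v1")
                    .withKind("Deployment")
                    .withName(jmDeployment.getMetadata().getName())
                    .withUid(jmDeployment.getMetadata().getUid())
                    .withController(true)
                    .withBlockOwnerDeletion(true)
                    .build();

            // The flink-conf ConfigMap gets the owner reference: deleting the Deployment
            // then deletes it as well through Kubernetes garbage collection.
            ConfigMap flinkConf = new ConfigMapBuilder()
                    .withNewMetadata()
                            .withName("flink-config-app1") // illustrative name
                            .withOwnerReferences(owner)
                    .endMetadata()
                    .addToData("flink-conf.yaml", "jobmanager.rpc.address: app1-jobmanager\n")
                    .build();

            client.configMaps().inNamespace("default").createOrReplace(flinkConf);
        }
    }
}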
On Wed, Sep 16, 2020 at 8:16 PM Stephan Ewen <se...@apache.org> wrote:

This is a very cool feature proposal.

One lesson learned from the ZooKeeper-based HA is that it is overly complicated to have the leader RPC address in a different node than the leader lock. Extra code is needed to make sure these converge, and they can be temporarily out of sync.

A much easier design would be to have the RPC address as payload in the lock entry (ZNode in ZK), the same way that the leader fencing token is stored as payload of the lock. I think for the design above it would mean having a single ConfigMap for both the leader lock and the leader RPC address discovery.

This probably serves as a good design principle in general: do not divide information that is updated together across different resources.

Best,
Stephan

On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <tonysong...@gmail.com> wrote:

Thanks for preparing this FLIP, @Yang.

In general, I'm +1 for this new feature. Leveraging Kubernetes's built-in ConfigMaps for Flink's HA services should significantly reduce the maintenance overhead compared to deploying a ZK cluster. I think this is an attractive feature for users.

Concerning the proposed design, I have some questions. They might not be problems; I'm just trying to understand.

## Architecture

Why does the leader election need two ConfigMaps (`lock for contending leader`, and `leader RPC address`)? What happens if the two ConfigMaps are not updated consistently? E.g., a TM learns about a new JM becoming leader (lock for contending leader updated), but still gets the old leader's address when trying to read `leader RPC address`?

## HA storage > Lock and release

It seems to me that the owner needs to explicitly release the lock so that other peers can write/remove the stored object. What if the previous owner fails to release the lock (e.g., dies before releasing)? Would there be any problem?

## HA storage > HA data clean up

If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`, how is the HA data retained?

Thank you~

Xintong Song

On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <danrtsey...@gmail.com> wrote:

Hi devs and users,

I would like to start the discussion about FLIP-144[1], which will introduce a new native high availability service for Kubernetes.

Currently, Flink provides a ZooKeeper HA service that is widely used in production environments. It can be integrated into standalone, YARN and Kubernetes deployments. However, using the ZooKeeper HA on K8s comes at additional cost, since we need to manage a ZooKeeper cluster. In the meantime, K8s provides public APIs for leader election[2] and configuration storage (i.e. ConfigMap[3]). We could leverage these features to make running an HA-configured Flink cluster on K8s more convenient.

Both standalone-on-K8s and native-K8s deployments could benefit from the newly introduced KubernetesHaService.

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
[2]. https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
[3]. https://kubernetes.io/docs/concepts/configuration/configmap/

Looking forward to your feedback.

Best,
Yang
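For readers new to the primitive the whole proposal builds on: a ConfigMap is just a small key-value object in the Kubernetes API. A minimal fabric8 sketch of creating and reading one (all names and values are purely illustrative):

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

public class ConfigMapBasics {

    public static void main(String[] args) {
        try (KubernetesClient client = new DefaultKubernetesClient()) {
            ConfigMap cm = new ConfigMapBuilder()
                    .withNewMetadata()
                            .withName("flink-ha-example") // illustrative name
                            .addToLabels("app", "flink")
                    .endMetadata()
                    // Arbitrary string key-value pairs; an HA service would store small
                    // pointers (state handles) here rather than the actual job or
                    // checkpoint data, which lives in HDFS/S3.
                    .addToData("checkpoint-counter", "1")
                    .build();

            client.configMaps().inNamespace("default").createOrReplace(cm);

            System.out.println(client.configMaps()
                    .inNamespace("default").withName("flink-ha-example").get().getData());
        }
    }
}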