2. Yes, this is exactly what I mean: store the HA information relevant to a specific component in a single ConfigMap and ensure that “Get(check the leader)-and-Update(write back to the ConfigMap)” is a transactional operation. Since we only store the job graph state handle (not the real data) in the ConfigMap, I think 1 MB is big enough for the dispatcher-leader ConfigMap (the biggest one, since it holds entries for multiple jobs). A rough calculation suggests that we could support more than 1000 Flink jobs in a single Flink session cluster.
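To make the “Get(check the leader)-and-Update(write back to the ConfigMap)” operation a bit more concrete, here is a minimal sketch in Java against the fabric8 client mentioned later in this thread. It is not the actual FLIP-144 implementation: the class and method names and the lockIdentity parameter are illustrative assumptions, and it relies on the fabric8 lockResourceVersion(...) + replace(...) DSL so that a write based on a stale resource version is rejected by the API server with a 409 conflict.

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;

import java.util.HashMap;
import java.util.Map;

public class LeaderGuardedConfigMapUpdate {

    // Annotation written by the fabric8 LeaderElector's ConfigMapLock.
    private static final String LEADER_ANNOTATION =
            "control-plane.alpha.kubernetes.io/leader";

    /**
     * Writes (key, value) into the HA ConfigMap only if this instance still
     * holds the leadership. The replace is pinned to the resource version read
     * together with the leader check, so a stale writer gets a 409 conflict
     * instead of silently overwriting newer data.
     */
    public static boolean checkLeaderAndUpdate(
            KubernetesClient client,
            String namespace,
            String configMapName,
            String lockIdentity,
            String key,
            String value) {

        final ConfigMap configMap =
                client.configMaps().inNamespace(namespace).withName(configMapName).get();
        if (configMap == null || configMap.getMetadata().getAnnotations() == null) {
            return false;
        }

        // Get: check that the leader annotation still contains our identity.
        final String leaderRecord =
                configMap.getMetadata().getAnnotations().get(LEADER_ANNOTATION);
        if (leaderRecord == null || !leaderRecord.contains(lockIdentity)) {
            return false; // We are no longer the leader; do not write.
        }

        // Update: write back, guarded by the resource version we just read.
        final Map<String, String> data =
                configMap.getData() == null ? new HashMap<>() : configMap.getData();
        data.put(key, value);
        configMap.setData(data);

        try {
            client.configMaps()
                    .inNamespace(namespace)
                    .withName(configMapName)
                    .lockResourceVersion(configMap.getMetadata().getResourceVersion())
                    .replace(configMap);
            return true;
        } catch (KubernetesClientException e) {
            if (e.getCode() == 409) {
                // Someone else modified the ConfigMap in between; the caller
                // can re-read and retry, or give up if it lost the leadership.
                return false;
            }
            throw e;
        }
    }
}

The important property is that the leadership check and the guarded replace share the same resource version, so the update can only succeed if nothing has changed since the check; losing the leadership or racing with another writer both surface as a failed write rather than a silent overwrite.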
3. Actually, K8s gives us a stronger guarantee than YARN here: a StatefulSet provides at-most-one semantics as long as no manual force-deletion happens[1]. Based on the previous discussion, we have successfully avoided the "lock-and-release" mechanism in the implementation. So I would still prefer to stick with the current Deployment. [1]. https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion Best, Yang Till Rohrmann <trohrm...@apache.org> 于2020年9月30日周三 下午11:57写道: > Thanks for the clarifications Yang Wang. > > 2. Keeping the HA information relevant for a component (Dispatcher, > JobManager, ResourceManager) in a single ConfigMap sounds good. We should > check that we don't exceed the 1 MB size limit with this approach though. > The Dispatcher's ConfigMap would then contain the current leader, the > running jobs and the pointers to the persisted JobGraphs. The JobManager's > ConfigMap would then contain the current leader, the pointers to the > checkpoints and the checkpoint ID counter, for example. > > 3. Ah ok, I somehow thought that K8s would give us stronger > guarantees than Yarn in this regard. That's a pity. > > Cheers, > Till > > On Wed, Sep 30, 2020 at 10:03 AM tison <wander4...@gmail.com> wrote: > >> Thanks for your explanation. It would be fine if only checking leadership >> & actually write information is atomic. >> >> Best, >> tison. >> >> >> Yang Wang <danrtsey...@gmail.com> 于2020年9月30日周三 下午3:57写道: >> >>> Thanks till and tison for your comments. >>> >>> @Till Rohrmann <trohrm...@apache.org> >>> 1. I am afraid we could not do this if we are going to use fabric8 >>> Kubernetes client SDK for the leader election. The official Kubernetes Java >>> client[1] also could not support it. Unless we implement a new >>> LeaderElector in Flink based on the very basic Kubernetes API. But it seems >>> that we could gain too much from this. >>> >>> 2. Yes, the implementation will be a little complicated if we want to >>> completely eliminate the residual job graphs or checkpoints. Inspired by >>> your suggestion, another different solution has come into my mind. We could >>> use a same ConfigMap storing the JobManager leader, job graph, >>> checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for >>> the HA meta storage. Then it will be easier to guarantee that only the >>> leader could write the ConfigMap in a transactional operation. Since >>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a >>> transactional operation. >>> >>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. >>> However, we still have the chances that two JobManager are running and >>> trying to get/delete a key in the same ConfigMap concurrently. Imagine that >>> the kubelet(like NodeManager in YARN) is down, and then the JobManager >>> could not be deleted. A new JobManager pod will be launched. We are just in >>> the similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only >>> benefit is we do not need to implement a leader election/retrieval service. >>> >>> @tison >>> Actually, I do not think we will have such issue in the Kubernetes HA >>> service. In the Kubernetes LeaderElector[2], we have the leader information >>> stored on the annotation of leader ConfigMap. So it would not happen the >>> old leader could wrongly override the leader information. Once a JobManager >>> want to write his leader information to the ConfigMap, it will check >>> whether it is the leader now. If not, anything will happen. 
Moreover, the >>> Kubernetes Resource Version[3] ensures that no one else has snuck in and >>> written a different update while the client was in the process of >>> performing its update. >>> >>> >>> [1]. >>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java >>> [2]. >>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java >>> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70> >>> [3]. >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion >>> >>> >>> Best, >>> Yang >>> >>> tison <wander4...@gmail.com> 于2020年9月30日周三 下午3:21写道: >>> >>>> Hi, >>>> >>>> Generally +1 for a native k8s HA service. >>>> >>>> For leader election & publish leader information, there was a >>>> discussion[1] >>>> pointed out that since these two actions is NOT atomic, there will be >>>> always >>>> edge case where a previous leader overwrite leader information, even >>>> with >>>> versioned write. Versioned write helps on read again if version >>>> mismatches >>>> so if we want version write works, information in the kv pair should >>>> help the >>>> contender reflects whether it is the current leader. >>>> >>>> The idea of writes leader information on contender node or something >>>> equivalent makes sense but the details depends on how it is implemented. >>>> General problems are that >>>> >>>> 1. TM might be a bit late before it updated correct leader information >>>> but >>>> only if the leader election process is short and leadership is stable >>>> at most >>>> time, it won't be a serious issue. >>>> 2. The process TM extract leader information might be a bit more complex >>>> than directly watching a fixed key. >>>> >>>> Atomic issue can be addressed if one leverages low APIs such as lease & >>>> txn >>>> but it causes more developing efforts. ConfigMap and encapsulated >>>> interface, >>>> thought, provides only a self-consistent mechanism which doesn't promise >>>> more consistency for extension. >>>> >>>> Best, >>>> tison. >>>> >>>> [1] >>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E >>>> >>>> >>>> >>>> Till Rohrmann <trohrm...@apache.org> 于2020年9月29日周二 下午9:25写道: >>>> >>>>> For 1. I was wondering whether we can't write the leader connection >>>>> information directly when trying to obtain the leadership (trying to >>>>> update >>>>> the leader key with one's own value)? This might be a little detail, >>>>> though. >>>>> >>>>> 2. Alright, so we are having a similar mechanism as we have in >>>>> ZooKeeper >>>>> with the ephemeral lock nodes. I guess that this complicates the >>>>> implementation a bit, unfortunately. >>>>> >>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could >>>>> configure a different persistent storage like HDFS or S3 for storing >>>>> the >>>>> checkpoints and job blobs like in the ZooKeeper case. 
The current >>>>> benefit I >>>>> see is that we avoid having to implement this multi locking mechanism >>>>> in >>>>> the ConfigMaps using the annotations because we can be sure that there >>>>> is >>>>> only a single leader at a time if I understood the guarantees of K8s >>>>> correctly. >>>>> >>>>> Cheers, >>>>> Till >>>>> >>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <danrtsey...@gmail.com> >>>>> wrote: >>>>> >>>>> > Hi Till, thanks for your valuable feedback. >>>>> > >>>>> > 1. Yes, leader election and storing leader information will use a >>>>> same >>>>> > ConfigMap. When a contender successfully performs a versioned >>>>> annotation >>>>> > update operation to the ConfigMap, it means that it has been elected >>>>> as the >>>>> > leader. And it will write the leader information in the callback of >>>>> leader >>>>> > elector[1]. The Kubernetes resource version will help us to avoid the >>>>> > leader ConfigMap is wrongly updated. >>>>> > >>>>> > 2. The lock and release is really a valid concern. Actually in >>>>> current >>>>> > design, we could not guarantee that the node who tries to write his >>>>> > ownership is the real leader. Who writes later, who is the owner. To >>>>> > address this issue, we need to store all the owners of the key. Only >>>>> when >>>>> > the owner is empty, the specific key(means a checkpoint or job >>>>> graph) could >>>>> > be deleted. However, we may have a residual checkpoint or job graph >>>>> when >>>>> > the old JobManager crashed exceptionally and do not release the >>>>> lock. To >>>>> > solve this problem completely, we need a timestamp renew mechanism >>>>> > for CompletedCheckpointStore and JobGraphStore, which could help us >>>>> to the >>>>> > check the JobManager timeout and then clean up the residual keys. >>>>> > >>>>> > 3. Frankly speaking, I am not against with this solution. However, >>>>> in my >>>>> > opinion, it is more like a temporary proposal. We could use >>>>> StatefulSet to >>>>> > avoid leader election and leader retrieval. But I am not sure whether >>>>> > TaskManager could properly handle the situation that same hostname >>>>> with >>>>> > different IPs, because the JobManager failed and relaunched. Also we >>>>> may >>>>> > still have two JobManagers running in some corner cases(e.g. kubelet >>>>> is >>>>> > down but the pod is running). Another concern is we have a strong >>>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService. >>>>> But it >>>>> > is not always true especially in self-build Kubernetes cluster. >>>>> Moreover, >>>>> > PV provider should guarantee that each PV could only be mounted >>>>> once. Since >>>>> > the native HA proposal could cover all the functionality of >>>>> StatefulSet >>>>> > proposal, that's why I prefer the former. >>>>> > >>>>> > >>>>> > [1]. >>>>> > >>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70 >>>>> > >>>>> > Best, >>>>> > Yang >>>>> > >>>>> > Till Rohrmann <trohrm...@apache.org> 于2020年9月28日周一 下午9:29写道: >>>>> > >>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our >>>>> users >>>>> >> will like a ZooKeeper-less HA setup. >>>>> >> >>>>> >> +1 for not separating the leader information and the leader >>>>> election if >>>>> >> possible. 
Maybe it is even possible that the contender writes his >>>>> leader >>>>> >> information directly when trying to obtain the leadership by >>>>> performing a >>>>> >> versioned write operation. >>>>> >> >>>>> >> Concerning the lock and release operation I have a question: Can >>>>> there be >>>>> >> multiple owners for a given key-value pair in a ConfigMap? If not, >>>>> how can >>>>> >> we ensure that the node which writes his ownership is actually the >>>>> leader >>>>> >> w/o transactional support from K8s? In ZooKeeper we had the same >>>>> problem >>>>> >> (we should probably change it at some point to simply use a >>>>> >> transaction which checks whether the writer is still the leader) and >>>>> >> therefore introduced the ephemeral lock nodes. What they allow is >>>>> that >>>>> >> there can be multiple owners of a given ZNode at a time. The last >>>>> owner >>>>> >> will then be responsible for the cleanup of the node. >>>>> >> >>>>> >> I see the benefit of your proposal over the stateful set proposal >>>>> because >>>>> >> it can support multiple standby JMs. Given the problem of locking >>>>> key-value >>>>> >> pairs it might be simpler to start with this approach where we only >>>>> have >>>>> >> single JM. This might already add a lot of benefits for our users. >>>>> Was >>>>> >> there a specific reason why you discarded this proposal (other than >>>>> >> generality)? >>>>> >> >>>>> >> @Uce it would be great to hear your feedback on the proposal since >>>>> you >>>>> >> already implemented a K8s based HA service. >>>>> >> >>>>> >> Cheers, >>>>> >> Till >>>>> >> >>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <danrtsey...@gmail.com> >>>>> wrote: >>>>> >> >>>>> >>> Hi Xintong and Stephan, >>>>> >>> >>>>> >>> Thanks a lot for your attention on this FLIP. I will address the >>>>> >>> comments inline. >>>>> >>> >>>>> >>> # Architecture -> One or two ConfigMaps >>>>> >>> >>>>> >>> Both of you are right. One ConfigMap will make the design and >>>>> >>> implementation easier. Actually, in my POC codes, >>>>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for >>>>> rest >>>>> >>> server component) for the leader election >>>>> >>> and storage. Once a JobManager win the election, it will update the >>>>> >>> ConfigMap with leader address and periodically >>>>> >>> renew the lock annotation to keep as the active leader. I will >>>>> update >>>>> >>> the FLIP document, including the architecture diagram, >>>>> >>> to avoid the misunderstanding. >>>>> >>> >>>>> >>> >>>>> >>> # HA storage > Lock and release >>>>> >>> >>>>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it >>>>> will be >>>>> >>> deleted by the ZK server automatically when >>>>> >>> the client is timeout. It could happen in a bad network >>>>> environment or >>>>> >>> the ZK client crashed exceptionally. For Kubernetes, >>>>> >>> we need to implement a similar mechanism. First, when we want to >>>>> lock a >>>>> >>> specific key in ConfigMap, we will put the owner identify, >>>>> >>> lease duration, renew time in the ConfigMap annotation. The >>>>> annotation >>>>> >>> will be cleaned up when releasing the lock. When >>>>> >>> we want to remove a job graph or checkpoints, it should satisfy the >>>>> >>> following conditions. If not, the delete operation could not be >>>>> done. >>>>> >>> * Current instance is the owner of the key. >>>>> >>> * The owner annotation is empty, which means the owner has >>>>> released the >>>>> >>> lock. 
>>>>> >>> * The owner annotation timed out, which usually indicate the owner >>>>> died. >>>>> >>> >>>>> >>> >>>>> >>> # HA storage > HA data clean up >>>>> >>> >>>>> >>> Sorry for that I do not describe how the HA related ConfigMap is >>>>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1], >>>>> >>> we set owner of the flink-conf configmap, service and TaskManager >>>>> pods >>>>> >>> to JobManager Deployment. So when we want to >>>>> >>> destroy a Flink cluster, we just need to delete the deployment[2]. >>>>> For >>>>> >>> the HA related ConfigMaps, we do not set the owner >>>>> >>> so that they could be retained even though we delete the whole >>>>> Flink >>>>> >>> cluster. >>>>> >>> >>>>> >>> >>>>> >>> [1]. >>>>> >>> >>>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/ >>>>> >>> [2]. >>>>> >>> >>>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session >>>>> >>> >>>>> >>> >>>>> >>> Best, >>>>> >>> Yang >>>>> >>> >>>>> >>> >>>>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道: >>>>> >>> >>>>> >>>> This is a very cool feature proposal. >>>>> >>>> >>>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is >>>>> overly >>>>> >>>> complicated to have the Leader RPC address in a different node >>>>> than the >>>>> >>>> LeaderLock. There is extra code needed to make sure these >>>>> converge and the >>>>> >>>> can be temporarily out of sync. >>>>> >>>> >>>>> >>>> A much easier design would be to have the RPC address as payload >>>>> in the >>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing >>>>> token is >>>>> >>>> stored as payload of the lock. >>>>> >>>> I think for the design above it would mean having a single >>>>> ConfigMap >>>>> >>>> for both leader lock and leader RPC address discovery. >>>>> >>>> >>>>> >>>> This probably serves as a good design principle in general - not >>>>> divide >>>>> >>>> information that is updated together over different resources. >>>>> >>>> >>>>> >>>> Best, >>>>> >>>> Stephan >>>>> >>>> >>>>> >>>> >>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song < >>>>> tonysong...@gmail.com> >>>>> >>>> wrote: >>>>> >>>> >>>>> >>>>> Thanks for preparing this FLIP, @Yang. >>>>> >>>>> >>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's >>>>> >>>>> buildtin ConfigMap for Flink's HA services should significantly >>>>> reduce the >>>>> >>>>> maintenance overhead compared to deploying a ZK cluster. I think >>>>> this is an >>>>> >>>>> attractive feature for users. >>>>> >>>>> >>>>> >>>>> Concerning the proposed design, I have some questions. Might not >>>>> be >>>>> >>>>> problems, just trying to understand. >>>>> >>>>> >>>>> >>>>> ## Architecture >>>>> >>>>> >>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for >>>>> contending >>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two >>>>> ConfigMaps are >>>>> >>>>> not updated consistently? E.g., a TM learns about a new JM >>>>> becoming leader >>>>> >>>>> (lock for contending leader updated), but still gets the old >>>>> leader's >>>>> >>>>> address when trying to read `leader RPC address`? >>>>> >>>>> >>>>> >>>>> ## HA storage > Lock and release >>>>> >>>>> >>>>> >>>>> It seems to me that the owner needs to explicitly release the >>>>> lock so >>>>> >>>>> that other peers can write/remove the stored object. What if the >>>>> previous >>>>> >>>>> owner failed to release the lock (e.g., dead before releasing)? 
>>>>> Would there >>>>> >>>>> be any problem? >>>>> >>>>> >>>>> >>>>> ## HA storage > HA data clean up >>>>> >>>>> >>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy >>>>> <ClusterID>`, >>>>> >>>>> how are the HA dada retained? >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> Thank you~ >>>>> >>>>> >>>>> >>>>> Xintong Song >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang < >>>>> danrtsey...@gmail.com> >>>>> >>>>> wrote: >>>>> >>>>> >>>>> >>>>>> Hi devs and users, >>>>> >>>>>> >>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which >>>>> will >>>>> >>>>>> introduce >>>>> >>>>>> a new native high availability service for Kubernetes. >>>>> >>>>>> >>>>> >>>>>> Currently, Flink has provided Zookeeper HA service and been >>>>> widely >>>>> >>>>>> used >>>>> >>>>>> in production environments. It could be integrated in standalone >>>>> >>>>>> cluster, >>>>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA >>>>> in K8s >>>>> >>>>>> will take additional cost since we need to manage a Zookeeper >>>>> cluster. >>>>> >>>>>> In the meantime, K8s has provided some public API for leader >>>>> >>>>>> election[2] >>>>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage >>>>> these >>>>> >>>>>> features and make running HA configured Flink cluster on K8s >>>>> more >>>>> >>>>>> convenient. >>>>> >>>>>> >>>>> >>>>>> Both the standalone on K8s and native K8s could benefit from >>>>> the new >>>>> >>>>>> introduced KubernetesHaService. >>>>> >>>>>> >>>>> >>>>>> [1]. >>>>> >>>>>> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink >>>>> >>>>>> [2]. >>>>> >>>>>> >>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/ >>>>> >>>>>> [3]. >>>>> https://kubernetes.io/docs/concepts/configuration/configmap/ >>>>> >>>>>> >>>>> >>>>>> Looking forward to your feedback. >>>>> >>>>>> >>>>> >>>>>> Best, >>>>> >>>>>> Yang >>>>> >>>>>> >>>>> >>>>> >>>>> >>>>