3. Makes sense to me. And we could add a new HA solution "StatefulSet + PV + FileSystem" at any time if we need it in the future.
Since there are no more open questions, I will start the voting now. Thanks all for your comments and feedback. Feel free to continue the discussion if you have other concerns.

Best,
Yang

On Thu, Oct 1, 2020 at 4:52 PM Till Rohrmann <trohrm...@apache.org> wrote:

3. We could avoid force deletions from within Flink. If the user does it, then we don't give guarantees.

I am fine with your current proposal. +1 for moving forward with it.

Cheers,
Till

On Thu, Oct 1, 2020 at 2:32 AM Yang Wang <danrtsey...@gmail.com> wrote:

2. Yes, this is exactly what I mean: store the HA information relevant to a specific component in a single ConfigMap and ensure that "Get(check the leader)-and-Update(write back to the ConfigMap)" is a transactional operation. Since we only store the job graph state handle (not the real data) in the ConfigMap, I think 1 MB is big enough for the dispatcher-leader ConfigMap (the biggest one, with multiple jobs). I roughly calculated that we could have more than 1000 Flink jobs in a Flink session cluster.

3. Actually, K8s has a stronger guarantee than YARN, and a StatefulSet provides at-most-one semantics as long as no manual force-deletion happens[1]. Based on the previous discussion, we have successfully avoided the "lock-and-release" in the implementation. So I still prefer to stick with the current Deployment.

[1]. https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion

Best,
Yang

On Wed, Sep 30, 2020 at 11:57 PM Till Rohrmann <trohrm...@apache.org> wrote:

Thanks for the clarifications Yang Wang.

2. Keeping the HA information relevant for a component (Dispatcher, JobManager, ResourceManager) in a single ConfigMap sounds good. We should check that we don't exceed the 1 MB size limit with this approach, though. The Dispatcher's ConfigMap would then contain the current leader, the running jobs and the pointers to the persisted JobGraphs. The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints and the checkpoint ID counter, for example.

3. Ah ok, I somehow thought that K8s would give us stronger guarantees than Yarn in this regard. That's a pity.

Cheers,
Till

On Wed, Sep 30, 2020 at 10:03 AM tison <wander4...@gmail.com> wrote:

Thanks for your explanation. It would be fine as long as checking the leadership and actually writing the information is atomic.

Best,
tison.

On Wed, Sep 30, 2020 at 3:57 PM Yang Wang <danrtsey...@gmail.com> wrote:

Thanks Till and tison for your comments.

@Till Rohrmann
1. I am afraid we could not do this if we are going to use the fabric8 Kubernetes client SDK for the leader election. The official Kubernetes Java client[1] does not support it either. We would have to implement a new LeaderElector in Flink based on the very basic Kubernetes API, but it seems we would not gain much from that.

2. Yes, the implementation will be a little complicated if we want to completely eliminate residual job graphs or checkpoints. Inspired by your suggestion, a different solution has come to my mind: we could use the same ConfigMap for storing the JobManager leader, the job graph, the checkpoint counter and the checkpoints. Each job would have a dedicated ConfigMap for its HA metadata. Then it is easier to guarantee that only the leader can write the ConfigMap, since "Get(check the leader)-and-Update(write back to the ConfigMap)" is a transactional operation.
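(To make the above concrete: below is a minimal sketch of such a "Get(check the leader)-and-Update" step with the fabric8 client, relying on Kubernetes' optimistic concurrency via the resource version. The ConfigMap name, contender identity, annotation key, data key and the lockResourceVersion()/replace() combination are illustrative assumptions, not the FLIP's final implementation.)

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;

import java.util.HashMap;
import java.util.Map;

public class CheckLeaderAndUpdateSketch {

  public static void main(String[] args) {
    String namespace = "default";
    String configMapName = "k8s-ha-app1-jobmanager";  // hypothetical per-job HA ConfigMap
    String myIdentity = "jobmanager-pod-a";           // hypothetical contender identity

    try (KubernetesClient client = new DefaultKubernetesClient()) {
      // Get: read the ConfigMap together with its current resourceVersion.
      ConfigMap cm = client.configMaps().inNamespace(namespace).withName(configMapName).get();

      // Check the leader: the leader elector records the current holder in an annotation
      // (simplified string check; the record is actually a small JSON document).
      Map<String, String> annotations = cm.getMetadata().getAnnotations();
      String leaderRecord = annotations == null
          ? null
          : annotations.get("control-plane.alpha.kubernetes.io/leader");
      if (leaderRecord == null || !leaderRecord.contains(myIdentity)) {
        return; // not the leader any more, do not write
      }

      // Update: write back while pinning the observed resourceVersion. The API server
      // rejects the write with a conflict if anyone modified the ConfigMap in between,
      // which is what makes get-check-update behave like a transaction.
      if (cm.getData() == null) {
        cm.setData(new HashMap<>());
      }
      cm.getData().put("jobGraph-<job-id>", "<pointer to the state handle>");
      try {
        client.configMaps()
            .inNamespace(namespace)
            .withName(configMapName)
            .lockResourceVersion(cm.getMetadata().getResourceVersion())
            .replace(cm);
      } catch (KubernetesClientException e) {
        // Conflict: someone else updated the ConfigMap first; re-read and retry.
      }
    }
  }
}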
3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However, there is still a chance that two JobManagers are running and trying to get/delete a key in the same ConfigMap concurrently. Imagine that the kubelet (like the NodeManager in YARN) is down; then the old JobManager pod cannot be deleted while a new JobManager pod is launched. We would be in a similar situation as with Deployment(1) + ConfigMap + HDFS/S3. The only benefit is that we would not need to implement a leader election/retrieval service.

@tison
Actually, I do not think we will have such an issue in the Kubernetes HA service. In the Kubernetes LeaderElector[2], the leader information is stored in an annotation of the leader ConfigMap, so an old leader cannot wrongly override the leader information. Whenever a JobManager wants to write its leader information to the ConfigMap, it first checks whether it is still the leader; if not, nothing will happen. Moreover, the Kubernetes resource version[3] ensures that no one else has snuck in and written a different update while the client was in the process of performing its update.

[1]. https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
[2]. https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
[3]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion

Best,
Yang
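(For reference, a minimal sketch of how the fabric8 LeaderElector from [2] is typically driven. The election name, ConfigMap name, identity and timeouts below are made-up placeholders, and the callbacks just print instead of publishing the leader address.)

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;

import java.time.Duration;
import java.util.UUID;

public class JobManagerLeaderElectionSketch {

  public static void main(String[] args) {
    String namespace = "default";
    String configMapName = "k8s-ha-app1-restserver";     // same naming scheme as the POC mentioned below
    String identity = "jobmanager-" + UUID.randomUUID(); // unique identity of this contender

    try (NamespacedKubernetesClient client = new DefaultKubernetesClient()) {
      client.leaderElector()
          .withConfig(new LeaderElectionConfigBuilder()
              .withName("flink-jobmanager-leader-election")
              .withLock(new ConfigMapLock(namespace, configMapName, identity))
              .withLeaseDuration(Duration.ofSeconds(15))
              .withRenewDeadline(Duration.ofSeconds(10))
              .withRetryPeriod(Duration.ofSeconds(2))
              .withLeaderCallbacks(new LeaderCallbacks(
                  // onStartLeading: here the elected JobManager would publish its RPC address.
                  () -> System.out.println(identity + " is now the leader"),
                  // onStopLeading: stop publishing, revoke leadership-bound services.
                  () -> System.out.println(identity + " lost leadership"),
                  // onNewLeader: observers learn who currently holds the lock.
                  newLeader -> System.out.println("current leader: " + newLeader)))
              .build())
          .build()
          .run(); // blocks while participating in the election and renewing the lease
    }
  }
}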
On Wed, Sep 30, 2020 at 3:21 PM tison <wander4...@gmail.com> wrote:

Hi,

Generally +1 for a native k8s HA service.

For leader election and publishing the leader information, a previous discussion[1] pointed out that since these two actions are NOT atomic, there will always be edge cases where a previous leader overwrites the leader information, even with versioned writes. A versioned write only helps by forcing a re-read when the version mismatches, so for versioned writes to work, the information in the key-value pair should let a contender tell whether it is still the current leader.

The idea of writing the leader information on the contender node, or something equivalent, makes sense, but the details depend on how it is implemented. The general problems are:

1. A TM might briefly hold outdated leader information, but as long as the leader election process is short and leadership is stable most of the time, this won't be a serious issue.
2. The process by which a TM extracts the leader information might be a bit more complex than directly watching a fixed key.

The atomicity issue can be addressed by leveraging low-level APIs such as lease and txn, but that requires more development effort. A ConfigMap plus an encapsulated interface, though, provides only a self-consistent mechanism which does not promise stronger consistency for extensions.

Best,
tison.

[1] https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
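(Regarding "directly watching a fixed key": with the fabric8 client a TaskManager could, for example, watch the leader ConfigMap and re-read the address on every modification. The ConfigMap name and the "address" data key are assumptions for illustration; the Watcher signature shown matches the fabric8 4.x line in use at the time.)

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;

public class LeaderRetrievalWatchSketch {

  /** Watches the (hypothetical) leader ConfigMap and logs the leader address on changes. */
  public static Watch watchLeader(KubernetesClient client, String namespace, String configMapName) {
    return client.configMaps()
        .inNamespace(namespace)
        .withName(configMapName)
        .watch(new Watcher<ConfigMap>() {
          @Override
          public void eventReceived(Action action, ConfigMap cm) {
            String address = cm.getData() == null ? null : cm.getData().get("address");
            System.out.println(action + ": leader address = " + address);
          }

          @Override
          public void onClose(KubernetesClientException cause) {
            // The API server may close watches at any time; a real implementation
            // would re-create the watch (and re-read the ConfigMap) here.
          }
        });
  }
}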
On Tue, Sep 29, 2020 at 9:25 PM Till Rohrmann <trohrm...@apache.org> wrote:

For 1., I was wondering whether we can't write the leader connection information directly when trying to obtain the leadership (trying to update the leader key with one's own value)? This might be a little detail, though.

2. Alright, so we are having a similar mechanism as we have in ZooKeeper with the ephemeral lock nodes. I guess that this complicates the implementation a bit, unfortunately.

3. Wouldn't the StatefulSet solution also work without a PV? One could configure a different persistent storage like HDFS or S3 for storing the checkpoints and job blobs, like in the ZooKeeper case. The benefit I see is that we avoid having to implement this multi-locking mechanism in the ConfigMaps using the annotations, because we can be sure that there is only a single leader at a time, if I understood the guarantees of K8s correctly.

Cheers,
Till

On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <danrtsey...@gmail.com> wrote:

Hi Till, thanks for your valuable feedback.

1. Yes, leader election and storing the leader information will use the same ConfigMap. When a contender successfully performs a versioned annotation update on the ConfigMap, it means that it has been elected as the leader, and it will write the leader information in the callback of the leader elector[1]. The Kubernetes resource version helps us avoid the leader ConfigMap being wrongly updated.

2. The lock and release is really a valid concern. Actually, in the current design we cannot guarantee that the node which tries to write its ownership is the real leader: whoever writes later becomes the owner. To address this issue, we need to store all the owners of a key. Only when the owner list is empty can the specific key (i.e. a checkpoint or job graph) be deleted. However, we may still have a residual checkpoint or job graph when the old JobManager crashes exceptionally and does not release the lock. To solve this problem completely, we need a timestamp renewal mechanism for the CompletedCheckpointStore and JobGraphStore, which could help us detect JobManager timeouts and then clean up the residual keys.

3. Frankly speaking, I am not against this solution. However, in my opinion, it is more of a temporary proposal. We could use a StatefulSet to avoid leader election and leader retrieval, but I am not sure whether the TaskManager could properly handle the situation of the same hostname resolving to different IPs after the JobManager fails and is relaunched. Also, we may still end up with two JobManagers running in some corner cases (e.g. the kubelet is down but the pod is still running). Another concern is the strong dependency on a PersistentVolume (aka PV) in the FileSystemHAService, which is not always available, especially in self-built Kubernetes clusters. Moreover, the PV provider would have to guarantee that each PV can only be mounted once. Since the native HA proposal covers all the functionality of the StatefulSet proposal, I prefer the former.

[1]. https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70

Best,
Yang

On Mon, Sep 28, 2020 at 9:29 PM Till Rohrmann <trohrm...@apache.org> wrote:

Thanks for creating this FLIP Yang Wang. I believe that many of our users will like a ZooKeeper-less HA setup.

+1 for not separating the leader information and the leader election if possible. Maybe it is even possible that the contender writes its leader information directly when trying to obtain the leadership by performing a versioned write operation.

Concerning the lock and release operation I have a question: can there be multiple owners for a given key-value pair in a ConfigMap? If not, how can we ensure that the node which writes its ownership is actually the leader without transactional support from K8s? In ZooKeeper we had the same problem (we should probably change it at some point to simply use a transaction which checks whether the writer is still the leader) and therefore introduced the ephemeral lock nodes. What they allow is that there can be multiple owners of a given ZNode at a time. The last owner will then be responsible for the cleanup of the node.

I see the benefit of your proposal over the StatefulSet proposal because it can support multiple standby JMs. Given the problem of locking key-value pairs, it might be simpler to start with the approach where we only have a single JM. This might already add a lot of benefits for our users. Was there a specific reason why you discarded this proposal (other than generality)?

@Uce it would be great to hear your feedback on the proposal since you already implemented a K8s-based HA service.

Cheers,
Till
On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <danrtsey...@gmail.com> wrote:

Hi Xintong and Stephan,

Thanks a lot for your attention on this FLIP. I will address the comments inline.

# Architecture -> One or two ConfigMaps

Both of you are right. One ConfigMap will make the design and implementation easier. Actually, in my POC code, I am using just one ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server component) for both the leader election and the storage. Once a JobManager wins the election, it updates the ConfigMap with its leader address and periodically renews the lock annotation to remain the active leader. I will update the FLIP document, including the architecture diagram, to avoid the misunderstanding.

# HA storage > Lock and release

This is a valid concern. ZooKeeper ephemeral nodes are deleted automatically by the ZK server when the client times out, which can happen in a bad network environment or when the ZK client crashes exceptionally. For Kubernetes, we need to implement a similar mechanism. First, when we want to lock a specific key in a ConfigMap, we put the owner identity, lease duration and renew time into the ConfigMap annotation. The annotation is cleaned up when the lock is released. When we want to remove a job graph or checkpoint, at least one of the following conditions must be satisfied; otherwise the delete operation is not performed.
* The current instance is the owner of the key.
* The owner annotation is empty, which means the owner has released the lock.
* The owner annotation timed out, which usually indicates that the owner died.
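(The three conditions above can be summarized as a simple predicate. The sketch below is purely illustrative; the annotation layout of owner identity, lease duration and renew time is the assumption described in this message, not a finalized format.)

import java.time.Duration;
import java.time.Instant;

/** Illustrative-only predicate for the three release conditions listed above. */
public final class LockReleaseCheck {

  public static boolean canDeleteKey(
      String ownerAnnotation,  // owner recorded in the ConfigMap annotation (null/empty = released)
      String currentIdentity,  // identity of the instance attempting the delete
      Instant lastRenewTime,   // last time the owner renewed its lease
      Duration leaseDuration,  // validity period of a renewal
      Instant now) {

    // 1. The current instance is the owner of the key.
    if (currentIdentity.equals(ownerAnnotation)) {
      return true;
    }
    // 2. The owner annotation is empty: the owner has released the lock.
    if (ownerAnnotation == null || ownerAnnotation.isEmpty()) {
      return true;
    }
    // 3. The owner's lease timed out, which usually indicates the owner died.
    return lastRenewTime.plus(leaseDuration).isBefore(now);
  }
}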
# HA storage > HA data clean up

Sorry that I did not describe clearly how the HA-related ConfigMaps are retained. Benefiting from the Kubernetes OwnerReference[1], we set the owner of the flink-conf ConfigMap, the service and the TaskManager pods to the JobManager Deployment. So when we want to destroy a Flink cluster, we just need to delete the Deployment[2]. For the HA-related ConfigMaps we do not set an owner, so that they are retained even when we delete the whole Flink cluster.

[1]. https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
[2]. https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session

Best,
Yang
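(A sketch of the OwnerReference wiring described above, using the fabric8 client: the flink-conf ConfigMap points at the JobManager Deployment as its owner and is garbage-collected with it, while the HA ConfigMaps would simply omit the owner reference. Namespace and resource names here are made up.)

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

public class OwnerReferenceSketch {

  public static void main(String[] args) {
    String namespace = "default";
    String clusterId = "my-flink-cluster"; // hypothetical cluster id

    try (KubernetesClient client = new DefaultKubernetesClient()) {
      // The JobManager Deployment that should own the dependent resources.
      Deployment jmDeployment = client.apps().deployments()
          .inNamespace(namespace).withName(clusterId).get();

      OwnerReference owner = new OwnerReferenceBuilder()
          .withApiVersion("apps/v1")
          .withKind("Deployment")
          .withName(jmDeployment.getMetadata().getName())
          .withUid(jmDeployment.getMetadata().getUid())
          .withController(true)
          .withBlockOwnerDeletion(true)
          .build();

      // The flink-conf ConfigMap carries the owner reference, so deleting the Deployment
      // garbage-collects it. HA ConfigMaps would omit withOwnerReferences(...) so that
      // they survive the deletion and remain available for recovery.
      ConfigMap flinkConf = new ConfigMapBuilder()
          .withNewMetadata()
              .withName("flink-config-" + clusterId)
              .withOwnerReferences(owner)
          .endMetadata()
          .addToData("flink-conf.yaml", "jobmanager.rpc.address: " + clusterId)
          .build();

      client.configMaps().inNamespace(namespace).createOrReplace(flinkConf);
    }
  }
}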
On Wed, Sep 16, 2020 at 8:16 PM Stephan Ewen <se...@apache.org> wrote:

This is a very cool feature proposal.

One lesson learned from the ZooKeeper-based HA is that it is overly complicated to have the leader RPC address in a different node than the leader lock. There is extra code needed to make sure these converge, and they can be temporarily out of sync.

A much easier design would be to have the RPC address as payload in the lock entry (ZNode in ZK), the same way that the leader fencing token is stored as payload of the lock. For the design above, I think it would mean having a single ConfigMap for both the leader lock and leader RPC address discovery.

This probably serves as a good design principle in general - do not divide information that is updated together across different resources.

Best,
Stephan

On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <tonysong...@gmail.com> wrote:

Thanks for preparing this FLIP, @Yang.

In general, I'm +1 for this new feature. Leveraging Kubernetes's built-in ConfigMap for Flink's HA services should significantly reduce the maintenance overhead compared to deploying a ZK cluster. I think this is an attractive feature for users.

Concerning the proposed design, I have some questions. They might not be problems; I'm just trying to understand.

## Architecture

Why does the leader election need two ConfigMaps (`lock for contending leader` and `leader RPC address`)? What happens if the two ConfigMaps are not updated consistently? E.g., a TM learns about a new JM becoming leader (lock for contending leader updated), but still gets the old leader's address when trying to read `leader RPC address`?

## HA storage > Lock and release

It seems to me that the owner needs to explicitly release the lock so that other peers can write/remove the stored object. What if the previous owner failed to release the lock (e.g., died before releasing)? Would there be any problem?

## HA storage > HA data clean up

If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`, how is the HA data retained?

Thank you~

Xintong Song

On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <danrtsey...@gmail.com> wrote:

Hi devs and users,

I would like to start the discussion about FLIP-144[1], which will introduce a new native high availability service for Kubernetes.

Currently, Flink provides a ZooKeeper HA service that is widely used in production environments. It can be integrated into standalone, Yarn and Kubernetes deployments. However, using the ZooKeeper HA on K8s comes at an additional cost, since we need to manage a ZooKeeper cluster. In the meantime, K8s provides public APIs for leader election[2] and configuration storage (i.e. ConfigMap[3]). We could leverage these features to make running an HA-configured Flink cluster on K8s more convenient.

Both standalone on K8s and native K8s deployments could benefit from the newly introduced KubernetesHaService.

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
[2]. https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
[3]. https://kubernetes.io/docs/concepts/configuration/configmap/

Looking forward to your feedback.

Best,
Yang