Thanks for your explanation. It would be fine as long as checking leadership
and actually writing the information are done atomically.
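
To make "atomic" concrete, here is a minimal sketch, assuming the leader record
and the HA data live in the same ConfigMap so that a single versioned update
covers both. `FakeApiServer`, `write_if_leader` and the `leader` annotation key
are hypothetical names for illustration only; this is an in-memory stand-in,
not the fabric8 client or the real Kubernetes API.

```python
import threading
from dataclasses import dataclass, field


class Conflict(Exception):
    """Raised when the caller's resource version is stale."""


@dataclass
class ConfigMap:
    resource_version: int = 0
    annotations: dict = field(default_factory=dict)  # holds the leader record
    data: dict = field(default_factory=dict)         # holds HA metadata


class FakeApiServer:
    """In-memory stand-in for the API server (hypothetical, not a real client)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._cm = ConfigMap()

    def get(self):
        # Return a private copy so callers cannot mutate the stored object.
        with self._lock:
            cm = self._cm
            return ConfigMap(cm.resource_version, dict(cm.annotations), dict(cm.data))

    def replace(self, updated):
        # Compare-and-swap on resource_version, as a versioned update would do.
        with self._lock:
            if updated.resource_version != self._cm.resource_version:
                raise Conflict("stale resource version")
            self._cm = ConfigMap(updated.resource_version + 1,
                                 dict(updated.annotations), dict(updated.data))


def write_if_leader(api, my_id, key, value):
    """Check leadership and write in one versioned update; False if not leader."""
    while True:
        cm = api.get()
        if cm.annotations.get("leader") != my_id:
            return False      # not the leader in this snapshot: write nothing
        cm.data[key] = value
        try:
            api.replace(cm)   # fails if anything changed since get()
            return True
        except Conflict:
            continue          # re-read and re-check leadership
```

The point of the sketch is that a leadership change bumps the version of the
same object the writer read, so a write from a former leader is rejected
instead of silently overwriting the new leader's state.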

Best,
tison.


Yang Wang <danrtsey...@gmail.com> wrote on Wed, Sep 30, 2020 at 3:57 PM:

> Thanks till and tison for your comments.
>
> @Till Rohrmann <trohrm...@apache.org>
> 1. I am afraid we could not do this if we are going to use the fabric8
> Kubernetes client SDK for the leader election. The official Kubernetes Java
> client[1] does not support it either, unless we implement a new
> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
> that we would not gain much from this.
>
> 2. Yes, the implementation will be a little complicated if we want to
> completely eliminate the residual job graphs or checkpoints. Inspired by
> your suggestion, a different solution has come to my mind. We could
> use the same ConfigMap to store the JobManager leader, job graph,
> checkpoint-counter, and checkpoint. Each job will have a specific ConfigMap for
> the HA meta storage. Then it will be easier to guarantee that only the
> leader could write the ConfigMap, since
> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
> transactional operation.
>
> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However,
> there is still a chance that two JobManagers are running and trying to
> get/delete a key in the same ConfigMap concurrently. Imagine that the
> kubelet (like the NodeManager in YARN) is down, so the JobManager could
> not be deleted, and a new JobManager pod is launched. We are then in a
> similar situation to Deployment(1) + ConfigMap + HDFS/S3. The only benefit
> is that we do not need to implement a leader election/retrieval service.
>
> @tison
> Actually, I do not think we will have such an issue in the Kubernetes HA
> service. In the Kubernetes LeaderElector[2], the leader information is
> stored in the annotation of the leader ConfigMap. So it could not happen that
> the old leader wrongly overrides the leader information. Once a JobManager
> wants to write its leader information to the ConfigMap, it will check
> whether it is the leader now. If not, nothing will happen. Moreover, the
> Kubernetes Resource Version[3] ensures that no one else has snuck in and
> written a different update while the client was in the process of
> performing its update.
>
>
> [1].
> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
> [2].
> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
> [3].
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>
>
> Best,
> Yang
>
> tison <wander4...@gmail.com> wrote on Wed, Sep 30, 2020 at 3:21 PM:
>
>> Hi,
>>
>> Generally +1 for a native k8s HA service.
>>
>> For leader election & publishing leader information, there was a
>> discussion[1] which pointed out that, since these two actions are NOT
>> atomic, there will always be edge cases where a previous leader overwrites
>> the leader information, even with versioned writes. A versioned write helps
>> by forcing a re-read when the version mismatches, so if we want versioned
>> writes to work, the information in the kv pair should help the contender
>> determine whether it is the current leader.
>>
>> The idea of writing leader information on the contender node or something
>> equivalent makes sense, but the details depend on how it is implemented.
>> The general problems are that
>>
>> 1. A TM might be a bit late in picking up the correct leader information,
>> but as long as the leader election process is short and leadership is
>> stable most of the time, it won't be a serious issue.
>> 2. The process by which a TM extracts leader information might be a bit more
>> complex than directly watching a fixed key.
>>
>> The atomicity issue can be addressed if one leverages low-level APIs such as
>> lease & txn, but it requires more development effort. ConfigMap and the
>> encapsulated interface, though, provide only a self-consistent mechanism
>> which doesn't promise more consistency for extension.
>>
>> Best,
>> tison.
>>
>> [1]
>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>
>>
>>
>> Till Rohrmann <trohrm...@apache.org> wrote on Tue, Sep 29, 2020 at 9:25 PM:
>>
>>> For 1. I was wondering whether we can't write the leader connection
>>> information directly when trying to obtain the leadership (trying to update
>>> the leader key with one's own value)? This might be a little detail, though.
>>>
>>> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
>>> with the ephemeral lock nodes. I guess that this complicates the
>>> implementation a bit, unfortunately.
>>>
>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>> configure a different persistent storage like HDFS or S3 for storing the
>>> checkpoints and job blobs like in the ZooKeeper case. The current benefit I
>>> see is that we avoid having to implement this multi locking mechanism in
>>> the ConfigMaps using the annotations because we can be sure that there is
>>> only a single leader at a time if I understood the guarantees of K8s
>>> correctly.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>>
>>> > Hi Till, thanks for your valuable feedback.
>>> >
>>> > 1. Yes, leader election and storing leader information will use the same
>>> > ConfigMap. When a contender successfully performs a versioned annotation
>>> > update operation on the ConfigMap, it means that it has been elected as
>>> > the leader. It will then write the leader information in the callback of
>>> > the leader elector[1]. The Kubernetes resource version will help us
>>> > prevent the leader ConfigMap from being wrongly updated.
>>> >
>>> > 2. The lock and release is really a valid concern. Actually, in the
>>> > current design, we could not guarantee that the node which tries to write
>>> > its ownership is the real leader. Whoever writes last becomes the owner.
>>> > To address this issue, we need to store all the owners of the key. Only
>>> > when the owner list is empty could the specific key (i.e. a checkpoint or
>>> > job graph) be deleted. However, we may have a residual checkpoint or job
>>> > graph when the old JobManager crashed unexpectedly and did not release the
>>> > lock. To solve this problem completely, we need a timestamp renew
>>> > mechanism for CompletedCheckpointStore and JobGraphStore, which could help
>>> > us detect the JobManager timeout and then clean up the residual keys.
>>> >
>>> > 3. Frankly speaking, I am not against this solution. However, in my
>>> > opinion, it is more like a temporary proposal. We could use StatefulSet to
>>> > avoid leader election and leader retrieval. But I am not sure whether the
>>> > TaskManager could properly handle the situation of the same hostname with
>>> > different IPs after the JobManager failed and relaunched. Also we may
>>> > still have two JobManagers running in some corner cases (e.g. kubelet is
>>> > down but the pod is running). Another concern is that we have a strong
>>> > dependency on the PersistentVolume (aka PV) in FileSystemHAService, but a
>>> > PV is not always available, especially in self-built Kubernetes clusters.
>>> > Moreover, the PV provider should guarantee that each PV could only be
>>> > mounted once. Since the native HA proposal could cover all the
>>> > functionality of the StatefulSet proposal, I prefer the former.
>>> >
>>> >
>>> > [1].
>>> > https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>> >
>>> > Best,
>>> > Yang
>>> >
>>> > Till Rohrmann <trohrm...@apache.org> wrote on Mon, Sep 28, 2020 at 9:29 PM:
>>> >
>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>> >> users will like a ZooKeeper-less HA setup.
>>> >>
>>> >> +1 for not separating the leader information and the leader election if
>>> >> possible. Maybe it is even possible that the contender writes his leader
>>> >> information directly when trying to obtain the leadership by performing a
>>> >> versioned write operation.
>>> >>
>>> >> Concerning the lock and release operation I have a question: Can there
>>> >> be multiple owners for a given key-value pair in a ConfigMap? If not, how
>>> >> can we ensure that the node which writes his ownership is actually the
>>> >> leader w/o transactional support from K8s? In ZooKeeper we had the same
>>> >> problem (we should probably change it at some point to simply use a
>>> >> transaction which checks whether the writer is still the leader) and
>>> >> therefore introduced the ephemeral lock nodes. What they allow is that
>>> >> there can be multiple owners of a given ZNode at a time. The last owner
>>> >> will then be responsible for the cleanup of the node.
>>> >>
>>> >> I see the benefit of your proposal over the stateful set proposal
>>> >> because it can support multiple standby JMs. Given the problem of locking
>>> >> key-value pairs it might be simpler to start with this approach where we
>>> >> only have a single JM. This might already add a lot of benefits for our
>>> >> users. Was there a specific reason why you discarded this proposal (other
>>> >> than generality)?
>>> >>
>>> >> @Uce it would be great to hear your feedback on the proposal since you
>>> >> already implemented a K8s based HA service.
>>> >>
>>> >> Cheers,
>>> >> Till
>>> >>
>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>> >>
>>> >>> Hi Xintong and Stephan,
>>> >>>
>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>> >>> comments inline.
>>> >>>
>>> >>> # Architecture -> One or two ConfigMaps
>>> >>>
>>> >>> Both of you are right. One ConfigMap will make the design and
>>> >>> implementation easier. Actually, in my POC code,
>>> >>> I am using just one ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest
>>> >>> server component) for the leader election
>>> >>> and storage. Once a JobManager wins the election, it will update the
>>> >>> ConfigMap with the leader address and periodically
>>> >>> renew the lock annotation to remain the active leader. I will update
>>> >>> the FLIP document, including the architecture diagram,
>>> >>> to avoid the misunderstanding.
>>> >>>
>>> >>>
>>> >>> # HA storage > Lock and release
>>> >>>
>>> >>> This is a valid concern. ZooKeeper ephemeral nodes are
>>> >>> deleted by the ZK server automatically when
>>> >>> the client times out. This could happen in a bad network environment or
>>> >>> when the ZK client crashes unexpectedly. For Kubernetes,
>>> >>> we need to implement a similar mechanism. First, when we want to lock a
>>> >>> specific key in the ConfigMap, we will put the owner identity,
>>> >>> lease duration, and renew time in the ConfigMap annotation. The
>>> >>> annotation will be cleaned up when releasing the lock. When
>>> >>> we want to remove a job graph or checkpoint, one of the following
>>> >>> conditions should be satisfied. If not, the delete operation could not
>>> >>> be done.
>>> >>> * The current instance is the owner of the key.
>>> >>> * The owner annotation is empty, which means the owner has released the
>>> >>> lock.
>>> >>> * The owner annotation timed out, which usually indicates the owner died.
>>> >>>
>>> >>>
>>> >>> # HA storage > HA data clean up
>>> >>>
>>> >>> Sorry that I did not clearly describe how the HA related ConfigMap is
>>> >>> retained. Thanks to the Kubernetes OwnerReference[1],
>>> >>> we set the owner of the flink-conf configmap, service and TaskManager
>>> >>> pods to the JobManager Deployment. So when we want to
>>> >>> destroy a Flink cluster, we just need to delete the deployment[2]. For
>>> >>> the HA related ConfigMaps, we do not set the owner
>>> >>> so that they could be retained even though we delete the whole Flink
>>> >>> cluster.
>>> >>>
>>> >>>
>>> >>> [1].
>>> >>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>> >>> [2].
>>> >>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>> >>>
>>> >>>
>>> >>> Best,
>>> >>> Yang
>>> >>>
>>> >>>
>>> >>> Stephan Ewen <se...@apache.org> wrote on Wed, Sep 16, 2020 at 8:16 PM:
>>> >>>
>>> >>>> This is a very cool feature proposal.
>>> >>>>
>>> >>>> One lesson learned from the ZooKeeper-based HA is that it is overly
>>> >>>> complicated to have the Leader RPC address in a different node than
>>> >>>> the LeaderLock. There is extra code needed to make sure these converge,
>>> >>>> and they can be temporarily out of sync.
>>> >>>>
>>> >>>> A much easier design would be to have the RPC address as payload in
>>> >>>> the lock entry (ZNode in ZK), the same way that the leader fencing
>>> >>>> token is stored as payload of the lock.
>>> >>>> I think for the design above it would mean having a single ConfigMap
>>> >>>> for both leader lock and leader RPC address discovery.
>>> >>>>
>>> >>>> This probably serves as a good design principle in general: do not
>>> >>>> divide information that is updated together over different resources.
>>> >>>>
>>> >>>> Best,
>>> >>>> Stephan
>>> >>>>
>>> >>>>
>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <tonysong...@gmail.com>
>>> >>>> wrote:
>>> >>>>
>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>> >>>>>
>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>> >>>>> built-in ConfigMap for Flink's HA services should significantly
>>> >>>>> reduce the maintenance overhead compared to deploying a ZK cluster. I
>>> >>>>> think this is an attractive feature for users.
>>> >>>>>
>>> >>>>> Concerning the proposed design, I have some questions. Might not be
>>> >>>>> problems, just trying to understand.
>>> >>>>>
>>> >>>>> ## Architecture
>>> >>>>>
>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>> >>>>> contending leader`, and `leader RPC address`)? What happens if the two
>>> >>>>> ConfigMaps are not updated consistently? E.g., a TM learns about a new
>>> >>>>> JM becoming leader (lock for contending leader updated), but still
>>> >>>>> gets the old leader's address when trying to read `leader RPC address`?
>>> >>>>>
>>> >>>>> ## HA storage > Lock and release
>>> >>>>>
>>> >>>>> It seems to me that the owner needs to explicitly release the lock so
>>> >>>>> that other peers can write/remove the stored object. What if the
>>> >>>>> previous owner failed to release the lock (e.g., dead before
>>> >>>>> releasing)? Would there be any problem?
>>> >>>>>
>>> >>>>> ## HA storage > HA data clean up
>>> >>>>>
>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>> >>>>> how is the HA data retained?
>>> >>>>>
>>> >>>>>
>>> >>>>> Thank you~
>>> >>>>>
>>> >>>>> Xintong Song
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <danrtsey...@gmail.com>
>>> >>>>> wrote:
>>> >>>>>
>>> >>>>>> Hi devs and users,
>>> >>>>>>
>>> >>>>>> I would like to start the discussion about FLIP-144[1], which will
>>> >>>>>> introduce a new native high availability service for Kubernetes.
>>> >>>>>>
>>> >>>>>> Currently, Flink provides a ZooKeeper HA service, which is widely
>>> >>>>>> used in production environments. It can be integrated into standalone
>>> >>>>>> cluster, Yarn, and Kubernetes deployments. However, using the
>>> >>>>>> ZooKeeper HA in K8s incurs additional cost since we need to manage a
>>> >>>>>> ZooKeeper cluster. Meanwhile, K8s provides public APIs for leader
>>> >>>>>> election[2] and configuration storage (i.e. ConfigMap[3]). We could
>>> >>>>>> leverage these features to make running an HA-configured Flink
>>> >>>>>> cluster on K8s more convenient.
>>> >>>>>>
>>> >>>>>> Both standalone on K8s and native K8s deployments could benefit from
>>> >>>>>> the newly introduced KubernetesHaService.
>>> >>>>>>
>>> >>>>>> [1].
>>> >>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>> >>>>>> [2].
>>> >>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>> >>>>>> [3]. https://kubernetes.io/docs/concepts/configuration/configmap/
>>> >>>>>>
>>> >>>>>> Looking forward to your feedback.
>>> >>>>>>
>>> >>>>>> Best,
>>> >>>>>> Yang
>>> >>>>>>
>>> >>>>>
>>>
>>
