2. Yes. This is exactly what I mean. Storing the HA information relevant to
a specific component in a single ConfigMap and ensuring that “Get(check the
leader)-and-Update(write back to the ConfigMap)” is a transactional
operation. Since we only store the job graph stateHandler(not the real
data) in the ConfigMap, I think 1MB is big enough for the dispater-leader
ConfigMap(the biggest one with multiple jobs). I roughly calculate that
could we have more than 1000 Flink jobs in a Flink session cluster.

3. Actually, K8s has a stronger guarantee than YARN. And the StatefulSet
could provide at most one semantics if no manually force-deletion
happened[1]. Based on the previous discussion, we have successfully avoided
the "lock-and-release" in the implementation. So I still insist on using
the current Deployment.


[1].
https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion


Best,
Yang

Till Rohrmann <trohrm...@apache.org> 于2020年9月30日周三 下午11:57写道:

> Thanks for the clarifications Yang Wang.
>
> 2. Keeping the HA information relevant for a component (Dispatcher,
> JobManager, ResourceManager) in a single ConfigMap sounds good. We should
> check that we don't exceed the 1 MB size limit with this approach though.
> The Dispatcher's ConfigMap would then contain the current leader, the
> running jobs and the pointers to the persisted JobGraphs. The JobManager's
> ConfigMap would then contain the current leader, the pointers to the
> checkpoints and the checkpoint ID counter, for example.
>
> 3. Ah ok, I somehow thought that K8s would give us stronger
> guarantees than Yarn in this regard. That's a pity.
>
> Cheers,
> Till
>
> On Wed, Sep 30, 2020 at 10:03 AM tison <wander4...@gmail.com> wrote:
>
>> Thanks for your explanation. It would be fine if only checking leadership
>> & actually write information is atomic.
>>
>> Best,
>> tison.
>>
>>
>> Yang Wang <danrtsey...@gmail.com> 于2020年9月30日周三 下午3:57写道:
>>
>>> Thanks till and tison for your comments.
>>>
>>> @Till Rohrmann <trohrm...@apache.org>
>>> 1. I am afraid we could not do this if we are going to use fabric8
>>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>>> client[1] also could not support it. Unless we implement a new
>>> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
>>> that we could gain too much from this.
>>>
>>> 2. Yes, the implementation will be a little complicated if we want to
>>> completely eliminate the residual job graphs or checkpoints. Inspired by
>>> your suggestion, another different solution has come into my mind. We could
>>> use a same ConfigMap storing the JobManager leader, job graph,
>>> checkpoint-counter, checkpoint. Each job will have a specific ConfigMap for
>>> the HA meta storage. Then it will be easier to guarantee that only the
>>> leader could write the ConfigMap in a transactional operation. Since
>>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
>>> transactional operation.
>>>
>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
>>> However, we still have the chances that two JobManager are running and
>>> trying to get/delete a key in the same ConfigMap concurrently. Imagine that
>>> the kubelet(like NodeManager in YARN) is down, and then the JobManager
>>> could not be deleted. A new JobManager pod will be launched. We are just in
>>> the similar situation as Deployment(1) + ConfigMap + HDFS/S3. The only
>>> benefit is we do not need to implement a leader election/retrieval service.
>>>
>>> @tison
>>> Actually, I do not think we will have such issue in the Kubernetes HA
>>> service. In the Kubernetes LeaderElector[2], we have the leader information
>>> stored on the annotation of leader ConfigMap. So it would not happen the
>>> old leader could wrongly override the leader information. Once a JobManager
>>> want to write his leader information to the ConfigMap, it will check
>>> whether it is the leader now. If not, anything will happen. Moreover, the
>>> Kubernetes Resource Version[3] ensures that no one else has snuck in and
>>> written a different update while the client was in the process of
>>> performing its update.
>>>
>>>
>>> [1].
>>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>>> [2].
>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>>> <https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70>
>>> [3].
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> tison <wander4...@gmail.com> 于2020年9月30日周三 下午3:21写道:
>>>
>>>> Hi,
>>>>
>>>> Generally +1 for a native k8s HA service.
>>>>
>>>> For leader election & publish leader information, there was a
>>>> discussion[1]
>>>> pointed out that since these two actions is NOT atomic, there will be
>>>> always
>>>> edge case where a previous leader overwrite leader information, even
>>>> with
>>>> versioned write. Versioned write helps on read again if version
>>>> mismatches
>>>> so if we want version write works, information in the kv pair should
>>>> help the
>>>> contender reflects whether it is the current leader.
>>>>
>>>> The idea of writes leader information on contender node or something
>>>> equivalent makes sense but the details depends on how it is implemented.
>>>> General problems are that
>>>>
>>>> 1. TM might be a bit late before it updated correct leader information
>>>> but
>>>> only if the leader election process is short and leadership is stable
>>>> at most
>>>> time, it won't be a serious issue.
>>>> 2. The process TM extract leader information might be a bit more complex
>>>> than directly watching a fixed key.
>>>>
>>>> Atomic issue can be addressed if one leverages low APIs such as lease &
>>>> txn
>>>> but it causes more developing efforts. ConfigMap and encapsulated
>>>> interface,
>>>> thought, provides only a self-consistent mechanism which doesn't promise
>>>> more consistency for extension.
>>>>
>>>> Best,
>>>> tison.
>>>>
>>>> [1]
>>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>>
>>>>
>>>>
>>>> Till Rohrmann <trohrm...@apache.org> 于2020年9月29日周二 下午9:25写道:
>>>>
>>>>> For 1. I was wondering whether we can't write the leader connection
>>>>> information directly when trying to obtain the leadership (trying to
>>>>> update
>>>>> the leader key with one's own value)? This might be a little detail,
>>>>> though.
>>>>>
>>>>> 2. Alright, so we are having a similar mechanism as we have in
>>>>> ZooKeeper
>>>>> with the ephemeral lock nodes. I guess that this complicates the
>>>>> implementation a bit, unfortunately.
>>>>>
>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>>> configure a different persistent storage like HDFS or S3 for storing
>>>>> the
>>>>> checkpoints and job blobs like in the ZooKeeper case. The current
>>>>> benefit I
>>>>> see is that we avoid having to implement this multi locking mechanism
>>>>> in
>>>>> the ConfigMaps using the annotations because we can be sure that there
>>>>> is
>>>>> only a single leader at a time if I understood the guarantees of K8s
>>>>> correctly.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <danrtsey...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> > Hi Till, thanks for your valuable feedback.
>>>>> >
>>>>> > 1. Yes, leader election and storing leader information will use a
>>>>> same
>>>>> > ConfigMap. When a contender successfully performs a versioned
>>>>> annotation
>>>>> > update operation to the ConfigMap, it means that it has been elected
>>>>> as the
>>>>> > leader. And it will write the leader information in the callback of
>>>>> leader
>>>>> > elector[1]. The Kubernetes resource version will help us to avoid the
>>>>> > leader ConfigMap is wrongly updated.
>>>>> >
>>>>> > 2. The lock and release is really a valid concern. Actually in
>>>>> current
>>>>> > design, we could not guarantee that the node who tries to write his
>>>>> > ownership is the real leader. Who writes later, who is the owner. To
>>>>> > address this issue, we need to store all the owners of the key. Only
>>>>> when
>>>>> > the owner is empty, the specific key(means a checkpoint or job
>>>>> graph) could
>>>>> > be deleted. However, we may have a residual checkpoint or job graph
>>>>> when
>>>>> > the old JobManager crashed exceptionally and do not release the
>>>>> lock. To
>>>>> > solve this problem completely, we need a timestamp renew mechanism
>>>>> > for CompletedCheckpointStore and JobGraphStore, which could help us
>>>>> to the
>>>>> > check the JobManager timeout and then clean up the residual keys.
>>>>> >
>>>>> > 3. Frankly speaking, I am not against with this solution. However,
>>>>> in my
>>>>> > opinion, it is more like a temporary proposal. We could use
>>>>> StatefulSet to
>>>>> > avoid leader election and leader retrieval. But I am not sure whether
>>>>> > TaskManager could properly handle the situation that same hostname
>>>>> with
>>>>> > different IPs, because the JobManager failed and relaunched. Also we
>>>>> may
>>>>> > still have two JobManagers running in some corner cases(e.g. kubelet
>>>>> is
>>>>> > down but the pod is running). Another concern is we have a strong
>>>>> > dependency on the PersistentVolume(aka PV) in FileSystemHAService.
>>>>> But it
>>>>> > is not always true especially in self-build Kubernetes cluster.
>>>>> Moreover,
>>>>> > PV provider should guarantee that each PV could only be mounted
>>>>> once. Since
>>>>> > the native HA proposal could cover all the functionality of
>>>>> StatefulSet
>>>>> > proposal, that's why I prefer the former.
>>>>> >
>>>>> >
>>>>> > [1].
>>>>> >
>>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>>> >
>>>>> > Best,
>>>>> > Yang
>>>>> >
>>>>> > Till Rohrmann <trohrm...@apache.org> 于2020年9月28日周一 下午9:29写道:
>>>>> >
>>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>>>> users
>>>>> >> will like a ZooKeeper-less HA setup.
>>>>> >>
>>>>> >> +1 for not separating the leader information and the leader
>>>>> election if
>>>>> >> possible. Maybe it is even possible that the contender writes his
>>>>> leader
>>>>> >> information directly when trying to obtain the leadership by
>>>>> performing a
>>>>> >> versioned write operation.
>>>>> >>
>>>>> >> Concerning the lock and release operation I have a question: Can
>>>>> there be
>>>>> >> multiple owners for a given key-value pair in a ConfigMap? If not,
>>>>> how can
>>>>> >> we ensure that the node which writes his ownership is actually the
>>>>> leader
>>>>> >> w/o transactional support from K8s? In ZooKeeper we had the same
>>>>> problem
>>>>> >> (we should probably change it at some point to simply use a
>>>>> >> transaction which checks whether the writer is still the leader) and
>>>>> >> therefore introduced the ephemeral lock nodes. What they allow is
>>>>> that
>>>>> >> there can be multiple owners of a given ZNode at a time. The last
>>>>> owner
>>>>> >> will then be responsible for the cleanup of the node.
>>>>> >>
>>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>>> because
>>>>> >> it can support multiple standby JMs. Given the problem of locking
>>>>> key-value
>>>>> >> pairs it might be simpler to start with this approach where we only
>>>>> have
>>>>> >> single JM. This might already add a lot of benefits for our users.
>>>>> Was
>>>>> >> there a specific reason why you discarded this proposal (other than
>>>>> >> generality)?
>>>>> >>
>>>>> >> @Uce it would be great to hear your feedback on the proposal since
>>>>> you
>>>>> >> already implemented a K8s based HA service.
>>>>> >>
>>>>> >> Cheers,
>>>>> >> Till
>>>>> >>
>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <danrtsey...@gmail.com>
>>>>> wrote:
>>>>> >>
>>>>> >>> Hi Xintong and Stephan,
>>>>> >>>
>>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>>>> >>> comments inline.
>>>>> >>>
>>>>> >>> # Architecture -> One or two ConfigMaps
>>>>> >>>
>>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>>> >>> implementation easier. Actually, in my POC codes,
>>>>> >>> I am using just one ConfigMap(e.g. "k8s-ha-app1-restserver" for
>>>>> rest
>>>>> >>> server component) for the leader election
>>>>> >>> and storage. Once a JobManager win the election, it will update the
>>>>> >>> ConfigMap with leader address and periodically
>>>>> >>> renew the lock annotation to keep as the active leader. I will
>>>>> update
>>>>> >>> the FLIP document, including the architecture diagram,
>>>>> >>> to avoid the misunderstanding.
>>>>> >>>
>>>>> >>>
>>>>> >>> # HA storage > Lock and release
>>>>> >>>
>>>>> >>> This is a valid concern. Since for Zookeeper ephemeral nodes, it
>>>>> will be
>>>>> >>> deleted by the ZK server automatically when
>>>>> >>> the client is timeout. It could happen in a bad network
>>>>> environment or
>>>>> >>> the ZK client crashed exceptionally. For Kubernetes,
>>>>> >>> we need to implement a similar mechanism. First, when we want to
>>>>> lock a
>>>>> >>> specific key in ConfigMap, we will put the owner identify,
>>>>> >>> lease duration, renew time in the ConfigMap annotation. The
>>>>> annotation
>>>>> >>> will be cleaned up when releasing the lock. When
>>>>> >>> we want to remove a job graph or checkpoints, it should satisfy the
>>>>> >>> following conditions. If not, the delete operation could not be
>>>>> done.
>>>>> >>> * Current instance is the owner of the key.
>>>>> >>> * The owner annotation is empty, which means the owner has
>>>>> released the
>>>>> >>> lock.
>>>>> >>> * The owner annotation timed out, which usually indicate the owner
>>>>> died.
>>>>> >>>
>>>>> >>>
>>>>> >>> # HA storage > HA data clean up
>>>>> >>>
>>>>> >>> Sorry for that I do not describe how the HA related ConfigMap is
>>>>> >>> retained clearly. Benefit from the Kubernetes OwnerReference[1],
>>>>> >>> we set owner of the flink-conf configmap, service and TaskManager
>>>>> pods
>>>>> >>> to JobManager Deployment. So when we want to
>>>>> >>> destroy a Flink cluster, we just need to delete the deployment[2].
>>>>> For
>>>>> >>> the HA related ConfigMaps, we do not set the owner
>>>>> >>> so that they could be retained even though we delete the whole
>>>>> Flink
>>>>> >>> cluster.
>>>>> >>>
>>>>> >>>
>>>>> >>> [1].
>>>>> >>>
>>>>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>>> >>> [2].
>>>>> >>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>>> >>>
>>>>> >>>
>>>>> >>> Best,
>>>>> >>> Yang
>>>>> >>>
>>>>> >>>
>>>>> >>> Stephan Ewen <se...@apache.org> 于2020年9月16日周三 下午8:16写道:
>>>>> >>>
>>>>> >>>> This is a very cool feature proposal.
>>>>> >>>>
>>>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is
>>>>> overly
>>>>> >>>> complicated to have the Leader RPC address in a different node
>>>>> than the
>>>>> >>>> LeaderLock. There is extra code needed to make sure these
>>>>> converge and the
>>>>> >>>> can be temporarily out of sync.
>>>>> >>>>
>>>>> >>>> A much easier design would be to have the RPC address as payload
>>>>> in the
>>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing
>>>>> token is
>>>>> >>>> stored as payload of the lock.
>>>>> >>>> I think for the design above it would mean having a single
>>>>> ConfigMap
>>>>> >>>> for both leader lock and leader RPC address discovery.
>>>>> >>>>
>>>>> >>>> This probably serves as a good design principle in general - not
>>>>> divide
>>>>> >>>> information that is updated together over different resources.
>>>>> >>>>
>>>>> >>>> Best,
>>>>> >>>> Stephan
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <
>>>>> tonysong...@gmail.com>
>>>>> >>>> wrote:
>>>>> >>>>
>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>>> >>>>>
>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>> >>>>> buildtin ConfigMap for Flink's HA services should significantly
>>>>> reduce the
>>>>> >>>>> maintenance overhead compared to deploying a ZK cluster. I think
>>>>> this is an
>>>>> >>>>> attractive feature for users.
>>>>> >>>>>
>>>>> >>>>> Concerning the proposed design, I have some questions. Might not
>>>>> be
>>>>> >>>>> problems, just trying to understand.
>>>>> >>>>>
>>>>> >>>>> ## Architecture
>>>>> >>>>>
>>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>>>> contending
>>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two
>>>>> ConfigMaps are
>>>>> >>>>> not updated consistently? E.g., a TM learns about a new JM
>>>>> becoming leader
>>>>> >>>>> (lock for contending leader updated), but still gets the old
>>>>> leader's
>>>>> >>>>> address when trying to read `leader RPC address`?
>>>>> >>>>>
>>>>> >>>>> ## HA storage > Lock and release
>>>>> >>>>>
>>>>> >>>>> It seems to me that the owner needs to explicitly release the
>>>>> lock so
>>>>> >>>>> that other peers can write/remove the stored object. What if the
>>>>> previous
>>>>> >>>>> owner failed to release the lock (e.g., dead before releasing)?
>>>>> Would there
>>>>> >>>>> be any problem?
>>>>> >>>>>
>>>>> >>>>> ## HA storage > HA data clean up
>>>>> >>>>>
>>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>>>>> <ClusterID>`,
>>>>> >>>>> how are the HA dada retained?
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> Thank you~
>>>>> >>>>>
>>>>> >>>>> Xintong Song
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <
>>>>> danrtsey...@gmail.com>
>>>>> >>>>> wrote:
>>>>> >>>>>
>>>>> >>>>>> Hi devs and users,
>>>>> >>>>>>
>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which
>>>>> will
>>>>> >>>>>> introduce
>>>>> >>>>>> a new native high availability service for Kubernetes.
>>>>> >>>>>>
>>>>> >>>>>> Currently, Flink has provided Zookeeper HA service and been
>>>>> widely
>>>>> >>>>>> used
>>>>> >>>>>> in production environments. It could be integrated in standalone
>>>>> >>>>>> cluster,
>>>>> >>>>>> Yarn, Kubernetes deployments. However, using the Zookeeper HA
>>>>> in K8s
>>>>> >>>>>> will take additional cost since we need to manage a Zookeeper
>>>>> cluster.
>>>>> >>>>>> In the meantime, K8s has provided some public API for leader
>>>>> >>>>>> election[2]
>>>>> >>>>>> and configuration storage(i.e. ConfigMap[3]). We could leverage
>>>>> these
>>>>> >>>>>> features and make running HA configured Flink cluster on K8s
>>>>> more
>>>>> >>>>>> convenient.
>>>>> >>>>>>
>>>>> >>>>>> Both the standalone on K8s and native K8s could benefit from
>>>>> the new
>>>>> >>>>>> introduced KubernetesHaService.
>>>>> >>>>>>
>>>>> >>>>>> [1].
>>>>> >>>>>>
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>> >>>>>> [2].
>>>>> >>>>>>
>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>> >>>>>> [3].
>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>> >>>>>>
>>>>> >>>>>> Looking forward to your feedback.
>>>>> >>>>>>
>>>>> >>>>>> Best,
>>>>> >>>>>> Yang
>>>>> >>>>>>
>>>>> >>>>>
>>>>>
>>>>

Reply via email to