Thanks for the clarifications, Yang Wang.

2. Keeping the HA information relevant for a component (Dispatcher,
JobManager, ResourceManager) in a single ConfigMap sounds good. We should
check that we don't exceed the 1 MB size limit with this approach though.
The Dispatcher's ConfigMap would then contain the current leader, the
running jobs and the pointers to the persisted JobGraphs. The JobManager's
ConfigMap would then contain the current leader, the pointers to the
checkpoints and the checkpoint ID counter, for example.
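
To make the size concern concrete, here is a minimal sketch of how one could
estimate whether the entries of such a ConfigMap stay below the limit. The
class name, the key names and the example values are purely hypothetical (the
FLIP does not define them), and the estimate only counts the UTF-8 bytes of
keys and values, so treat it as a rough lower bound rather than the exact
accounting Kubernetes performs.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class ConfigMapSizeCheck {
    // Assumed limit: Kubernetes caps the data of a ConfigMap at 1 MiB.
    static final int MAX_CONFIG_MAP_BYTES = 1024 * 1024;

    /** Sums the UTF-8 byte length of all keys and values. */
    static long estimateSize(Map<String, String> data) {
        long total = 0;
        for (Map.Entry<String, String> e : data.entrySet()) {
            total += e.getKey().getBytes(StandardCharsets.UTF_8).length;
            total += e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    /** Returns true if the estimated size does not exceed the assumed limit. */
    static boolean fits(Map<String, String> data) {
        return estimateSize(data) <= MAX_CONFIG_MAP_BYTES;
    }

    public static void main(String[] args) {
        // Hypothetical layout of the Dispatcher's ConfigMap: the current
        // leader, a running job, and a pointer to the persisted JobGraph.
        Map<String, String> dispatcher = new LinkedHashMap<>();
        dispatcher.put("leader", "akka.tcp://flink@jm-0:6123/user/rpc/dispatcher");
        dispatcher.put("runningJob-0001", "RUNNING");
        dispatcher.put("jobGraph-0001", "s3://bucket/ha/jobgraph-0001");
        System.out.println(estimateSize(dispatcher) + " bytes, fits=" + fits(dispatcher));
    }
}
```

Since we would only store pointers (paths) and not the JobGraphs or
checkpoints themselves, each entry should stay small; the limit would mainly
matter for a Dispatcher with a very large number of running jobs.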

3. Ah ok, I somehow thought that K8s would give us stronger guarantees than
Yarn in this regard. That's a pity.

Cheers,
Till

On Wed, Sep 30, 2020 at 10:03 AM tison <wander4...@gmail.com> wrote:

> Thanks for your explanation. That is fine as long as checking leadership
> and actually writing the information happen atomically.
>
> Best,
> tison.
>
>
> Yang Wang <danrtsey...@gmail.com> wrote on Wed, Sep 30, 2020 at 3:57 PM:
>
>> Thanks Till and tison for your comments.
>>
>> @Till Rohrmann <trohrm...@apache.org>
>> 1. I am afraid we cannot do this if we are going to use the fabric8
>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>> client[1] does not support it either. The only alternative would be to
>> implement a new LeaderElector in Flink based on the very basic Kubernetes
>> API, but it seems that we would not gain much from this.
>>
>> 2. Yes, the implementation will be a little complicated if we want to
>> completely eliminate the residual job graphs or checkpoints. Inspired by
>> your suggestion, a different solution has come to my mind. We could
>> use a single ConfigMap to store the JobManager leader, job graph,
>> checkpoint counter and checkpoints. Each job will have a dedicated ConfigMap
>> for its HA metadata. Then it will be easier to guarantee that only the
>> leader can write to the ConfigMap, because
>> “Get(check the leader)-and-Update(write back to the ConfigMap)” is a
>> transactional operation.
>>
>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution. However,
>> there is still a chance that two JobManagers are running and trying to
>> get/delete a key in the same ConfigMap concurrently. Imagine that the
>> kubelet (like the NodeManager in YARN) is down, so the JobManager pod cannot
>> be deleted, and a new JobManager pod is launched. We are then in a
>> similar situation as with Deployment(1) + ConfigMap + HDFS/S3. The only
>> benefit is that we do not need to implement a leader election/retrieval
>> service.
>>
>> @tison
>> Actually, I do not think we will have such an issue in the Kubernetes HA
>> service. In the Kubernetes LeaderElector[2], the leader information is
>> stored in an annotation of the leader ConfigMap, so the old leader cannot
>> wrongly overwrite the leader information. Whenever a JobManager
>> wants to write its leader information to the ConfigMap, it will check
>> whether it is currently the leader. If not, nothing will happen. Moreover,
>> the Kubernetes Resource Version[3] ensures that no one else has snuck in and
>> written a different update while the client was in the process of
>> performing its update.
>>
>>
>> [1].
>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>> [2].
>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java
>> [3].
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>
>>
>> Best,
>> Yang
>>
>> tison <wander4...@gmail.com> wrote on Wed, Sep 30, 2020 at 3:21 PM:
>>
>>> Hi,
>>>
>>> Generally +1 for a native k8s HA service.
>>>
>>> For leader election & publishing leader information, there was a
>>> discussion[1] which pointed out that, since these two actions are NOT
>>> atomic, there will always be an edge case where a previous leader
>>> overwrites the leader information, even with versioned writes. A versioned
>>> write only helps by forcing a re-read if the version mismatches, so if we
>>> want versioned writes to work, the information in the kv pair should let
>>> the contender determine whether it is the current leader.
>>>
>>> The idea of writing the leader information on the contender node or
>>> something equivalent makes sense, but the details depend on how it is
>>> implemented. General problems are that
>>>
>>> 1. The TM might be a bit late in picking up the correct leader information,
>>> but as long as the leader election process is short and leadership is
>>> stable most of the time, it won't be a serious issue.
>>> 2. The process by which the TM extracts the leader information might be a
>>> bit more complex than directly watching a fixed key.
>>>
>>> The atomicity issue can be addressed if one leverages low-level APIs such
>>> as lease & txn, but that requires more development effort. ConfigMap and
>>> the encapsulated interface, though, provide only a self-consistent
>>> mechanism which doesn't promise more consistency for extensions.
>>>
>>> Best,
>>> tison.
>>>
>>> [1]
>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>
>>>
>>>
>>> Till Rohrmann <trohrm...@apache.org> wrote on Tue, Sep 29, 2020 at 9:25 PM:
>>>
>>>> For 1. I was wondering whether we can't write the leader connection
>>>> information directly when trying to obtain the leadership (trying to
>>>> update the leader key with one's own value)? This might be a little
>>>> detail, though.
>>>>
>>>> 2. Alright, so we are having a similar mechanism as we have in ZooKeeper
>>>> with the ephemeral lock nodes. I guess that this complicates the
>>>> implementation a bit, unfortunately.
>>>>
>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>> configure a different persistent storage like HDFS or S3 for storing the
>>>> checkpoints and job blobs like in the ZooKeeper case. The current
>>>> benefit I see is that we avoid having to implement this multi locking
>>>> mechanism in the ConfigMaps using the annotations because we can be sure
>>>> that there is only a single leader at a time if I understood the
>>>> guarantees of K8s correctly.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>>>
>>>> > Hi Till, thanks for your valuable feedback.
>>>> >
>>>> > 1. Yes, leader election and storing leader information will use the same
>>>> > ConfigMap. When a contender successfully performs a versioned annotation
>>>> > update operation on the ConfigMap, it means that it has been elected as
>>>> > the leader. It will then write the leader information in the callback of
>>>> > the leader elector[1]. The Kubernetes resource version will help us to
>>>> > prevent the leader ConfigMap from being wrongly updated.
>>>> >
>>>> > 2. The lock and release is really a valid concern. Actually, in the
>>>> > current design, we cannot guarantee that the node which tries to write its
>>>> > ownership is the real leader: whoever writes last becomes the owner. To
>>>> > address this issue, we need to store all the owners of the key. Only when
>>>> > the owner list is empty can the specific key (i.e. a checkpoint or job
>>>> > graph) be deleted. However, we may have a residual checkpoint or job graph
>>>> > when the old JobManager crashes unexpectedly and does not release the
>>>> > lock. To solve this problem completely, we need a timestamp renewal
>>>> > mechanism for CompletedCheckpointStore and JobGraphStore, which could help
>>>> > us to detect the JobManager timeout and then clean up the residual keys.
>>>> >
>>>> > 3. Frankly speaking, I am not against this solution. However, in my
>>>> > opinion, it is more like a temporary proposal. We could use a StatefulSet
>>>> > to avoid leader election and leader retrieval. But I am not sure whether
>>>> > the TaskManager can properly handle the situation where the same hostname
>>>> > resolves to different IPs because the JobManager failed and was
>>>> > relaunched. Also, we may still have two JobManagers running in some corner
>>>> > cases (e.g. the kubelet is down but the pod is running). Another concern
>>>> > is that we have a strong dependency on the PersistentVolume (aka PV) in
>>>> > FileSystemHAService, but a PV is not always available, especially in
>>>> > self-built Kubernetes clusters. Moreover, the PV provider should guarantee
>>>> > that each PV can only be mounted once. Since the native HA proposal covers
>>>> > all the functionality of the StatefulSet proposal, I prefer the former.
>>>> >
>>>> >
>>>> > [1].
>>>> > https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>> >
>>>> > Best,
>>>> > Yang
>>>> >
>>>> > Till Rohrmann <trohrm...@apache.org> wrote on Mon, Sep 28, 2020 at 9:29 PM:
>>>> >
>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>>> >> users will like a ZooKeeper-less HA setup.
>>>> >>
>>>> >> +1 for not separating the leader information and the leader election if
>>>> >> possible. Maybe it is even possible that the contender writes his leader
>>>> >> information directly when trying to obtain the leadership by performing
>>>> >> a versioned write operation.
>>>> >>
>>>> >> Concerning the lock and release operation I have a question: Can there
>>>> >> be multiple owners for a given key-value pair in a ConfigMap? If not, how
>>>> >> can we ensure that the node which writes his ownership is actually the
>>>> >> leader w/o transactional support from K8s? In ZooKeeper we had the same
>>>> >> problem (we should probably change it at some point to simply use a
>>>> >> transaction which checks whether the writer is still the leader) and
>>>> >> therefore introduced the ephemeral lock nodes. What they allow is that
>>>> >> there can be multiple owners of a given ZNode at a time. The last owner
>>>> >> will then be responsible for the cleanup of the node.
>>>> >>
>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>> >> because it can support multiple standby JMs. Given the problem of locking
>>>> >> key-value pairs, it might be simpler to start with the stateful set
>>>> >> approach, where we only have a single JM. This might already add a lot of
>>>> >> benefits for our users. Was there a specific reason why you discarded
>>>> >> that proposal (other than generality)?
>>>> >>
>>>> >> @Uce it would be great to hear your feedback on the proposal since you
>>>> >> already implemented a K8s based HA service.
>>>> >>
>>>> >> Cheers,
>>>> >> Till
>>>> >>
>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>>> >>
>>>> >>> Hi Xintong and Stephan,
>>>> >>>
>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>>> >>> comments inline.
>>>> >>>
>>>> >>> # Architecture -> One or two ConfigMaps
>>>> >>>
>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>> >>> implementation easier. Actually, in my POC code, I am using just one
>>>> >>> ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server component)
>>>> >>> for the leader election and storage. Once a JobManager wins the election,
>>>> >>> it will update the ConfigMap with the leader address and periodically
>>>> >>> renew the lock annotation to remain the active leader. I will update the
>>>> >>> FLIP document, including the architecture diagram, to avoid the
>>>> >>> misunderstanding.
>>>> >>>
>>>> >>>
>>>> >>> # HA storage > Lock and release
>>>> >>>
>>>> >>> This is a valid concern. ZooKeeper ephemeral nodes are deleted by the ZK
>>>> >>> server automatically when the client times out, which can happen in a bad
>>>> >>> network environment or when the ZK client crashes unexpectedly. For
>>>> >>> Kubernetes, we need to implement a similar mechanism. First, when we want
>>>> >>> to lock a specific key in a ConfigMap, we will put the owner identity,
>>>> >>> lease duration and renew time in the ConfigMap annotation. The annotation
>>>> >>> will be cleaned up when releasing the lock. When we want to remove a job
>>>> >>> graph or checkpoint, one of the following conditions must hold.
>>>> >>> Otherwise, the delete operation cannot be done.
>>>> >>> * The current instance is the owner of the key.
>>>> >>> * The owner annotation is empty, which means the owner has released the
>>>> >>> lock.
>>>> >>> * The owner annotation timed out, which usually indicates the owner died.
>>>> >>>
>>>> >>>
>>>> >>> # HA storage > HA data clean up
>>>> >>>
>>>> >>> Sorry that I did not clearly describe how the HA related ConfigMap is
>>>> >>> retained. Thanks to the Kubernetes OwnerReference[1], we set the owner of
>>>> >>> the flink-conf ConfigMap, service and TaskManager pods to the JobManager
>>>> >>> Deployment. So when we want to destroy a Flink cluster, we just need to
>>>> >>> delete the deployment[2]. For the HA related ConfigMaps, we do not set
>>>> >>> the owner so that they are retained even when we delete the whole Flink
>>>> >>> cluster.
>>>> >>>
>>>> >>>
>>>> >>> [1].
>>>> >>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>> >>> [2].
>>>> >>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>> >>>
>>>> >>>
>>>> >>> Best,
>>>> >>> Yang
>>>> >>>
>>>> >>>
>>>> >>> Stephan Ewen <se...@apache.org> wrote on Wed, Sep 16, 2020 at 8:16 PM:
>>>> >>>
>>>> >>>> This is a very cool feature proposal.
>>>> >>>>
>>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>>>> >>>> complicated to have the Leader RPC address in a different node than the
>>>> >>>> LeaderLock. There is extra code needed to make sure these converge, and
>>>> >>>> they can be temporarily out of sync.
>>>> >>>>
>>>> >>>> A much easier design would be to have the RPC address as payload in the
>>>> >>>> lock entry (ZNode in ZK), the same way that the leader fencing token is
>>>> >>>> stored as payload of the lock.
>>>> >>>> I think for the design above it would mean having a single ConfigMap
>>>> >>>> for both leader lock and leader RPC address discovery.
>>>> >>>>
>>>> >>>> This probably serves as a good design principle in general: do not
>>>> >>>> divide information that is updated together over different resources.
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Stephan
>>>> >>>>
>>>> >>>>
>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <tonysong...@gmail.com>
>>>> >>>> wrote:
>>>> >>>>
>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>> >>>>>
>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>> >>>>> built-in ConfigMap for Flink's HA services should significantly reduce
>>>> >>>>> the maintenance overhead compared to deploying a ZK cluster. I think
>>>> >>>>> this is an attractive feature for users.
>>>> >>>>>
>>>> >>>>> Concerning the proposed design, I have some questions. Might not be
>>>> >>>>> problems, just trying to understand.
>>>> >>>>>
>>>> >>>>> ## Architecture
>>>> >>>>>
>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for contending
>>>> >>>>> leader`, and `leader RPC address`)? What happens if the two ConfigMaps
>>>> >>>>> are not updated consistently? E.g., a TM learns about a new JM becoming
>>>> >>>>> leader (lock for contending leader updated), but still gets the old
>>>> >>>>> leader's address when trying to read `leader RPC address`?
>>>> >>>>>
>>>> >>>>> ## HA storage > Lock and release
>>>> >>>>>
>>>> >>>>> It seems to me that the owner needs to explicitly release the lock so
>>>> >>>>> that other peers can write/remove the stored object. What if the
>>>> >>>>> previous owner failed to release the lock (e.g., dead before
>>>> >>>>> releasing)? Would there be any problem?
>>>> >>>>>
>>>> >>>>> ## HA storage > HA data clean up
>>>> >>>>>
>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy <ClusterID>`,
>>>> >>>>> how is the HA data retained?
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> Thank you~
>>>> >>>>>
>>>> >>>>> Xintong Song
>>>> >>>>>
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <danrtsey...@gmail.com>
>>>> >>>>> wrote:
>>>> >>>>>
>>>> >>>>>> Hi devs and users,
>>>> >>>>>>
>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which will
>>>> >>>>>> introduce a new native high availability service for Kubernetes.
>>>> >>>>>>
>>>> >>>>>> Currently, Flink provides a ZooKeeper HA service, which is widely used
>>>> >>>>>> in production environments. It can be integrated into standalone
>>>> >>>>>> cluster, Yarn and Kubernetes deployments. However, using the ZooKeeper
>>>> >>>>>> HA service in K8s incurs additional cost since we need to manage a
>>>> >>>>>> ZooKeeper cluster. In the meantime, K8s provides public APIs for leader
>>>> >>>>>> election[2] and configuration storage (i.e. ConfigMap[3]). We could
>>>> >>>>>> leverage these features to make running an HA-configured Flink cluster
>>>> >>>>>> on K8s more convenient.
>>>> >>>>>>
>>>> >>>>>> Both standalone on K8s and native K8s deployments could benefit from
>>>> >>>>>> the newly introduced KubernetesHaService.
>>>> >>>>>>
>>>> >>>>>> [1].
>>>> >>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>> >>>>>> [2].
>>>> >>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>> >>>>>> [3].
>>>> >>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
>>>> >>>>>>
>>>> >>>>>> Looking forward to your feedback.
>>>> >>>>>>
>>>> >>>>>> Best,
>>>> >>>>>> Yang
>>>> >>>>>>
>>>> >>>>>
>>>>
>>>
