xintongsong commented on a change in pull request #13864: URL: https://github.com/apache/flink/pull/13864#discussion_r517772747
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesJobGraphStoreWatcher.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.highavailability; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.jobmanager.JobGraphStoreWatcher; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.kubernetes.utils.Constants.JOB_GRAPH_STORE_KEY_PREFIX; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link JobGraphStoreWatcher} implementation for Kubernetes. 
It watch the Dispatcher leader ConfigMap and call the + * {@link JobGraphStore.JobGraphListener} based on the received event. + */ +public class KubernetesJobGraphStoreWatcher implements JobGraphStoreWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesJobGraphStoreWatcher.class); + + private final FlinkKubeClient kubeClient; + + private final String configMapName; + + private JobGraphStore.JobGraphListener jobGraphListener; + + @Nullable + private KubernetesWatch kubernetesWatch; + + public KubernetesJobGraphStoreWatcher(FlinkKubeClient kubeClient, String configMapName) { + this.kubeClient = checkNotNull(kubeClient); + this.configMapName = checkNotNull(configMapName); + } + + @Override + public void start(JobGraphStore.JobGraphListener jobGraphListener) { + this.jobGraphListener = checkNotNull(jobGraphListener); + kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl()); + } + + @Override + public void stop() { + if (kubernetesWatch != null) { + kubernetesWatch.close(); + } + } + + private class ConfigMapCallbackHandlerImpl implements FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> { + // This is used to get the difference between current and previous data. And then notify the listener. + final Set<JobID> previousJobIDs = new HashSet<>(); + + @Override + public void onAdded(List<KubernetesConfigMap> configMaps) { + // The ConfigMap is created by KubernetesLeaderElectionDriver with empty data. We do not process this + // useless event. + } + + @Override + public void onModified(List<KubernetesConfigMap> configMaps) { + handleConfigMapEvent(configMaps); + } + + @Override + public void onDeleted(List<KubernetesConfigMap> configMaps) { + // The ConfigMap will be created again in the leader election service. 
+ } + + @Override + public void onError(List<KubernetesConfigMap> configMaps) { + LOG.error("Error while watching the configMap {}", configMapName); + } + + @Override + public void handleFatalError(Throwable throwable) { + LOG.error("Error while watching the configMap {}", configMapName, throwable); + } + + private void handleConfigMapEvent(List<KubernetesConfigMap> configMaps) { + final KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(configMaps, configMapName); + final Set<JobID> currentJobIDs = getJobIDs(configMap); + notifyJobGraphListenChanges(currentJobIDs, previousJobIDs); + previousJobIDs.clear(); + previousJobIDs.addAll(currentJobIDs); Review comment: Could we add an assertion here, e.g., checking the resource version? This logic depends heavily on the order in which events are received. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org