yittg commented on a change in pull request #15501:
URL: https://github.com/apache/flink/pull/15501#discussion_r671138992

File path: 
@@ -0,0 +1,228 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kubernetes.kubeclient.resources;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerEventListener;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+    private final NamespacedKubernetesClient client;
+    private final SharedInformerFactory sharedInformerFactory;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+    private final AggregatedEventHandler aggregatedEventHandler;
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        Preconditions.checkArgument(
+                !CollectionUtil.isNullOrEmpty(labels), "Labels must not be 
null or empty");
+        this.client = client;
+        final ExecutorService executorService =
+                Executors.newSingleThreadExecutor(
+                        new 
+        this.sharedInformerFactory = client.informers(executorService);
+        this.sharedInformerFactory.withLabels(labels);
+        // Using Long.MAX_VALUE as resync period to disable the internal 
periodical resync. Zero
+        // value does not work exactly here. It could be fixed after we bump 
the fabric8 Kubernetes
+        // client version to 5.0.0+. For more details, see
+        // https://github.com/fabric8io/kubernetes-client/issues/2651.
+        this.sharedIndexInformer =
+                sharedInformerFactory.sharedIndexInformerFor(
+                        apiTypeClass, apiListTypeClass, Long.MAX_VALUE);
+        this.aggregatedEventHandler = new 
+        this.sharedIndexInformer.addEventHandler(aggregatedEventHandler);
+        this.eventWrapper = eventWrapper;
+    }
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> handler) {
+        return aggregatedEventHandler.watch(name, handler);
+    }
+    @Override
+    public void run() {
+        sharedInformerFactory.startAllRegisteredInformers();
+    }
+    @Override
+    public void close() {
+        sharedInformerFactory.stopAllRegisteredInformers();
+    }
+    private String getResourceKey(String name) {
+        return client.getNamespace() + "/" + name;
+    }
+    private class AggregatedEventHandler
+            implements ResourceEventHandler<T>, SharedInformerEventListener {
+        private final Map<String, EventHandler> handlers = new HashMap<>();
+        private final ExecutorService executorService;
+        private AggregatedEventHandler(ExecutorService executorService) {
+            this.executorService = executorService;
+        }
+        @Override
+        public void onAdd(T obj) {
+            onResourceEvent(obj);
+        }
+        @Override
+        public void onUpdate(T oldObj, T newObj) {
+            onResourceEvent(newObj);
+        }
+        @Override
+        public void onDelete(T obj, boolean deletedFinalStateUnknown) {
+            onResourceEvent(obj);
+        }
+        @Override
+        public void onException(Exception exception) {
+            handlers.forEach((k, h) -> h.handleExceptionEvent(exception));
+        }
+        private Watch watch(String name, WatchCallbackHandler<R> watch) {
+            final String resourceKey = getResourceKey(name);
+            final String watchId = UUID.randomUUID().toString();
+            final CompletableFuture<Void> closeFuture = new 
+            executorService.submit(
+                    () -> {
+                        final EventHandler eventHandler =
+                                handlers.computeIfAbsent(
+                                        resourceKey, key -> new 
+                        eventHandler.addWatch(watchId, watch);
+                    });
+            closeFuture.whenCompleteAsync(
+                    (ignored, error) -> {
+                        if (error != null) {
+                            log.error("Unhandled error while closing 
watcher.", error);
+                        }
+                        final boolean removeHandler =
+                                handlers.get(resourceKey).removeWatch(watchId);
+                        if (removeHandler) {
+                            handlers.remove(resourceKey);
+                        }
+                    },
+                    executorService);
+            return () -> closeFuture.complete(null);
+        }
+        private void onResourceEvent(T obj) {
+            executorService.submit(
+                    () -> 
+        }
+        private Optional<EventHandler> findHandler(T obj) {
+            final String resourceKey = 
+            return Optional.ofNullable(handlers.get(resourceKey));
+        }
+    }
+    private class EventHandler {
+        private final String resourceKey;
+        private final Map<String, WatchCallbackHandler<R>> handlers = new 
+        private T currentResource;
+        private EventHandler(String resourceKey) {
+            this.resourceKey = resourceKey;
+        }
+        private void addWatch(String id, WatchCallbackHandler<R> handler) {
+            log.info("Starting to watch for {}, watching id:{}", resourceKey, 
+            handlers.put(id, handler);
+            if (currentResource != null) {
+                final List<R> events =
+                handler.onAdded(events);
+            }
+        }
+        private boolean removeWatch(String id) {
+            handlers.remove(id);
+            log.info("Stopped to watch for {}, watching id:{}", resourceKey, 
+            return handlers.isEmpty();
+        }
+        private void handleResourceEvent() {
+            T newResource = 

Review comment:
       i think you are right, we can ensure the sequentiality without "diffing" 
events manually in a single thread.

File path: 
@@ -0,0 +1,228 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kubernetes.kubeclient.resources;
+import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KubernetesResourceList;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.SharedInformerEventListener;
+import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+/** Base class for shared watcher based on {@link SharedIndexInformer}. */
+public abstract class KubernetesSharedInformer<
+                T extends HasMetadata, TList extends 
KubernetesResourceList<T>, R>
+        implements KubernetesSharedWatcher<R> {
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+    private final NamespacedKubernetesClient client;
+    private final SharedInformerFactory sharedInformerFactory;
+    private final SharedIndexInformer<T> sharedIndexInformer;
+    private final Function<T, R> eventWrapper;
+    private final AggregatedEventHandler aggregatedEventHandler;
+    public KubernetesSharedInformer(
+            NamespacedKubernetesClient client,
+            Class<T> apiTypeClass,
+            Class<TList> apiListTypeClass,
+            Map<String, String> labels,
+            Function<T, R> eventWrapper) {
+        Preconditions.checkArgument(
+                !CollectionUtil.isNullOrEmpty(labels), "Labels must not be 
null or empty");
+        this.client = client;
+        final ExecutorService executorService =
+                Executors.newSingleThreadExecutor(
+                        new 
+        this.sharedInformerFactory = client.informers(executorService);
+        this.sharedInformerFactory.withLabels(labels);
+        // Using Long.MAX_VALUE as resync period to disable the internal 
periodical resync. Zero
+        // value does not work exactly here. It could be fixed after we bump 
the fabric8 Kubernetes
+        // client version to 5.0.0+. For more details, see
+        // https://github.com/fabric8io/kubernetes-client/issues/2651.
+        this.sharedIndexInformer =
+                sharedInformerFactory.sharedIndexInformerFor(
+                        apiTypeClass, apiListTypeClass, Long.MAX_VALUE);
+        this.aggregatedEventHandler = new 
+        this.sharedIndexInformer.addEventHandler(aggregatedEventHandler);
+        this.eventWrapper = eventWrapper;
+    }
+    @Override
+    public Watch watch(String name, WatchCallbackHandler<R> handler) {
+        return aggregatedEventHandler.watch(name, handler);
+    }
+    @Override
+    public void run() {
+        sharedInformerFactory.startAllRegisteredInformers();
+    }
+    @Override
+    public void close() {
+        sharedInformerFactory.stopAllRegisteredInformers();
+    }
+    private String getResourceKey(String name) {
+        return client.getNamespace() + "/" + name;
+    }
+    private class AggregatedEventHandler
+            implements ResourceEventHandler<T>, SharedInformerEventListener {
+        private final Map<String, EventHandler> handlers = new HashMap<>();
+        private final ExecutorService executorService;
+        private AggregatedEventHandler(ExecutorService executorService) {
+            this.executorService = executorService;
+        }
+        @Override
+        public void onAdd(T obj) {
+            onResourceEvent(obj);
+        }
+        @Override
+        public void onUpdate(T oldObj, T newObj) {
+            onResourceEvent(newObj);
+        }
+        @Override
+        public void onDelete(T obj, boolean deletedFinalStateUnknown) {
+            onResourceEvent(obj);
+        }
+        @Override
+        public void onException(Exception exception) {
+            handlers.forEach((k, h) -> h.handleExceptionEvent(exception));
+        }
+        private Watch watch(String name, WatchCallbackHandler<R> watch) {
+            final String resourceKey = getResourceKey(name);
+            final String watchId = UUID.randomUUID().toString();
+            final CompletableFuture<Void> closeFuture = new 
+            executorService.submit(
+                    () -> {
+                        final EventHandler eventHandler =
+                                handlers.computeIfAbsent(
+                                        resourceKey, key -> new 
+                        eventHandler.addWatch(watchId, watch);
+                    });
+            closeFuture.whenCompleteAsync(
+                    (ignored, error) -> {
+                        if (error != null) {
+                            log.error("Unhandled error while closing 
watcher.", error);
+                        }
+                        final boolean removeHandler =
+                                handlers.get(resourceKey).removeWatch(watchId);
+                        if (removeHandler) {
+                            handlers.remove(resourceKey);
+                        }
+                    },
+                    executorService);
+            return () -> closeFuture.complete(null);
+        }
+        private void onResourceEvent(T obj) {
+            executorService.submit(
+                    () -> 
+        }
+        private Optional<EventHandler> findHandler(T obj) {
+            final String resourceKey = 
+            return Optional.ofNullable(handlers.get(resourceKey));
+        }
+    }
+    private class EventHandler {
+        private final String resourceKey;
+        private final Map<String, WatchCallbackHandler<R>> handlers = new 
+        private T currentResource;
+        private EventHandler(String resourceKey) {
+            this.resourceKey = resourceKey;
+        }
+        private void addWatch(String id, WatchCallbackHandler<R> handler) {
+            log.info("Starting to watch for {}, watching id:{}", resourceKey, 
+            handlers.put(id, handler);
+            if (currentResource != null) {
+                final List<R> events =
+                handler.onAdded(events);
+            }
+        }
+        private boolean removeWatch(String id) {
+            handlers.remove(id);
+            log.info("Stopped to watch for {}, watching id:{}", resourceKey, 
+            return handlers.isEmpty();
+        }
+        private void handleResourceEvent() {
+            T newResource = 

Review comment:

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:

Reply via email to