gharris1727 commented on code in PR #14195:
URL: https://github.com/apache/kafka/pull/14195#discussion_r1294053643


##########
tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java:
##########
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.connect.runtime.isolation.PluginSource;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * An in-memory workspace for manipulating {@link java.util.ServiceLoader} 
manifest files.
+ * <p>Use {@link #forSource(PluginSource)} to get a workspace scoped to a 
single plugin location, which is able
+ * to accept simulated reads and writes of manifests.
+ * Write the simulated changes to disk via {@link #commit(boolean)}.
+ */
+public class ManifestWorkspace {
+
+    private static final String MANIFEST_PREFIX = "META-INF/services/";
+    private static final Path MANAGED_PATH = 
Paths.get("connect-plugin-path-shim-0.0.1-SNAPSHOT.jar");
+    private final PrintStream out;
+    private final List<SourceWorkspace<?>> workspaces;
+    private final Map<Path, Path> temporaryOverlayFiles;
+
+    public ManifestWorkspace(PrintStream out) {
+        this.out = out;
+        workspaces = new ArrayList<>();
+        temporaryOverlayFiles = new HashMap<>();
+    }
+
+    public SourceWorkspace<?> forSource(PluginSource source) throws 
IOException {
+        SourceWorkspace<?> sourceWorkspace;
+        switch (source.type()) {
+            case CLASSPATH:
+                sourceWorkspace = new ClasspathWorkspace(source);
+                break;
+            case MULTI_JAR:
+                sourceWorkspace = new MultiJarWorkspace(source);
+                break;
+            case SINGLE_JAR:
+                sourceWorkspace = new SingleJarWorkspace(source);
+                break;
+            case CLASS_HIERARCHY:
+                sourceWorkspace = new ClassHierarchyWorkspace(source);
+                break;
+            default:
+                throw new IllegalStateException("Unknown source type " + 
source.type());
+        }
+        workspaces.add(sourceWorkspace);
+        return sourceWorkspace;
+    }
+
+    /**
+     * Commits all queued changes to disk
+     * @return true if any workspace wrote changes to disk, false if all 
workspaces did not have writes to apply
+     * @throws IOException if an error occurs reading or writing to the 
filesystem
+     * @throws TerseException if a path is not writable on disk and should be.
+     */
+    public boolean commit(boolean dryRun) throws IOException, TerseException {
+        boolean changed = false;
+        for (SourceWorkspace<?> workspace : workspaces) {
+            changed |= workspace.commit(dryRun);
+        }
+        return changed;
+    }
+
+    /**
+     * A workspace scoped to a single plugin source.
+     * <p>Buffers simulated reads and writes to the plugin path before they 
can be written to disk.
+     * @param <T> The data structure used by the workspace to store in-memory 
manifests internally.
+     */
+    public static abstract class SourceWorkspace<T> {
+        private final Path location;
+        private final PluginSource.Type type;
+        protected final T initial;
+        protected final T manifests;
+
+        private SourceWorkspace(PluginSource source) throws IOException {
+            this.location = source.location();
+            this.type = source.type();
+            this.initial = load(source);
+            this.manifests = load(source);
+        }
+
+        public Path location() {
+            return location;
+        }
+
+        public PluginSource.Type type() {
+            return type;
+        }
+
+        protected abstract T load(PluginSource source) throws IOException;
+
+        public abstract boolean hasManifest(PluginType type, String className);
+
+        public abstract void forEach(BiConsumer<String, PluginType> consumer);
+
+        public abstract void addManifest(PluginType type, String pluginClass);
+
+        public abstract void removeManifest(PluginType type, String 
pluginClass);
+
+        protected abstract boolean commit(boolean dryRun) throws 
TerseException, IOException;
+
+        protected static Map<PluginType, Set<String>> loadManifest(URL 
baseUrl) throws MalformedURLException {
+            Map<PluginType, Set<String>> manifests = new 
EnumMap<>(PluginType.class);
+            for (PluginType type : PluginType.values()) {
+                Set<String> result;
+                try {
+                    URL u = new URL(baseUrl, MANIFEST_PREFIX + 
type.superClass().getName());
+                    result = parse(u);
+                } catch (RuntimeException e) {
+                    result = new LinkedHashSet<>();
+                }
+                manifests.put(type, result);
+            }
+            return manifests;
+        }
+
+        protected static URL jarBaseUrl(URL fileUrl) throws 
MalformedURLException {
+            return new URL("jar", "", -1, fileUrl + "!/", null);
+        }
+
+        protected static void forEach(Map<PluginType, Set<String>> manifests, 
BiConsumer<String, PluginType> consumer) {
+            manifests.forEach((type, classNames) -> 
classNames.forEach(className -> consumer.accept(className, type)));
+        }
+    }
+
+    /**
+     * A single jar can only contain one manifest per plugin type.
+     */
+    private class SingleJarWorkspace extends SourceWorkspace<Map<PluginType, 
Set<String>>> {
+
+        private SingleJarWorkspace(PluginSource source) throws IOException {
+            super(source);

Review Comment:
   it looks a little out of place as the only assertion, but it should always 
hold.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to