gharris1727 commented on code in PR #18325:
URL: https://github.com/apache/kafka/pull/18325#discussion_r2058956971


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##########
@@ -477,6 +480,27 @@ private static void compileJavaSources(Path sourceDir, 
Path binDir) throws IOExc
             if (!success) {
                 throw new RuntimeException("Failed to compile test plugin:\n" 
+ writer);
             }
+        } finally {
+            if (!replacements.isEmpty()) {
+                sourceFiles.forEach(File::delete);
+            }
+        }
+    }
+
+    private static File copyAndReplace(File source, Map<String, String> 
replacements) throws RuntimeException {
+        if (replacements.isEmpty()) {
+            return source;
+        }
+        try {
+            String content = Files.readString(source.toPath());
+            for (Map.Entry<String, String> entry : replacements.entrySet()) {
+                content = content.replace(entry.getKey(), entry.getValue());
+            }
+            File tmpFile = new File(System.getProperty("java.io.tmpdir") + 
File.separator + source.getName());

Review Comment:
   nit: use Files.createTempFile?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##########
@@ -52,6 +52,7 @@
 import javax.tools.StandardJavaFileManager;
 import javax.tools.ToolProvider;
 
+

Review Comment:
   nit: remove the extra blank line added here.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class VersionedPluginBuilder {
+
+    private static final String VERSION_PLACEHOLDER = 
"PLACEHOLDER_FOR_VERSION";
+
+    public enum VersionedTestPlugin {
+
+        SINK_CONNECTOR("sampling-connector", 
"test.plugins.VersionedSamplingSinkConnector"),
+        SOURCE_CONNECTOR("versioned-source-connector", 
"test.plugins.VersionedSamplingSourceConnector"),
+        CONVERTER("sampling-converter", 
"test.plugins.VersionedSamplingConverter"),
+        HEADER_CONVERTER("sampling-header-converter", 
"test.plugins.VersionedSamplingHeaderConverter"),
+        TRANSFORMATION("versioned-transformation", 
"test.plugins.VersionedTransformation"),
+        PREDICATE("versioned-predicate", "test.plugins.VersionedPredicate");
+
+        private final String resourceDir;
+        private final String className;
+
+        VersionedTestPlugin(String resourceDir, String className) {
+            this.resourceDir = resourceDir;
+            this.className = className;
+        }
+
+        public String resourceDir() {
+            return resourceDir;
+        }
+
+        public String className() {
+            return className;
+        }
+    }
+
+    public static class BuildInfo {
+
+        private final VersionedTestPlugin plugin;
+        private final String version;
+        private String location;
+
+        private BuildInfo(VersionedTestPlugin plugin, String version) {
+            this.plugin = plugin;
+            this.version = version;
+        }
+
+        private void setLocation(String location) {
+            this.location = location;
+        }
+
+        public VersionedTestPlugin plugin() {
+            return plugin;
+        }
+
+        public String version() {
+            return version;
+        }
+
+        public String location() {
+            return location;
+        }
+    }
+
+    private final List<BuildInfo> pluginBuilds;
+
+    public VersionedPluginBuilder() {
+        pluginBuilds = new ArrayList<>();
+    }
+
+    public VersionedPluginBuilder include(VersionedTestPlugin plugin, String 
version) {
+        pluginBuilds.add(new BuildInfo(plugin, version));
+        return this;
+    }
+
+    public synchronized Path build(String pluginDir) throws IOException {
+        Path pluginDirPath = Files.createTempDirectory(pluginDir);
+        Path subDir = Files.createDirectory(pluginDirPath.resolve("lib"));
+        for (BuildInfo buildInfo : pluginBuilds) {

Review Comment:
   Allowing the code to combine multiple test plugins together into a single 
plugin location is very interesting. Currently the bad-packaging plugin relies 
on plugins being located together, but this is because the code is laid out as 
multiple source files inside of a single plugin.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import 
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+public class MultiVersionTest {
+
+    private static Plugins setUpPlugins(Map<Path, 
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) {
+        String pluginPath = 
artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(","));
+        Map<String, String> configs = new HashMap<>();
+        configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath);
+        configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name());
+        return new Plugins(configs);
+    }
+
+    private void assertPluginLoad(Map<Path, 
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode)
+            throws InvalidVersionSpecificationException, 
ClassNotFoundException {
+
+        Plugins plugins = setUpPlugins(artifacts, mode);
+
+        for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry : 
artifacts.entrySet()) {
+            String pluginLocation = entry.getKey().toAbsolutePath().toString();
+
+            for (VersionedPluginBuilder.BuildInfo buildInfo : 
entry.getValue()) {
+                ClassLoader pluginLoader = 
plugins.pluginLoader(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+                Assertions.assertInstanceOf(PluginClassLoader.class, 
pluginLoader);
+                Assertions.assertTrue(((PluginClassLoader) 
pluginLoader).location().contains(pluginLocation));
+                Object p = plugins.newPlugin(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+                Assertions.assertInstanceOf(Versioned.class, p);
+                Assertions.assertEquals(buildInfo.version(), ((Versioned) 
p).version());
+            }
+        }
+    }
+
+    private static final PluginType[] ALL_PLUGIN_TYPES = new PluginType[]{
+        PluginType.SINK, PluginType.SOURCE, PluginType.CONVERTER,
+        PluginType.HEADER_CONVERTER, PluginType.TRANSFORMATION, 
PluginType.PREDICATE
+    };
+
+    private void assertCorrectLatestPluginVersion(
+            Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts,
+            PluginDiscoveryMode mode,
+            String latestVersion
+    ) {
+        Plugins plugins = setUpPlugins(artifacts, mode);
+        List<String> classes = artifacts.values().stream()
+                .flatMap(List::stream)
+                .map(VersionedPluginBuilder.BuildInfo::plugin)
+                .map(VersionedPluginBuilder.VersionedTestPlugin::className)
+                .distinct()
+                .toList();
+        for (String className : classes) {
+            String version = plugins.latestVersion(className, 
ALL_PLUGIN_TYPES);
+            Assertions.assertEquals(latestVersion, version);
+        }
+    }
+
+    private static Map<Path, List<VersionedPluginBuilder.BuildInfo>> 
buildIsolatedArtifacts(
+            String[] versions,
+            VersionedPluginBuilder.VersionedTestPlugin[] pluginTypes
+    ) throws IOException {
+        Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new 
HashMap<>();
+        for (String v : versions) {
+            for (VersionedPluginBuilder.VersionedTestPlugin pluginType: 
pluginTypes) {
+                VersionedPluginBuilder builder = new VersionedPluginBuilder();
+                builder.include(pluginType, v);
+                artifacts.put(builder.build(pluginType + "-" + v), 
builder.buildInfos());
+            }
+        }
+        return artifacts;
+    }
+
+    public static final String DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION;
+    public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> 
DEFAULT_ISOLATED_ARTIFACTS;
+    public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> 
DEFAULT_COMBINED_ARTIFACT;
+    public static final Plugins MULTI_VERSION_PLUGINS;
+    public static final Map<VersionedPluginBuilder.VersionedTestPlugin, 
String> DEFAULT_COMBINED_ARTIFACT_VERSIONS;
+
+    static {
+
+        String[] defaultIsolatedArtifactsVersions = new String[]{"1.1.0", 
"2.3.0", "4.3.0"};
+        try {
+            DEFAULT_ISOLATED_ARTIFACTS = buildIsolatedArtifacts(
+                defaultIsolatedArtifactsVersions, 
VersionedPluginBuilder.VersionedTestPlugin.values()
+            );
+            DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION = "4.3.0";
+            DEFAULT_COMBINED_ARTIFACT_VERSIONS = new HashMap<>();
+
+            VersionedPluginBuilder builder = new VersionedPluginBuilder();
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
 k -> "0.0.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
 k -> "0.1.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
 k -> "0.2.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
 k -> "0.3.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
 k -> "0.4.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
 k -> "0.5.0"));
+            DEFAULT_COMBINED_ARTIFACT = 
Collections.singletonMap(builder.build("all_versioned_artifact"), 
builder.buildInfos());
+
+            Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new 
HashMap<>();
+            artifacts.putAll(DEFAULT_COMBINED_ARTIFACT);
+            artifacts.putAll(DEFAULT_ISOLATED_ARTIFACTS);
+            MULTI_VERSION_PLUGINS = setUpPlugins(artifacts, 
PluginDiscoveryMode.SERVICE_LOAD);
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void TestVersionedPluginLoaded() throws 
InvalidVersionSpecificationException, ClassNotFoundException {

Review Comment:
   nit: lowercase the first letter of all of the test method names, both here 
and in PluginRecommenderTest.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##########
@@ -477,6 +480,27 @@ private static void compileJavaSources(Path sourceDir, 
Path binDir) throws IOExc
             if (!success) {
                 throw new RuntimeException("Failed to compile test plugin:\n" 
+ writer);
             }
+        } finally {
+            if (!replacements.isEmpty()) {
+                sourceFiles.forEach(File::delete);
+            }
+        }
+    }
+
+    private static File copyAndReplace(File source, Map<String, String> 
replacements) throws RuntimeException {
+        if (replacements.isEmpty()) {
+            return source;
+        }

Review Comment:
   Having the same condition in multiple disconnected places is a common source 
of bugs. If these two pieces of code are not called together, or someone 
modifies one condition but not the other, things won't behave correctly.
   
   Could the control flow be simplified? It's worth considering whether you 
need either of these conditions in the code at all. 



##########
connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+import java.util.Map;
+
+/**
+ /**
+ * Predicate to test multiverioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
+ */
+public class VersionedPredicate<R extends ConnectRecord<R>> implements 
Predicate<R>, Versioned {

Review Comment:
   nit: We have NonMigratedPredicate already, maybe we need a migrated, but 
non-versioned predicate?
   Similar for Transformation.
   
   This is already missing test coverage on trunk, so maybe it can be addressed 
in a follow-up. I think it's also a bug that you fixed in an earlier PR.



##########
connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/VersionedSamplingConverter.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
+
+/**
+ * Converter to test multiverioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
+ */
+public class VersionedSamplingConverter extends SamplingConverter implements 
Versioned {

Review Comment:
   There is definitely value in having a distinct class here, because the 
Converter interface doesn't extend Versioned.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class VersionedPluginBuilder {
+
+    private static final String VERSION_PLACEHOLDER = 
"PLACEHOLDER_FOR_VERSION";
+
+    public enum VersionedTestPlugin {

Review Comment:
   WDYT about rolling these into the TestPlugin enum?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##########
@@ -477,6 +480,27 @@ private static void compileJavaSources(Path sourceDir, 
Path binDir) throws IOExc
             if (!success) {
                 throw new RuntimeException("Failed to compile test plugin:\n" 
+ writer);
             }
+        } finally {
+            if (!replacements.isEmpty()) {
+                sourceFiles.forEach(File::delete);
+            }
+        }
+    }
+
+    private static File copyAndReplace(File source, Map<String, String> 
replacements) throws RuntimeException {
+        if (replacements.isEmpty()) {
+            return source;
+        }
+        try {
+            String content = Files.readString(source.toPath());
+            for (Map.Entry<String, String> entry : replacements.entrySet()) {
+                content = content.replace(entry.getKey(), entry.getValue());

Review Comment:
   nit: I don't really like this "modifying arbitrary source code before 
compilation" technique. It's very powerful and open-ended, so it could be 
misused, and, like self-modifying code, it could be more difficult to debug. 
But because the current application is within reason, maybe this is good enough 
to merge.
   
   Prior art here is the ReadVersionFromResourcePlugin, whose code is 
duplicated in read-version-from-resource-v1 and read-version-from-resource-v2, 
with version files specified explicitly. I think that we shouldn't follow 
this same strategy for these new plugins and versions, but there's a middle 
ground where we generate version files from Strings specified in 
createPluginJar/writeJar.



##########
connect/runtime/src/test/resources/test-plugins/sampling-connector/test/plugins/VersionedSamplingSinkConnector.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * VersionedSamplingSinkConnector is a test connector that extends 
SamplingConnector and overrides the version method.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with 
the actual version during plugin compilation.
+ */
+public final class VersionedSamplingSinkConnector extends SamplingConnector {

Review Comment:
   Is it significant that this is a SamplingConnector or SamplingPlugin, or is 
that just for convenience? I see that the SamplingPlugin interface is only used 
in PluginsTest, which doesn't use this plugin or the other Versioned plugins.
   
   The SamplingPlugin was meant specifically for the plugins to exfiltrate 
ClassLoader references to assertions in PluginsTest code, it's not needed if 
you're not performing those assertions, as you can just extend the connect-api 
class.
   
   Theoretically you could add classloader assertions to the multiversion 
tests, but that doesn't feel very high priority, as the existing classloader 
assertions should have pretty good coverage. Or because Connectors are always 
Versioned, maybe you could add the version substitution to the 
SamplingConnector itself, rather than extending it.
   
   One problem with combining both fixed and dynamic versioned plugins together 
is that you are obligated to bring an additional copy of the fixed version 
plugin with each dynamic versioned plugin. For example, if you want a 1.0 and 
2.0 VersionedSamplingSinkConnector in the same plugin location, you are forced 
to have two .class files for the 1.0.0 SamplingConnector. This is not generally 
a supported condition because of classloader nondeterminism, but because it's 
all compiled from the same source it probably won't break. Unless the code 
substitution mechanism causes a binary incompatibility....
   
   TLDR: I think the Versioned plugins should not implement SamplingPlugin, and 
should be in their own packages like the versioned-predicate and 
versioned-transformation.
   
   This applies to all of the Sampling test plugins.
   



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import 
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+public class MultiVersionTest {
+
+    private static Plugins setUpPlugins(Map<Path, 
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) {
+        String pluginPath = 
artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(","));
+        Map<String, String> configs = new HashMap<>();
+        configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath);
+        configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name());
+        return new Plugins(configs);
+    }
+
+    private void assertPluginLoad(Map<Path, 
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode)
+            throws InvalidVersionSpecificationException, 
ClassNotFoundException {
+
+        Plugins plugins = setUpPlugins(artifacts, mode);
+
+        for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry : 
artifacts.entrySet()) {
+            String pluginLocation = entry.getKey().toAbsolutePath().toString();
+
+            for (VersionedPluginBuilder.BuildInfo buildInfo : 
entry.getValue()) {
+                ClassLoader pluginLoader = 
plugins.pluginLoader(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+                Assertions.assertInstanceOf(PluginClassLoader.class, 
pluginLoader);
+                Assertions.assertTrue(((PluginClassLoader) 
pluginLoader).location().contains(pluginLocation));
+                Object p = plugins.newPlugin(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+                Assertions.assertInstanceOf(Versioned.class, p);
+                Assertions.assertEquals(buildInfo.version(), ((Versioned) 
p).version());
+            }
+        }
+    }
+
+    private static final PluginType[] ALL_PLUGIN_TYPES = new PluginType[]{

Review Comment:
   nit: The tests still pass if this variable is set to PluginType.values(). 
Should they fail? Can we eliminate this variable?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##########
@@ -477,6 +480,27 @@ private static void compileJavaSources(Path sourceDir, 
Path binDir) throws IOExc
             if (!success) {
                 throw new RuntimeException("Failed to compile test plugin:\n" 
+ writer);
             }
+        } finally {
+            if (!replacements.isEmpty()) {
+                sourceFiles.forEach(File::delete);
+            }
+        }
+    }
+
+    private static File copyAndReplace(File source, Map<String, String> 
replacements) throws RuntimeException {
+        if (replacements.isEmpty()) {
+            return source;
+        }
+        try {
+            String content = Files.readString(source.toPath());
+            for (Map.Entry<String, String> entry : replacements.entrySet()) {
+                content = content.replace(entry.getKey(), entry.getValue());
+            }
+            File tmpFile = new File(System.getProperty("java.io.tmpdir") + 
File.separator + source.getName());
+            Files.writeString(tmpFile.toPath(), content);
+            return tmpFile;

Review Comment:
   Please also call deleteOnExit() for this file.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import 
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+public class MultiVersionTest {
+
+    private static Plugins setUpPlugins(Map<Path, 
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) {
+        String pluginPath = 
artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(","));
+        Map<String, String> configs = new HashMap<>();
+        configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath);
+        configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name());
+        return new Plugins(configs);
+    }
+
+    private void assertPluginLoad(Map<Path, 
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode)
+            throws InvalidVersionSpecificationException, 
ClassNotFoundException {
+
+        Plugins plugins = setUpPlugins(artifacts, mode);
+
+        for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry : 
artifacts.entrySet()) {
+            String pluginLocation = entry.getKey().toAbsolutePath().toString();
+
+            for (VersionedPluginBuilder.BuildInfo buildInfo : 
entry.getValue()) {
+                ClassLoader pluginLoader = 
plugins.pluginLoader(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+                Assertions.assertInstanceOf(PluginClassLoader.class, 
pluginLoader);
+                Assertions.assertTrue(((PluginClassLoader) 
pluginLoader).location().contains(pluginLocation));
+                Object p = plugins.newPlugin(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+                Assertions.assertInstanceOf(Versioned.class, p);
+                Assertions.assertEquals(buildInfo.version(), ((Versioned) 
p).version());
+            }
+        }
+    }
+
+    private static final PluginType[] ALL_PLUGIN_TYPES = new PluginType[]{
+        PluginType.SINK, PluginType.SOURCE, PluginType.CONVERTER,
+        PluginType.HEADER_CONVERTER, PluginType.TRANSFORMATION, 
PluginType.PREDICATE
+    };
+
+    private void assertCorrectLatestPluginVersion(
+            Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts,
+            PluginDiscoveryMode mode,
+            String latestVersion
+    ) {
+        Plugins plugins = setUpPlugins(artifacts, mode);
+        List<String> classes = artifacts.values().stream()
+                .flatMap(List::stream)
+                .map(VersionedPluginBuilder.BuildInfo::plugin)
+                .map(VersionedPluginBuilder.VersionedTestPlugin::className)
+                .distinct()
+                .toList();
+        for (String className : classes) {
+            String version = plugins.latestVersion(className, 
ALL_PLUGIN_TYPES);
+            Assertions.assertEquals(latestVersion, version);
+        }
+    }
+
+    private static Map<Path, List<VersionedPluginBuilder.BuildInfo>> 
buildIsolatedArtifacts(
+            String[] versions,
+            VersionedPluginBuilder.VersionedTestPlugin[] pluginTypes
+    ) throws IOException {
+        Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new 
HashMap<>();
+        for (String v : versions) {
+            for (VersionedPluginBuilder.VersionedTestPlugin pluginType: 
pluginTypes) {
+                VersionedPluginBuilder builder = new VersionedPluginBuilder();
+                builder.include(pluginType, v);
+                artifacts.put(builder.build(pluginType + "-" + v), 
builder.buildInfos());
+            }
+        }
+        return artifacts;
+    }
+
+    public static final String DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION;
+    public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> 
DEFAULT_ISOLATED_ARTIFACTS;
+    public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> 
DEFAULT_COMBINED_ARTIFACT;
+    public static final Plugins MULTI_VERSION_PLUGINS;
+    public static final Map<VersionedPluginBuilder.VersionedTestPlugin, 
String> DEFAULT_COMBINED_ARTIFACT_VERSIONS;
+
+    static {
+
+        String[] defaultIsolatedArtifactsVersions = new String[]{"1.1.0", 
"2.3.0", "4.3.0"};
+        try {
+            DEFAULT_ISOLATED_ARTIFACTS = buildIsolatedArtifacts(
+                defaultIsolatedArtifactsVersions, 
VersionedPluginBuilder.VersionedTestPlugin.values()
+            );
+            DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION = "4.3.0";
+            DEFAULT_COMBINED_ARTIFACT_VERSIONS = new HashMap<>();
+
+            VersionedPluginBuilder builder = new VersionedPluginBuilder();
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
 k -> "0.0.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
 k -> "0.1.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
 k -> "0.2.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
 k -> "0.3.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
 k -> "0.4.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
 k -> "0.5.0"));
+            DEFAULT_COMBINED_ARTIFACT = 
Collections.singletonMap(builder.build("all_versioned_artifact"), 
builder.buildInfos());
+
+            Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new 
HashMap<>();
+            artifacts.putAll(DEFAULT_COMBINED_ARTIFACT);
+            artifacts.putAll(DEFAULT_ISOLATED_ARTIFACTS);
+            MULTI_VERSION_PLUGINS = setUpPlugins(artifacts, 
PluginDiscoveryMode.SERVICE_LOAD);
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void TestVersionedPluginLoaded() throws 
InvalidVersionSpecificationException, ClassNotFoundException {
+        assertPluginLoad(DEFAULT_COMBINED_ARTIFACT, 
PluginDiscoveryMode.SERVICE_LOAD);
+        assertPluginLoad(DEFAULT_COMBINED_ARTIFACT, 
PluginDiscoveryMode.ONLY_SCAN);
+    }
+
+    @Test
+    public void TestMultipleIsolatedVersionedPluginLoading() throws 
InvalidVersionSpecificationException, ClassNotFoundException {
+        assertPluginLoad(DEFAULT_ISOLATED_ARTIFACTS, 
PluginDiscoveryMode.SERVICE_LOAD);
+        assertPluginLoad(DEFAULT_ISOLATED_ARTIFACTS, 
PluginDiscoveryMode.ONLY_SCAN);
+    }
+
+    @Test
+    public void TestLatestVersion() {
+        assertCorrectLatestPluginVersion(DEFAULT_ISOLATED_ARTIFACTS, 
PluginDiscoveryMode.SERVICE_LOAD, DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION);
+        assertCorrectLatestPluginVersion(DEFAULT_ISOLATED_ARTIFACTS, 
PluginDiscoveryMode.ONLY_SCAN, DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION);
+    }
+
+    @Test
+    public void TestBundledPluginLoading() throws 
InvalidVersionSpecificationException, ClassNotFoundException {
+
+        Plugins plugins = MULTI_VERSION_PLUGINS;
+        // get the connector loader of the combined artifact which includes 
all plugin types
+        ClassLoader connectorLoader = plugins.pluginLoader(
+            
VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className(),
+            PluginUtils.connectorVersionRequirement("0.1.0")
+        );
+        Assertions.assertInstanceOf(PluginClassLoader.class, connectorLoader);
+
+        List<VersionedPluginBuilder.VersionedTestPlugin> pluginTypes = List.of(
+            VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
+            VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
+            VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
+            VersionedPluginBuilder.VersionedTestPlugin.PREDICATE
+        );
+        // should match the version used in setUp for creating the combined 
artifact
+        List<String> versions = Arrays.asList("0.2.0", "0.3.0", "0.4.0", 
"0.5.0");

Review Comment:
   Do these versions correspond to the versions specified in the static 
initializer? Can they be shared constants?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import 
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+public class MultiVersionTest {
+
+    private static Plugins setUpPlugins(Map<Path, 
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) {
+        String pluginPath = 
artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(","));
+        Map<String, String> configs = new HashMap<>();
+        configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath);
+        configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name());
+        return new Plugins(configs);
+    }
+
+    private void assertPluginLoad(Map<Path, 
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode)
+            throws InvalidVersionSpecificationException, 
ClassNotFoundException {
+
+        Plugins plugins = setUpPlugins(artifacts, mode);
+
+        for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry : 
artifacts.entrySet()) {
+            String pluginLocation = entry.getKey().toAbsolutePath().toString();
+
+            for (VersionedPluginBuilder.BuildInfo buildInfo : 
entry.getValue()) {
+                ClassLoader pluginLoader = 
plugins.pluginLoader(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+                Assertions.assertInstanceOf(PluginClassLoader.class, 
pluginLoader);
+                Assertions.assertTrue(((PluginClassLoader) 
pluginLoader).location().contains(pluginLocation));
+                Object p = plugins.newPlugin(buildInfo.plugin().className(), 
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+                Assertions.assertInstanceOf(Versioned.class, p);
+                Assertions.assertEquals(buildInfo.version(), ((Versioned) 
p).version());
+            }
+        }
+    }
+
+    private static final PluginType[] ALL_PLUGIN_TYPES = new PluginType[]{
+        PluginType.SINK, PluginType.SOURCE, PluginType.CONVERTER,
+        PluginType.HEADER_CONVERTER, PluginType.TRANSFORMATION, 
PluginType.PREDICATE
+    };
+
+    private void assertCorrectLatestPluginVersion(
+            Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts,
+            PluginDiscoveryMode mode,
+            String latestVersion
+    ) {
+        Plugins plugins = setUpPlugins(artifacts, mode);
+        List<String> classes = artifacts.values().stream()
+                .flatMap(List::stream)
+                .map(VersionedPluginBuilder.BuildInfo::plugin)
+                .map(VersionedPluginBuilder.VersionedTestPlugin::className)
+                .distinct()
+                .toList();
+        for (String className : classes) {
+            String version = plugins.latestVersion(className, 
ALL_PLUGIN_TYPES);
+            Assertions.assertEquals(latestVersion, version);
+        }
+    }
+
+    private static Map<Path, List<VersionedPluginBuilder.BuildInfo>> 
buildIsolatedArtifacts(
+            String[] versions,
+            VersionedPluginBuilder.VersionedTestPlugin[] pluginTypes
+    ) throws IOException {
+        Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new 
HashMap<>();
+        for (String v : versions) {
+            for (VersionedPluginBuilder.VersionedTestPlugin pluginType: 
pluginTypes) {
+                VersionedPluginBuilder builder = new VersionedPluginBuilder();
+                builder.include(pluginType, v);
+                artifacts.put(builder.build(pluginType + "-" + v), 
builder.buildInfos());
+            }
+        }
+        return artifacts;
+    }
+
+    public static final String DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION;
+    public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> 
DEFAULT_ISOLATED_ARTIFACTS;
+    public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> 
DEFAULT_COMBINED_ARTIFACT;
+    public static final Plugins MULTI_VERSION_PLUGINS;
+    public static final Map<VersionedPluginBuilder.VersionedTestPlugin, 
String> DEFAULT_COMBINED_ARTIFACT_VERSIONS;
+
+    static {
+
+        String[] defaultIsolatedArtifactsVersions = new String[]{"1.1.0", 
"2.3.0", "4.3.0"};
+        try {
+            DEFAULT_ISOLATED_ARTIFACTS = buildIsolatedArtifacts(
+                defaultIsolatedArtifactsVersions, 
VersionedPluginBuilder.VersionedTestPlugin.values()
+            );
+            DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION = "4.3.0";
+            DEFAULT_COMBINED_ARTIFACT_VERSIONS = new HashMap<>();
+
+            VersionedPluginBuilder builder = new VersionedPluginBuilder();
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
 k -> "0.0.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
 k -> "0.1.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
 k -> "0.2.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
 k -> "0.3.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
 k -> "0.4.0"));
+            
builder.include(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
+                
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
 k -> "0.5.0"));
+            DEFAULT_COMBINED_ARTIFACT = 
Collections.singletonMap(builder.build("all_versioned_artifact"), 
builder.buildInfos());

Review Comment:
   nit: Why is this computeIfAbsent rather than a plain put, when we are 
constructing the map for the first time?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class VersionedPluginBuilder {
+
+    private static final String VERSION_PLACEHOLDER = 
"PLACEHOLDER_FOR_VERSION";
+
+    public enum VersionedTestPlugin {
+
+        SINK_CONNECTOR("sampling-connector", 
"test.plugins.VersionedSamplingSinkConnector"),
+        SOURCE_CONNECTOR("versioned-source-connector", 
"test.plugins.VersionedSamplingSourceConnector"),
+        CONVERTER("sampling-converter", 
"test.plugins.VersionedSamplingConverter"),
+        HEADER_CONVERTER("sampling-header-converter", 
"test.plugins.VersionedSamplingHeaderConverter"),
+        TRANSFORMATION("versioned-transformation", 
"test.plugins.VersionedTransformation"),
+        PREDICATE("versioned-predicate", "test.plugins.VersionedPredicate");
+
+        private final String resourceDir;
+        private final String className;
+
+        VersionedTestPlugin(String resourceDir, String className) {
+            this.resourceDir = resourceDir;
+            this.className = className;
+        }
+
+        public String resourceDir() {
+            return resourceDir;
+        }
+
+        public String className() {
+            return className;
+        }
+    }
+
+    public static class BuildInfo {
+
+        private final VersionedTestPlugin plugin;
+        private final String version;
+        private String location;
+
+        private BuildInfo(VersionedTestPlugin plugin, String version) {
+            this.plugin = plugin;
+            this.version = version;
+        }
+
+        private void setLocation(String location) {
+            this.location = location;
+        }
+
+        public VersionedTestPlugin plugin() {
+            return plugin;
+        }
+
+        public String version() {
+            return version;
+        }
+
+        public String location() {
+            return location;
+        }
+    }
+
+    private final List<BuildInfo> pluginBuilds;
+
+    public VersionedPluginBuilder() {
+        pluginBuilds = new ArrayList<>();
+    }
+
+    public VersionedPluginBuilder include(VersionedTestPlugin plugin, String 
version) {
+        pluginBuilds.add(new BuildInfo(plugin, version));
+        return this;
+    }
+
+    public synchronized Path build(String pluginDir) throws IOException {
+        Path pluginDirPath = Files.createTempDirectory(pluginDir);

Review Comment:
   This method is leaking files on each run. I found a bunch of files left over 
on my machine:
   ```
   cd /var/folders/k1/6b93_bm16596yf4d3dlp6vbh0000gn/T
   ls
   rm -rf SINK_CONNECTOR-* SOURCE_CONNECTOR-* PREDICATE-* TRANSFORMATION-* 
HEADER_CONVERTER-* CONVERTER-* all_versioned_artifact*
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginRecommenderTest.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.runtime.isolation.MultiVersionTest.DEFAULT_COMBINED_ARTIFACT_VERSIONS;
+import static 
org.apache.kafka.connect.runtime.isolation.MultiVersionTest.DEFAULT_ISOLATED_ARTIFACTS;
+import static 
org.apache.kafka.connect.runtime.isolation.MultiVersionTest.MULTI_VERSION_PLUGINS;
+
+public class PluginRecommenderTest {
+
+    private Set<String> allVersionsOff(String classOrAlias) {

Review Comment:
   nit: typo? `allVersionsOff` -> `allVersionsOf`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to