gharris1727 commented on code in PR #18325: URL: https://github.com/apache/kafka/pull/18325#discussion_r2058956971
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java: ########## @@ -477,6 +480,27 @@ private static void compileJavaSources(Path sourceDir, Path binDir) throws IOExc if (!success) { throw new RuntimeException("Failed to compile test plugin:\n" + writer); } + } finally { + if (!replacements.isEmpty()) { + sourceFiles.forEach(File::delete); + } + } + } + + private static File copyAndReplace(File source, Map<String, String> replacements) throws RuntimeException { + if (replacements.isEmpty()) { + return source; + } + try { + String content = Files.readString(source.toPath()); + for (Map.Entry<String, String> entry : replacements.entrySet()) { + content = content.replace(entry.getKey(), entry.getValue()); + } + File tmpFile = new File(System.getProperty("java.io.tmpdir") + File.separator + source.getName()); Review Comment: nit: use Files.createTempFile? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java: ########## @@ -52,6 +52,7 @@ import javax.tools.StandardJavaFileManager; import javax.tools.ToolProvider; + Review Comment: nit: newline ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class VersionedPluginBuilder { + + private static final String VERSION_PLACEHOLDER = "PLACEHOLDER_FOR_VERSION"; + + public enum VersionedTestPlugin { + + SINK_CONNECTOR("sampling-connector", "test.plugins.VersionedSamplingSinkConnector"), + SOURCE_CONNECTOR("versioned-source-connector", "test.plugins.VersionedSamplingSourceConnector"), + CONVERTER("sampling-converter", "test.plugins.VersionedSamplingConverter"), + HEADER_CONVERTER("sampling-header-converter", "test.plugins.VersionedSamplingHeaderConverter"), + TRANSFORMATION("versioned-transformation", "test.plugins.VersionedTransformation"), + PREDICATE("versioned-predicate", "test.plugins.VersionedPredicate"); + + private final String resourceDir; + private final String className; + + VersionedTestPlugin(String resourceDir, String className) { + this.resourceDir = resourceDir; + this.className = className; + } + + public String resourceDir() { + return resourceDir; + } + + public String className() { + return className; + } + } + + public static class BuildInfo { + + private final VersionedTestPlugin plugin; + private final String version; + private String location; + + private BuildInfo(VersionedTestPlugin plugin, String version) { + this.plugin = plugin; + this.version = version; + } + + private void setLocation(String location) { + this.location = location; + } + + public VersionedTestPlugin plugin() { + return plugin; + } + + public String version() { + return version; + } + + public String location() { + return location; + } + } + + private final List<BuildInfo> pluginBuilds; + + public VersionedPluginBuilder() { + pluginBuilds = new ArrayList<>(); + } + + public VersionedPluginBuilder include(VersionedTestPlugin plugin, String version) { + pluginBuilds.add(new BuildInfo(plugin, version)); + return this; + } + + public synchronized Path build(String pluginDir) throws IOException { + Path pluginDirPath = Files.createTempDirectory(pluginDir); + Path subDir = Files.createDirectory(pluginDirPath.resolve("lib")); + for (BuildInfo buildInfo : pluginBuilds) { Review Comment: Allowing the code to combine multiple test plugins together into a single plugin location is very interesting. Currently the bad-packaging plugin relies on plugins being located together, but this is because the code is laid out as multiple source files inside of a single plugin. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.connect.components.Versioned; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException; +import org.apache.maven.artifact.versioning.VersionRange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + + +public class MultiVersionTest { + + private static Plugins setUpPlugins(Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) { + String pluginPath = artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(",")); + Map<String, String> configs = new HashMap<>(); + configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath); + configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name()); + return new Plugins(configs); + } + + private void assertPluginLoad(Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) + throws InvalidVersionSpecificationException, ClassNotFoundException { + + Plugins plugins = setUpPlugins(artifacts, mode); + + for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry : artifacts.entrySet()) { + String pluginLocation = entry.getKey().toAbsolutePath().toString(); + + for (VersionedPluginBuilder.BuildInfo buildInfo : entry.getValue()) { + ClassLoader pluginLoader = plugins.pluginLoader(buildInfo.plugin().className(), PluginUtils.connectorVersionRequirement(buildInfo.version())); + Assertions.assertInstanceOf(PluginClassLoader.class, pluginLoader); + Assertions.assertTrue(((PluginClassLoader) pluginLoader).location().contains(pluginLocation)); + Object p = plugins.newPlugin(buildInfo.plugin().className(), PluginUtils.connectorVersionRequirement(buildInfo.version())); + Assertions.assertInstanceOf(Versioned.class, p); + Assertions.assertEquals(buildInfo.version(), ((Versioned) p).version()); + } + } + } + + private static final PluginType[] ALL_PLUGIN_TYPES = new PluginType[]{ + PluginType.SINK, PluginType.SOURCE, PluginType.CONVERTER, + PluginType.HEADER_CONVERTER, PluginType.TRANSFORMATION, PluginType.PREDICATE + }; + + private void assertCorrectLatestPluginVersion( + Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts, + PluginDiscoveryMode mode, + String latestVersion + ) { + Plugins plugins = setUpPlugins(artifacts, mode); + List<String> classes = artifacts.values().stream() + .flatMap(List::stream) + .map(VersionedPluginBuilder.BuildInfo::plugin) + .map(VersionedPluginBuilder.VersionedTestPlugin::className) + .distinct() + .toList(); + for (String className : classes) { + String version = plugins.latestVersion(className, ALL_PLUGIN_TYPES); + Assertions.assertEquals(latestVersion, version); + } + } + + private static Map<Path, List<VersionedPluginBuilder.BuildInfo>> buildIsolatedArtifacts( + String[] versions, + VersionedPluginBuilder.VersionedTestPlugin[] pluginTypes + ) throws IOException { + Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new HashMap<>(); + for (String v : versions) { + for (VersionedPluginBuilder.VersionedTestPlugin pluginType: pluginTypes) { + VersionedPluginBuilder builder = new VersionedPluginBuilder(); + builder.include(pluginType, v); + artifacts.put(builder.build(pluginType + "-" + v), builder.buildInfos()); + } + } + return artifacts; + } + + public static final String DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION; + public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> DEFAULT_ISOLATED_ARTIFACTS; + public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> DEFAULT_COMBINED_ARTIFACT; + public static final Plugins MULTI_VERSION_PLUGINS; + public static final Map<VersionedPluginBuilder.VersionedTestPlugin, String> DEFAULT_COMBINED_ARTIFACT_VERSIONS; + + static { + + String[] defaultIsolatedArtifactsVersions = new String[]{"1.1.0", "2.3.0", "4.3.0"}; + try { + DEFAULT_ISOLATED_ARTIFACTS = buildIsolatedArtifacts( + defaultIsolatedArtifactsVersions, VersionedPluginBuilder.VersionedTestPlugin.values() + ); + DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION = "4.3.0"; + DEFAULT_COMBINED_ARTIFACT_VERSIONS = new HashMap<>(); + + VersionedPluginBuilder builder = new VersionedPluginBuilder(); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR, k -> "0.0.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR, k -> "0.1.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER, k -> "0.2.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER, k -> "0.3.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION, k -> "0.4.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE, k -> "0.5.0")); + DEFAULT_COMBINED_ARTIFACT = Collections.singletonMap(builder.build("all_versioned_artifact"), builder.buildInfos()); + + Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new HashMap<>(); + artifacts.putAll(DEFAULT_COMBINED_ARTIFACT); + artifacts.putAll(DEFAULT_ISOLATED_ARTIFACTS); + MULTI_VERSION_PLUGINS = setUpPlugins(artifacts, PluginDiscoveryMode.SERVICE_LOAD); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void TestVersionedPluginLoaded() throws InvalidVersionSpecificationException, ClassNotFoundException { Review Comment: nit: lowercase the first letter of all of the tests. Here and all tests in PluginRecommenderTest. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java: ########## @@ -477,6 +480,27 @@ private static void compileJavaSources(Path sourceDir, Path binDir) throws IOExc if (!success) { throw new RuntimeException("Failed to compile test plugin:\n" + writer); } + } finally { + if (!replacements.isEmpty()) { + sourceFiles.forEach(File::delete); + } + } + } + + private static File copyAndReplace(File source, Map<String, String> replacements) throws RuntimeException { + if (replacements.isEmpty()) { + return source; + } Review Comment: Having the same condition in multiple disconnected places is a common source of bugs. If these two pieces of code are not called together, or someone modifies one condition but not the other, things won't behave correctly. Could the control flow be simplified? It's worth considering whether you need either of these conditions in the code at all. ########## connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.components.Versioned; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.predicates.Predicate; + +import java.util.Map; + +/** + /** + * Predicate to test multiverioning of plugins. + * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with the actual version during plugin compilation. + */ +public class VersionedPredicate<R extends ConnectRecord<R>> implements Predicate<R>, Versioned { Review Comment: nit: We have NonMigratedPredicate already, maybe we need a migrated, but non-versioned predicate? Similar for Transformation. This is already missing test coverage on trunk, so maybe it can be addressed in a follow-up. I think it's also a bug that you fixed in an earlier PR. ########## connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/VersionedSamplingConverter.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.Collections; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.HashMap; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; + +/** + * Converter to test multiverioning of plugins. + * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with the actual version during plugin compilation. + */ +public class VersionedSamplingConverter extends SamplingConverter implements Versioned { Review Comment: There is definitely value in having a distinct class here, because the Converter interface doesn't extend Versioned. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class VersionedPluginBuilder { + + private static final String VERSION_PLACEHOLDER = "PLACEHOLDER_FOR_VERSION"; + + public enum VersionedTestPlugin { Review Comment: WDYT about rolling these into the TestPlugin enum? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java: ########## @@ -477,6 +480,27 @@ private static void compileJavaSources(Path sourceDir, Path binDir) throws IOExc if (!success) { throw new RuntimeException("Failed to compile test plugin:\n" + writer); } + } finally { + if (!replacements.isEmpty()) { + sourceFiles.forEach(File::delete); + } + } + } + + private static File copyAndReplace(File source, Map<String, String> replacements) throws RuntimeException { + if (replacements.isEmpty()) { + return source; + } + try { + String content = Files.readString(source.toPath()); + for (Map.Entry<String, String> entry : replacements.entrySet()) { + content = content.replace(entry.getKey(), entry.getValue()); Review Comment: nit: I don't really like this "modifying arbitrary source code before compilation" technique, it's very powerful and open-ended, and could be misused and be more difficult to debug like self-modifying code. But because the current application is within reason, maybe this is good enough to merge. Prior art here is the ReadVersionFromResourcePlugin, whose code is duplicated in read-version-from-resource-v1 and read-version-from-resource-v2, with version files are specified explicitly. I think that we shouldn't follow this same strategy for these new plugins and versions, but there's a middle ground where we generate version files from Strings specified in createPluginJar/writeJar. ########## connect/runtime/src/test/resources/test-plugins/sampling-connector/test/plugins/VersionedSamplingSinkConnector.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * VersionedSamplingSinkConnector is a test connector that extends SamplingConnector and overrides the version method. + * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with the actual version during plugin compilation. + */ +public final class VersionedSamplingSinkConnector extends SamplingConnector { Review Comment: Is it significant that this is a SamplingConnector or SamplingPlugin, or is that just for convenience? I see that the SamplingPlugin interface is only used in PluginsTest, which doesn't use this (or the other Versioned) plugins. The SamplingPlugin was meant specifically for the plugins to exfiltrate ClassLoader references to assertions in PluginsTest code, it's not needed if you're not performing those assertions, as you can just extend the connect-api class. Theoretically you could add classloader assertions to the multiversion tests, but that doesn't feel very high priority, as the existing classloader assertions should have pretty good coverage. Or because Connectors are always Versioned, maybe you could add the version substitution to the SamplingConnector itself, rather than extending it. One problem with combining both fixed and dynamic versioned plugins together is that you are obligated to bring an additional copy of the fixed version plugin with each dynamic versioned plugin. For example, if you want a 1.0 and 2.0 VersionedSamplingSinkConnector in the same plugin location, you are forced to have two .class files for the 1.0.0 SamplingConnector. This is not generally a supported condition because of classloader nondeterminism, but because it's all compiled from the same source it probably won't break. Unless the code substitution mechanism causes a binary incompatibility.... TLDR: I think the Versioned plugins should not implement SamplingPlugin, and should be in their own packages like the versioned-predicate and versioned-transformation. This applies to all of the Sampling test plugins. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.connect.components.Versioned; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException; +import org.apache.maven.artifact.versioning.VersionRange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + + +public class MultiVersionTest { + + private static Plugins setUpPlugins(Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) { + String pluginPath = artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(",")); + Map<String, String> configs = new HashMap<>(); + configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath); + configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name()); + return new Plugins(configs); + } + + private void assertPluginLoad(Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) + throws InvalidVersionSpecificationException, ClassNotFoundException { + + Plugins plugins = setUpPlugins(artifacts, mode); + + for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry : artifacts.entrySet()) { + String pluginLocation = entry.getKey().toAbsolutePath().toString(); + + for (VersionedPluginBuilder.BuildInfo buildInfo : entry.getValue()) { + ClassLoader pluginLoader = plugins.pluginLoader(buildInfo.plugin().className(), PluginUtils.connectorVersionRequirement(buildInfo.version())); + Assertions.assertInstanceOf(PluginClassLoader.class, pluginLoader); + Assertions.assertTrue(((PluginClassLoader) pluginLoader).location().contains(pluginLocation)); + Object p = plugins.newPlugin(buildInfo.plugin().className(), PluginUtils.connectorVersionRequirement(buildInfo.version())); + Assertions.assertInstanceOf(Versioned.class, p); + Assertions.assertEquals(buildInfo.version(), ((Versioned) p).version()); + } + } + } + + private static final PluginType[] ALL_PLUGIN_TYPES = new PluginType[]{ Review Comment: nit: The tests still pass if this variable is set to PluginType.values(). Should they fail? Can we eliminate this variable? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java: ########## @@ -477,6 +480,27 @@ private static void compileJavaSources(Path sourceDir, Path binDir) throws IOExc if (!success) { throw new RuntimeException("Failed to compile test plugin:\n" + writer); } + } finally { + if (!replacements.isEmpty()) { + sourceFiles.forEach(File::delete); + } + } + } + + private static File copyAndReplace(File source, Map<String, String> replacements) throws RuntimeException { + if (replacements.isEmpty()) { + return source; + } + try { + String content = Files.readString(source.toPath()); + for (Map.Entry<String, String> entry : replacements.entrySet()) { + content = content.replace(entry.getKey(), entry.getValue()); + } + File tmpFile = new File(System.getProperty("java.io.tmpdir") + File.separator + source.getName()); + Files.writeString(tmpFile.toPath(), content); + return tmpFile; Review Comment: Please also call deleteOnExit() for this file. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.connect.components.Versioned; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException; +import org.apache.maven.artifact.versioning.VersionRange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + + +public class MultiVersionTest { + + private static Plugins setUpPlugins(Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) { + String pluginPath = artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(",")); + Map<String, String> configs = new HashMap<>(); + configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath); + configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name()); + return new Plugins(configs); + } + + private void assertPluginLoad(Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) + throws InvalidVersionSpecificationException, ClassNotFoundException { + + Plugins plugins = setUpPlugins(artifacts, mode); + + for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry : artifacts.entrySet()) { + String pluginLocation = entry.getKey().toAbsolutePath().toString(); + + for (VersionedPluginBuilder.BuildInfo buildInfo : entry.getValue()) { + ClassLoader pluginLoader = plugins.pluginLoader(buildInfo.plugin().className(), PluginUtils.connectorVersionRequirement(buildInfo.version())); + Assertions.assertInstanceOf(PluginClassLoader.class, pluginLoader); + Assertions.assertTrue(((PluginClassLoader) pluginLoader).location().contains(pluginLocation)); + Object p = plugins.newPlugin(buildInfo.plugin().className(), PluginUtils.connectorVersionRequirement(buildInfo.version())); + Assertions.assertInstanceOf(Versioned.class, p); + Assertions.assertEquals(buildInfo.version(), ((Versioned) p).version()); + } + } + } + + private static final PluginType[] ALL_PLUGIN_TYPES = new PluginType[]{ + PluginType.SINK, PluginType.SOURCE, PluginType.CONVERTER, + PluginType.HEADER_CONVERTER, PluginType.TRANSFORMATION, PluginType.PREDICATE + }; + + private void assertCorrectLatestPluginVersion( + Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts, + PluginDiscoveryMode mode, + String latestVersion + ) { + Plugins plugins = setUpPlugins(artifacts, mode); + List<String> classes = artifacts.values().stream() + .flatMap(List::stream) + .map(VersionedPluginBuilder.BuildInfo::plugin) + .map(VersionedPluginBuilder.VersionedTestPlugin::className) + .distinct() + .toList(); + for (String className : classes) { + String version = plugins.latestVersion(className, ALL_PLUGIN_TYPES); + Assertions.assertEquals(latestVersion, version); + } + } + + private static Map<Path, List<VersionedPluginBuilder.BuildInfo>> buildIsolatedArtifacts( + String[] versions, + VersionedPluginBuilder.VersionedTestPlugin[] pluginTypes + ) throws IOException { + Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new HashMap<>(); + for (String v : versions) { + for (VersionedPluginBuilder.VersionedTestPlugin pluginType: pluginTypes) { + VersionedPluginBuilder builder = new VersionedPluginBuilder(); + builder.include(pluginType, v); + artifacts.put(builder.build(pluginType + "-" + v), builder.buildInfos()); + } + } + return artifacts; + } + + public static final String DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION; + public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> DEFAULT_ISOLATED_ARTIFACTS; + public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> DEFAULT_COMBINED_ARTIFACT; + public static final Plugins MULTI_VERSION_PLUGINS; + public static final Map<VersionedPluginBuilder.VersionedTestPlugin, String> DEFAULT_COMBINED_ARTIFACT_VERSIONS; + + static { + + String[] defaultIsolatedArtifactsVersions = new String[]{"1.1.0", "2.3.0", "4.3.0"}; + try { + DEFAULT_ISOLATED_ARTIFACTS = buildIsolatedArtifacts( + defaultIsolatedArtifactsVersions, VersionedPluginBuilder.VersionedTestPlugin.values() + ); + DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION = "4.3.0"; + DEFAULT_COMBINED_ARTIFACT_VERSIONS = new HashMap<>(); + + VersionedPluginBuilder builder = new VersionedPluginBuilder(); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR, k -> "0.0.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR, k -> "0.1.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER, k -> "0.2.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER, k -> "0.3.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION, k -> "0.4.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE, k -> "0.5.0")); + DEFAULT_COMBINED_ARTIFACT = Collections.singletonMap(builder.build("all_versioned_artifact"), builder.buildInfos()); + + Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new HashMap<>(); + artifacts.putAll(DEFAULT_COMBINED_ARTIFACT); + artifacts.putAll(DEFAULT_ISOLATED_ARTIFACTS); + MULTI_VERSION_PLUGINS = setUpPlugins(artifacts, PluginDiscoveryMode.SERVICE_LOAD); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void TestVersionedPluginLoaded() throws InvalidVersionSpecificationException, ClassNotFoundException { + assertPluginLoad(DEFAULT_COMBINED_ARTIFACT, PluginDiscoveryMode.SERVICE_LOAD); + assertPluginLoad(DEFAULT_COMBINED_ARTIFACT, PluginDiscoveryMode.ONLY_SCAN); + } + + @Test + public void TestMultipleIsolatedVersionedPluginLoading() throws InvalidVersionSpecificationException, ClassNotFoundException { + assertPluginLoad(DEFAULT_ISOLATED_ARTIFACTS, PluginDiscoveryMode.SERVICE_LOAD); + assertPluginLoad(DEFAULT_ISOLATED_ARTIFACTS, PluginDiscoveryMode.ONLY_SCAN); + } + + @Test + public void TestLatestVersion() { + assertCorrectLatestPluginVersion(DEFAULT_ISOLATED_ARTIFACTS, PluginDiscoveryMode.SERVICE_LOAD, DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION); + assertCorrectLatestPluginVersion(DEFAULT_ISOLATED_ARTIFACTS, PluginDiscoveryMode.ONLY_SCAN, DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION); + } + + @Test + public void TestBundledPluginLoading() throws InvalidVersionSpecificationException, ClassNotFoundException { + + Plugins plugins = MULTI_VERSION_PLUGINS; + // get the connector loader of the combined artifact which includes all plugin types + ClassLoader connectorLoader = plugins.pluginLoader( + VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className(), + PluginUtils.connectorVersionRequirement("0.1.0") + ); + Assertions.assertInstanceOf(PluginClassLoader.class, connectorLoader); + + List<VersionedPluginBuilder.VersionedTestPlugin> pluginTypes = List.of( + VersionedPluginBuilder.VersionedTestPlugin.CONVERTER, + VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER, + VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION, + VersionedPluginBuilder.VersionedTestPlugin.PREDICATE + ); + // should match the version used in setUp for creating the combined artifact + List<String> versions = Arrays.asList("0.2.0", "0.3.0", "0.4.0", "0.5.0"); Review Comment: Do these versions correspond to the versions specified in the static initializer? Can they be shared constants? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.connect.components.Versioned; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException; +import org.apache.maven.artifact.versioning.VersionRange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + + +public class MultiVersionTest { + + private static Plugins setUpPlugins(Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) { + String pluginPath = artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(",")); + Map<String, String> configs = new HashMap<>(); + configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath); + configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name()); + return new Plugins(configs); + } + + private void assertPluginLoad(Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) + throws InvalidVersionSpecificationException, ClassNotFoundException { + + Plugins plugins = setUpPlugins(artifacts, mode); + + for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry : artifacts.entrySet()) { + String pluginLocation = entry.getKey().toAbsolutePath().toString(); + + for (VersionedPluginBuilder.BuildInfo buildInfo : entry.getValue()) { + ClassLoader pluginLoader = plugins.pluginLoader(buildInfo.plugin().className(), PluginUtils.connectorVersionRequirement(buildInfo.version())); + Assertions.assertInstanceOf(PluginClassLoader.class, pluginLoader); + Assertions.assertTrue(((PluginClassLoader) pluginLoader).location().contains(pluginLocation)); + Object p = plugins.newPlugin(buildInfo.plugin().className(), PluginUtils.connectorVersionRequirement(buildInfo.version())); + Assertions.assertInstanceOf(Versioned.class, p); + Assertions.assertEquals(buildInfo.version(), ((Versioned) p).version()); + } + } + } + + private static final PluginType[] ALL_PLUGIN_TYPES = new PluginType[]{ + PluginType.SINK, PluginType.SOURCE, PluginType.CONVERTER, + PluginType.HEADER_CONVERTER, PluginType.TRANSFORMATION, PluginType.PREDICATE + }; + + private void assertCorrectLatestPluginVersion( + Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts, + PluginDiscoveryMode mode, + String latestVersion + ) { + Plugins plugins = setUpPlugins(artifacts, mode); + List<String> classes = artifacts.values().stream() + .flatMap(List::stream) + .map(VersionedPluginBuilder.BuildInfo::plugin) + .map(VersionedPluginBuilder.VersionedTestPlugin::className) + .distinct() + .toList(); + for (String className : classes) { + String version = plugins.latestVersion(className, ALL_PLUGIN_TYPES); + Assertions.assertEquals(latestVersion, version); + } + } + + private static Map<Path, List<VersionedPluginBuilder.BuildInfo>> buildIsolatedArtifacts( + String[] versions, + VersionedPluginBuilder.VersionedTestPlugin[] pluginTypes + ) throws IOException { + Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new HashMap<>(); + for (String v : versions) { + for (VersionedPluginBuilder.VersionedTestPlugin pluginType: pluginTypes) { + VersionedPluginBuilder builder = new VersionedPluginBuilder(); + builder.include(pluginType, v); + artifacts.put(builder.build(pluginType + "-" + v), builder.buildInfos()); + } + } + return artifacts; + } + + public static final String DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION; + public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> DEFAULT_ISOLATED_ARTIFACTS; + public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>> DEFAULT_COMBINED_ARTIFACT; + public static final Plugins MULTI_VERSION_PLUGINS; + public static final Map<VersionedPluginBuilder.VersionedTestPlugin, String> DEFAULT_COMBINED_ARTIFACT_VERSIONS; + + static { + + String[] defaultIsolatedArtifactsVersions = new String[]{"1.1.0", "2.3.0", "4.3.0"}; + try { + DEFAULT_ISOLATED_ARTIFACTS = buildIsolatedArtifacts( + defaultIsolatedArtifactsVersions, VersionedPluginBuilder.VersionedTestPlugin.values() + ); + DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION = "4.3.0"; + DEFAULT_COMBINED_ARTIFACT_VERSIONS = new HashMap<>(); + + VersionedPluginBuilder builder = new VersionedPluginBuilder(); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR, k -> "0.0.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR, k -> "0.1.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER, k -> "0.2.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER, k -> "0.3.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION, k -> "0.4.0")); + builder.include(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE, + DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE, k -> "0.5.0")); + DEFAULT_COMBINED_ARTIFACT = Collections.singletonMap(builder.build("all_versioned_artifact"), builder.buildInfos()); Review Comment: nit: Why is this computeIfAbsent, when we are constructing the map for the first time? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class VersionedPluginBuilder { + + private static final String VERSION_PLACEHOLDER = "PLACEHOLDER_FOR_VERSION"; + + public enum VersionedTestPlugin { + + SINK_CONNECTOR("sampling-connector", "test.plugins.VersionedSamplingSinkConnector"), + SOURCE_CONNECTOR("versioned-source-connector", "test.plugins.VersionedSamplingSourceConnector"), + CONVERTER("sampling-converter", "test.plugins.VersionedSamplingConverter"), + HEADER_CONVERTER("sampling-header-converter", "test.plugins.VersionedSamplingHeaderConverter"), + TRANSFORMATION("versioned-transformation", "test.plugins.VersionedTransformation"), + PREDICATE("versioned-predicate", "test.plugins.VersionedPredicate"); + + private final String resourceDir; + private final String className; + + VersionedTestPlugin(String resourceDir, String className) { + this.resourceDir = resourceDir; + this.className = className; + } + + public String resourceDir() { + return resourceDir; + } + + public String className() { + return className; + } + } + + public static class BuildInfo { + + private final VersionedTestPlugin plugin; + private final String version; + private String location; + + private BuildInfo(VersionedTestPlugin plugin, String version) { + this.plugin = plugin; + this.version = version; + } + + private void setLocation(String location) { + this.location = location; + } + + public VersionedTestPlugin plugin() { + return plugin; + } + + public String version() { + return version; + } + + public String location() { + return location; + } + } + + private final List<BuildInfo> pluginBuilds; + + public VersionedPluginBuilder() { + pluginBuilds = new ArrayList<>(); + } + + public VersionedPluginBuilder include(VersionedTestPlugin plugin, String version) { + pluginBuilds.add(new BuildInfo(plugin, version)); + return this; + } + + public synchronized Path build(String pluginDir) throws IOException { + Path pluginDirPath = Files.createTempDirectory(pluginDir); Review Comment: This method is leaking files on each run. I found a bunch of files left over on my machine: ``` cd /var/folders/k1/6b93_bm16596yf4d3dlp6vbh0000gn/T ls rm -rf SINK_CONNECTOR-* SOURCE_CONNECTOR-* PREDICATE-* TRANSFORMATION-* HEADER_CONVERTER-* CONVERTER-* all_versioned_artifact* ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginRecommenderTest.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.kafka.connect.runtime.isolation.MultiVersionTest.DEFAULT_COMBINED_ARTIFACT_VERSIONS; +import static org.apache.kafka.connect.runtime.isolation.MultiVersionTest.DEFAULT_ISOLATED_ARTIFACTS; +import static org.apache.kafka.connect.runtime.isolation.MultiVersionTest.MULTI_VERSION_PLUGINS; + +public class PluginRecommenderTest { + + private Set<String> allVersionsOff(String classOrAlias) { Review Comment: nit: off -> of typo? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org