gharris1727 commented on code in PR #16984:
URL: https://github.com/apache/kafka/pull/16984#discussion_r1873720403


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -112,21 +199,70 @@ public void installDiscoveredPlugins(PluginScanResult 
scanResult) {
 
     @Override
     protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+        return loadVersionedPluginClass(name, null, resolve);
+    }
+
+    protected Class<?> loadVersionedPluginClass(
+        String name,
+        VersionRange range,
+        boolean resolve
+    ) throws VersionedPluginLoadingException, ClassNotFoundException {
+
         String fullName = aliases.getOrDefault(name, name);
-        PluginClassLoader pluginLoader = pluginClassLoader(fullName);
+        PluginClassLoader pluginLoader = pluginClassLoader(fullName, range);
+        Class<?> plugin;
         if (pluginLoader != null) {
-            log.trace("Retrieving loaded class '{}' from '{}'", fullName, 
pluginLoader);
-            return pluginLoader.loadClass(fullName, resolve);
+            log.trace("Retrieving loaded class '{}' from '{}'", name, 
pluginLoader);
+            plugin = pluginLoader.loadClass(fullName, resolve);
+        } else {
+            plugin = super.loadClass(fullName, resolve);
+            if (range == null) {
+                return plugin;
+            }
+
+            String pluginVersion;
+            SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = 
pluginLoaders.get(fullName);
+
+            if (scannedPlugin == null) {
+                throw new VersionedPluginLoadingException(String.format(
+                        "Plugin %s is not part of Connect's plugin loading 
mechanism (ClassPath or Plugin Path)",
+                        fullName
+                ));
+            }
+
+            List<PluginDesc<?>> classpathPlugins = 
scannedPlugin.keySet().stream()
+                    .filter(pluginDesc -> 
pluginDesc.location().equals("classpath"))
+                    .collect(Collectors.toList());
+
+            if (classpathPlugins.size() > 1) {
+                throw new VersionedPluginLoadingException(String.format(
+                        "Plugin %s has multiple versions specified in class 
path, "
+                                + "only one version is allowed in class path 
for loading a plugin with version range",
+                        fullName
+                ));
+            } else {

Review Comment:
   There's also the case where `classpathPlugins.isEmpty()`: `loadClass` can 
return a valid class that wasn't discovered by scanning (for example, if the 
class is abstract or not migrated), and then `classpathPlugins.get(0)` throws.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -69,36 +75,117 @@ public DelegatingClassLoader() {
 
     /**
      * Retrieve the PluginClassLoader associated with a plugin class
+     *
      * @param name The fully qualified class name of the plugin
      * @return the PluginClassLoader that should be used to load this, or null 
if the plugin is not isolated.
      */
     // VisibleForTesting
-    PluginClassLoader pluginClassLoader(String name) {
+    PluginClassLoader pluginClassLoader(String name, VersionRange range) {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
             return null;
         }
+
         SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+        ClassLoader pluginLoader = findPluginLoader(inner, name, range);
         return pluginLoader instanceof PluginClassLoader
-               ? (PluginClassLoader) pluginLoader
-               : null;
+            ? (PluginClassLoader) pluginLoader
+            : null;
     }
 
-    ClassLoader connectorLoader(String connectorClassOrAlias) {
-        String fullName = aliases.getOrDefault(connectorClassOrAlias, 
connectorClassOrAlias);
-        ClassLoader classLoader = pluginClassLoader(fullName);
-        if (classLoader == null) classLoader = this;
+    PluginClassLoader pluginClassLoader(String name) {
+        return pluginClassLoader(name, null);
+    }
+
+    ClassLoader loader(String classOrAlias, VersionRange range) throws 
ClassNotFoundException {
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        ClassLoader classLoader = range == null ? pluginClassLoader(fullName) :
+                // load the plugin class when version is provided which will 
validate the correct version is loaded
+                loadVersionedPluginClass(fullName, range, 
false).getClassLoader();
+        // the classloader returned can be the classpath loader in which case 
we should return the delegating classloader
+        if (!(classLoader instanceof PluginClassLoader)) {
+            classLoader = this;
+        }
         log.debug(
-            "Getting plugin class loader: '{}' for connector: {}",
-            classLoader,
-            connectorClassOrAlias
+                "Got plugin class loader: '{}' for connector: {}",
+                classLoader,
+                classOrAlias
         );
         return classLoader;
     }
 
+    ClassLoader loader(String classOrAlias) {
+        try {
+            return loader(classOrAlias, null);
+        } catch (ClassNotFoundException e) {
+            // class not found should not happen here as version is not 
provided
+        }
+        return this;
+    }
+
+    ClassLoader connectorLoader(String connectorClassOrAlias) {
+        return loader(connectorClassOrAlias);
+    }
+
+    String resolveFullClassName(String classOrAlias) {
+        return aliases.getOrDefault(classOrAlias, classOrAlias);
+    }
+
+    String latestVersion(String classOrAlias) {
+        if (classOrAlias == null) {
+            return null;
+        }
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        SortedMap<PluginDesc<?>, ClassLoader> inner = 
pluginLoaders.get(fullName);
+        if (inner == null) {
+            return null;
+        }
+        return inner.lastKey().version();
+    }
+
+    private ClassLoader findPluginLoader(
+        SortedMap<PluginDesc<?>, ClassLoader> loaders,
+        String pluginName,
+        VersionRange range
+    ) {
+
+        if (range != null) {
+
+            if (null != range.getRecommendedVersion()) {
+                throw new VersionedPluginLoadingException(String.format("A 
soft version range is not supported for plugin loading, "
+                        + "this is an internal error as connect should 
automatically convert soft ranges to hard ranges. "
+                        + "Provided soft version: %s ", range));
+            }
+
+            ArtifactVersion version = null;
+            ClassLoader loader = null;
+            for (Map.Entry<PluginDesc<?>, ClassLoader> entry : 
loaders.entrySet()) {
+                // the entries should be in sorted order of versions so this 
should end up picking the latest version which matches the range
+                if (range.containsVersion(entry.getKey().encodedVersion()) && 
entry.getValue() instanceof PluginClassLoader) {

Review Comment:
   nit: this `instanceof` check can be removed because it duplicates the check 
in the caller.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -112,21 +199,70 @@ public void installDiscoveredPlugins(PluginScanResult 
scanResult) {
 
     @Override
     protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+        return loadVersionedPluginClass(name, null, resolve);
+    }
+
+    protected Class<?> loadVersionedPluginClass(
+        String name,
+        VersionRange range,
+        boolean resolve
+    ) throws VersionedPluginLoadingException, ClassNotFoundException {
+
         String fullName = aliases.getOrDefault(name, name);
-        PluginClassLoader pluginLoader = pluginClassLoader(fullName);
+        PluginClassLoader pluginLoader = pluginClassLoader(fullName, range);
+        Class<?> plugin;
         if (pluginLoader != null) {
-            log.trace("Retrieving loaded class '{}' from '{}'", fullName, 
pluginLoader);
-            return pluginLoader.loadClass(fullName, resolve);
+            log.trace("Retrieving loaded class '{}' from '{}'", name, 
pluginLoader);
+            plugin = pluginLoader.loadClass(fullName, resolve);
+        } else {
+            plugin = super.loadClass(fullName, resolve);
+            if (range == null) {
+                return plugin;
+            }
+
+            String pluginVersion;
+            SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = 
pluginLoaders.get(fullName);
+
+            if (scannedPlugin == null) {
+                throw new VersionedPluginLoadingException(String.format(
+                        "Plugin %s is not part of Connect's plugin loading 
mechanism (ClassPath or Plugin Path)",
+                        fullName
+                ));
+            }
+
+            List<PluginDesc<?>> classpathPlugins = 
scannedPlugin.keySet().stream()
+                    .filter(pluginDesc -> 
pluginDesc.location().equals("classpath"))
+                    .collect(Collectors.toList());
+
+            if (classpathPlugins.size() > 1) {
+                throw new VersionedPluginLoadingException(String.format(
+                        "Plugin %s has multiple versions specified in class 
path, "
+                                + "only one version is allowed in class path 
for loading a plugin with version range",
+                        fullName
+                ));
+            } else {
+                pluginVersion = classpathPlugins.get(0).version();
+                if (!range.containsVersion(new 
DefaultArtifactVersion(pluginVersion))) {
+                    throw new VersionedPluginLoadingException(String.format(
+                            "Plugin %s has version %s which does not match the 
required version range %s",
+                            fullName,
+                            pluginVersion,
+                            range
+                    ), Collections.singletonList(pluginVersion));
+                }
+            }

Review Comment:
   nit: All of the verification steps after the range == null check can be 
moved to a separate method, maybe `ArtifactVersion 
classpathPluginVersion(String)` or `boolean 
classpathPluginVersionInRange(String, VersionRange)`.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##########
@@ -222,7 +222,7 @@ protected static String 
reflectiveErrorDescription(Throwable t) {
         }
     }
 
-    protected LoaderSwap withClassLoader(ClassLoader loader) {
+    protected static LoaderSwap withClassLoader(ClassLoader loader) {

Review Comment:
   This can be non-static now that `DelegatingClassLoader` isn't using it.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -69,36 +75,117 @@ public DelegatingClassLoader() {
 
     /**
      * Retrieve the PluginClassLoader associated with a plugin class
+     *
      * @param name The fully qualified class name of the plugin
      * @return the PluginClassLoader that should be used to load this, or null 
if the plugin is not isolated.
      */
     // VisibleForTesting
-    PluginClassLoader pluginClassLoader(String name) {
+    PluginClassLoader pluginClassLoader(String name, VersionRange range) {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
             return null;
         }
+
         SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+        ClassLoader pluginLoader = findPluginLoader(inner, name, range);
         return pluginLoader instanceof PluginClassLoader
-               ? (PluginClassLoader) pluginLoader
-               : null;
+            ? (PluginClassLoader) pluginLoader
+            : null;
     }
 
-    ClassLoader connectorLoader(String connectorClassOrAlias) {
-        String fullName = aliases.getOrDefault(connectorClassOrAlias, 
connectorClassOrAlias);
-        ClassLoader classLoader = pluginClassLoader(fullName);
-        if (classLoader == null) classLoader = this;
+    PluginClassLoader pluginClassLoader(String name) {
+        return pluginClassLoader(name, null);
+    }
+
+    ClassLoader loader(String classOrAlias, VersionRange range) throws 
ClassNotFoundException {
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        ClassLoader classLoader = range == null ? pluginClassLoader(fullName) :
+                // load the plugin class when version is provided which will 
validate the correct version is loaded
+                loadVersionedPluginClass(fullName, range, 
false).getClassLoader();
+        // the classloader returned can be the classpath loader in which case 
we should return the delegating classloader
+        if (!(classLoader instanceof PluginClassLoader)) {
+            classLoader = this;
+        }
         log.debug(
-            "Getting plugin class loader: '{}' for connector: {}",
-            classLoader,
-            connectorClassOrAlias
+                "Got plugin class loader: '{}' for connector: {}",
+                classLoader,
+                classOrAlias
         );
         return classLoader;
     }
 
+    ClassLoader loader(String classOrAlias) {
+        try {
+            return loader(classOrAlias, null);
+        } catch (ClassNotFoundException e) {
+            // class not found should not happen here as version is not 
provided
+        }
+        return this;
+    }
+
+    ClassLoader connectorLoader(String connectorClassOrAlias) {
+        return loader(connectorClassOrAlias);
+    }
+
+    String resolveFullClassName(String classOrAlias) {
+        return aliases.getOrDefault(classOrAlias, classOrAlias);
+    }
+
+    String latestVersion(String classOrAlias) {
+        if (classOrAlias == null) {
+            return null;
+        }
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        SortedMap<PluginDesc<?>, ClassLoader> inner = 
pluginLoaders.get(fullName);
+        if (inner == null) {
+            return null;
+        }
+        return inner.lastKey().version();
+    }
+
+    private ClassLoader findPluginLoader(
+        SortedMap<PluginDesc<?>, ClassLoader> loaders,
+        String pluginName,
+        VersionRange range
+    ) {
+
+        if (range != null) {
+
+            if (null != range.getRecommendedVersion()) {
+                throw new VersionedPluginLoadingException(String.format("A 
soft version range is not supported for plugin loading, "
+                        + "this is an internal error as connect should 
automatically convert soft ranges to hard ranges. "
+                        + "Provided soft version: %s ", range));
+            }
+
+            ArtifactVersion version = null;
+            ClassLoader loader = null;
+            for (Map.Entry<PluginDesc<?>, ClassLoader> entry : 
loaders.entrySet()) {
+                // the entries should be in sorted order of versions so this 
should end up picking the latest version which matches the range
+                if (range.containsVersion(entry.getKey().encodedVersion()) && 
entry.getValue() instanceof PluginClassLoader) {
+                    version = entry.getKey().encodedVersion();
+                    loader = entry.getValue();
+                }
+            }
+
+            if (version == null) {

Review Comment:
   nit: If you change this to `loader == null` you can eliminate the `version` 
variable entirely.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginLoadingException.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.List;
+
+public class VersionedPluginLoadingException extends ConfigException {
+
+    private List<String> availableVersions = null;
+
+    public VersionedPluginLoadingException(String message) {
+        super(message);
+    }
+
+    public VersionedPluginLoadingException(String message, List<String> 
availableVersions) {
+        super(message);
+        this.availableVersions = availableVersions;
+    }
+
+    public VersionedPluginLoadingException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public VersionedPluginLoadingException(String message, Throwable cause, 
List<String> availableVersions) {
+        super(message, cause);

Review Comment:
   Hmm, this is not a helpful constructor: it doesn't set the `Throwable cause`.
   Since these constructors appear unused, and a VersionedPluginLoadingException 
comes from invalid constraints rather than from some other operation failing, 
WDYT about removing these constructors?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/PluginVersionUtils.java:
##########
@@ -0,0 +1,30 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import 
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+
+public class PluginVersionUtils {
+
+    private static Plugins plugins = null;
+
+    public static void setPlugins(Plugins plugins) {
+        PluginVersionUtils.plugins = plugins;

Review Comment:
   Did this leak in from the other PR?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -427,99 +510,121 @@ public Converter newInternalConverter(boolean isKey, 
String className, Map<Strin
      * If the given configuration defines a {@link HeaderConverter} using the 
named configuration property, return a new configured
      * instance.
      *
-     * @param config             the configuration containing the {@link 
Converter}'s configuration; may not be null
-     * @param classPropertyName  the name of the property that contains the 
name of the {@link Converter} class; may not be null
-     * @param classLoaderUsage   which classloader should be used
+     * @param config             the configuration containing the {@link 
HeaderConverter}'s configuration; may not be null
+     * @param classPropertyName  the name of the property that contains the 
name of the {@link HeaderConverter} class; may not be null
+     * @param classLoaderUsage   the name of the property that contains the 
version of the {@link HeaderConverter} class; may not be null
      * @return the instantiated and configured {@link HeaderConverter}; null 
if the configuration did not define the specified property
      * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
      */
     public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
-        Class<? extends HeaderConverter> klass = null;
-        switch (classLoaderUsage) {
-            case CURRENT_CLASSLOADER:
-                if (!config.originals().containsKey(classPropertyName)) {
-                    // This connector configuration does not define the header 
converter via the specified property name
-                    return null;
-                }
-                // Attempt to load first with the current classloader, and 
plugins as a fallback.
-                // Note: we can't use config.getConfiguredInstance because we 
have to remove the property prefixes
-                // before calling config(...)
-                klass = pluginClassFromConfig(config, classPropertyName, 
HeaderConverter.class, scanResult.headerConverters());
-                break;
-            case PLUGINS:
-                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback.
-                // Note that there will always be at least a default header 
converter for the worker
-                String converterClassOrAlias = 
config.getClass(classPropertyName).getName();
-                try {
-                    klass = pluginClass(
-                            delegatingLoader,
-                            converterClassOrAlias,
-                            HeaderConverter.class
-                    );
-                } catch (ClassNotFoundException e) {
-                    throw new ConnectException(
-                            "Failed to find any class that implements 
HeaderConverter and which name matches "
-                                    + converterClassOrAlias
-                                    + ", available header converters are: "
-                                    + 
pluginNames(scanResult.headerConverters())
-                    );
-                }
-        }
-        if (klass == null) {
-            throw new ConnectException("Unable to initialize the 
HeaderConverter specified in '" + classPropertyName + "'");
+        return newHeaderConverter(config, classPropertyName, null, 
classLoaderUsage);
+    }
+
+    /**
+     * If the given configuration defines a {@link HeaderConverter} using the 
named configuration property, return a new configured
+     * instance. If the version is specified, it will always use the plugins 
classloader.
+     *
+     * @param config                the configuration containing the {@link 
HeaderConverter}'s configuration; may not be null
+     * @param classPropertyName     the name of the property that contains the 
name of the {@link HeaderConverter} class; may not be null
+     * @param versionPropertyName   the config for the version for the header 
converter
+     * @return the instantiated and configured {@link HeaderConverter}; null 
if the configuration did not define the specified property
+     * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
+     */
+    public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) 
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;
+        return newHeaderConverter(config, classPropertyName, 
versionPropertyName, classLoader);
+    }
+
+    private HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName, ClassLoaderUsage 
classLoaderUsage) {
+        if (!config.originals().containsKey(classPropertyName) && 
classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+            // This configuration does not define the Header Converter via the 
specified property name
+            return null;
         }
+        HeaderConverter plugin = newVersionedPlugin(config, classPropertyName, 
versionPropertyName,
+                HeaderConverter.class, classLoaderUsage, 
scanResult.headerConverters());
 
         String configPrefix = classPropertyName + ".";
         Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
         converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
         log.debug("Configuring the header converter with configuration 
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
 
-        HeaderConverter plugin;
-        try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
-            plugin = newPlugin(klass);
+        try (LoaderSwap loaderSwap = 
withClassLoader(plugin.getClass().getClassLoader())) {
             plugin.configure(converterConfig);
         }
         return plugin;
     }
 
-    public ConfigProvider newConfigProvider(AbstractConfig config, String 
providerPrefix, ClassLoaderUsage classLoaderUsage) {
-        String classPropertyName = providerPrefix + ".class";
-        Map<String, String> originalConfig = config.originalsStrings();
-        if (!originalConfig.containsKey(classPropertyName)) {
-            // This configuration does not define the config provider via the 
specified property name
-            return null;
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <U> U newVersionedPlugin(
+            AbstractConfig config,
+            String classPropertyName,
+            String versionPropertyName,
+            Class basePluginClass,
+            ClassLoaderUsage classLoaderUsage,
+            SortedSet<PluginDesc<U>> availablePlugins
+    ) {
+
+        String version = versionPropertyName == null ? null : 
config.getString(versionPropertyName);
+        VersionRange range = null;
+        if (version != null) {
+            try {
+                range = 
PluginVersionUtils.connectorVersionRequirement(version);
+            } catch (InvalidVersionSpecificationException e) {
+                throw new ConnectException(String.format("Invalid version 
range for %s: %s %s", classPropertyName, version, e));

Review Comment:
   nit: move the exception to the `cause` argument.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -112,21 +199,70 @@ public void installDiscoveredPlugins(PluginScanResult 
scanResult) {
 
     @Override
     protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+        return loadVersionedPluginClass(name, null, resolve);
+    }
+
+    protected Class<?> loadVersionedPluginClass(
+        String name,
+        VersionRange range,
+        boolean resolve
+    ) throws VersionedPluginLoadingException, ClassNotFoundException {
+
         String fullName = aliases.getOrDefault(name, name);
-        PluginClassLoader pluginLoader = pluginClassLoader(fullName);
+        PluginClassLoader pluginLoader = pluginClassLoader(fullName, range);
+        Class<?> plugin;
         if (pluginLoader != null) {
-            log.trace("Retrieving loaded class '{}' from '{}'", fullName, 
pluginLoader);
-            return pluginLoader.loadClass(fullName, resolve);
+            log.trace("Retrieving loaded class '{}' from '{}'", name, 
pluginLoader);
+            plugin = pluginLoader.loadClass(fullName, resolve);
+        } else {
+            plugin = super.loadClass(fullName, resolve);
+            if (range == null) {
+                return plugin;
+            }
+
+            String pluginVersion;
+            SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = 
pluginLoaders.get(fullName);
+
+            if (scannedPlugin == null) {
+                throw new VersionedPluginLoadingException(String.format(
+                        "Plugin %s is not part of Connect's plugin loading 
mechanism (ClassPath or Plugin Path)",
+                        fullName
+                ));
+            }
+
+            List<PluginDesc<?>> classpathPlugins = 
scannedPlugin.keySet().stream()
+                    .filter(pluginDesc -> 
pluginDesc.location().equals("classpath"))
+                    .collect(Collectors.toList());
+
+            if (classpathPlugins.size() > 1) {
+                throw new VersionedPluginLoadingException(String.format(
+                        "Plugin %s has multiple versions specified in class 
path, "
+                                + "only one version is allowed in class path 
for loading a plugin with version range",
+                        fullName
+                ));
+            } else {
+                pluginVersion = classpathPlugins.get(0).version();
+                if (!range.containsVersion(new 
DefaultArtifactVersion(pluginVersion))) {
+                    throw new VersionedPluginLoadingException(String.format(
+                            "Plugin %s has version %s which does not match the 
required version range %s",
+                            fullName,
+                            pluginVersion,
+                            range
+                    ), Collections.singletonList(pluginVersion));
+                }
+            }
         }
 
-        return super.loadClass(fullName, resolve);
+        return plugin;
     }
 
+
+

Review Comment:
   nit: extra lines



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -427,99 +503,121 @@ public Converter newInternalConverter(boolean isKey, 
String className, Map<Strin
      * If the given configuration defines a {@link HeaderConverter} using the 
named configuration property, return a new configured
      * instance.
      *
-     * @param config             the configuration containing the {@link 
Converter}'s configuration; may not be null
-     * @param classPropertyName  the name of the property that contains the 
name of the {@link Converter} class; may not be null
-     * @param classLoaderUsage   which classloader should be used
+     * @param config             the configuration containing the {@link 
HeaderConverter}'s configuration; may not be null
+     * @param classPropertyName  the name of the property that contains the 
name of the {@link HeaderConverter} class; may not be null
+     * @param classLoaderUsage   the name of the property that contains the 
version of the {@link HeaderConverter} class; may not be null
      * @return the instantiated and configured {@link HeaderConverter}; null 
if the configuration did not define the specified property
      * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
      */
     public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
-        Class<? extends HeaderConverter> klass = null;
-        switch (classLoaderUsage) {
-            case CURRENT_CLASSLOADER:
-                if (!config.originals().containsKey(classPropertyName)) {
-                    // This connector configuration does not define the header 
converter via the specified property name
-                    return null;
-                }
-                // Attempt to load first with the current classloader, and 
plugins as a fallback.
-                // Note: we can't use config.getConfiguredInstance because we 
have to remove the property prefixes
-                // before calling config(...)
-                klass = pluginClassFromConfig(config, classPropertyName, 
HeaderConverter.class, scanResult.headerConverters());
-                break;
-            case PLUGINS:
-                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback.
-                // Note that there will always be at least a default header 
converter for the worker
-                String converterClassOrAlias = 
config.getClass(classPropertyName).getName();
-                try {
-                    klass = pluginClass(
-                            delegatingLoader,
-                            converterClassOrAlias,
-                            HeaderConverter.class
-                    );
-                } catch (ClassNotFoundException e) {
-                    throw new ConnectException(
-                            "Failed to find any class that implements 
HeaderConverter and which name matches "
-                                    + converterClassOrAlias
-                                    + ", available header converters are: "
-                                    + 
pluginNames(scanResult.headerConverters())
-                    );
-                }
-        }
-        if (klass == null) {
-            throw new ConnectException("Unable to initialize the 
HeaderConverter specified in '" + classPropertyName + "'");
+        return newHeaderConverter(config, classPropertyName, null, 
classLoaderUsage);
+    }
+
+    /**
+     * If the given configuration defines a {@link HeaderConverter} using the 
named configuration property, return a new configured
+     * instance. If the version is specified, it will always use the plugins 
classloader.
+     *
+     * @param config                the configuration containing the {@link 
HeaderConverter}'s configuration; may not be null
+     * @param classPropertyName     the name of the property that contains the 
name of the {@link HeaderConverter} class; may not be null
+     * @param versionPropertyName   the config for the version for the header 
converter
+     * @return the instantiated and configured {@link HeaderConverter}; null 
if the configuration did not define the specified property
+     * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
+     */
+    public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) 
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;
+        return newHeaderConverter(config, classPropertyName, 
versionPropertyName, classLoader);
+    }
+
+    private HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName, ClassLoaderUsage 
classLoaderUsage) {
+        if (!config.originals().containsKey(classPropertyName) && 
classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+            // This configuration does not define the Header Converter via the 
specified property name
+            return null;
         }
+        HeaderConverter plugin = newVersionedPlugin(config, classPropertyName, 
versionPropertyName,
+                HeaderConverter.class, classLoaderUsage, 
scanResult.headerConverters());
 
         String configPrefix = classPropertyName + ".";
         Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
         converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
         log.debug("Configuring the header converter with configuration 
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
 
-        HeaderConverter plugin;
-        try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
-            plugin = newPlugin(klass);
+        try (LoaderSwap loaderSwap = 
withClassLoader(plugin.getClass().getClassLoader())) {
             plugin.configure(converterConfig);
         }
         return plugin;
     }
 
-    public ConfigProvider newConfigProvider(AbstractConfig config, String 
providerPrefix, ClassLoaderUsage classLoaderUsage) {
-        String classPropertyName = providerPrefix + ".class";
-        Map<String, String> originalConfig = config.originalsStrings();
-        if (!originalConfig.containsKey(classPropertyName)) {
-            // This configuration does not define the config provider via the 
specified property name
-            return null;
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <U> U newVersionedPlugin(
+            AbstractConfig config,
+            String classPropertyName,
+            String versionPropertyName,
+            Class basePluginClass,
+            ClassLoaderUsage classLoaderUsage,
+            SortedSet<PluginDesc<U>> availablePlugins
+    ) {
+
+        String version = versionPropertyName == null ? null : 
config.getString(versionPropertyName);
+        VersionRange range = null;
+        if (version != null) {
+            try {
+                range = 
PluginVersionUtils.connectorVersionRequirement(version);
+            } catch (InvalidVersionSpecificationException e) {
+                throw new ConnectException(String.format("Invalid version 
range for %s: %s %s", classPropertyName, version, e));
+            }
         }
-        Class<? extends ConfigProvider> klass = null;
+
+        assert range == null || classLoaderUsage == ClassLoaderUsage.PLUGINS;
+
+        Class<? extends U> klass = null;
+        String basePluginClassName = basePluginClass.getSimpleName();
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
                 // Attempt to load first with the current classloader, and 
plugins as a fallback.
-                klass = pluginClassFromConfig(config, classPropertyName, 
ConfigProvider.class, scanResult.configProviders());
+                // Note: we can't use config.getConfiguredInstance because 
Converter doesn't implement Configurable, and even if it did
+                // we have to remove the property prefixes before calling 
config(...) and we still always want to call Converter.config.
+                klass = pluginClassFromConfig(config, classPropertyName, 
basePluginClass, availablePlugins);
                 break;
             case PLUGINS:
                 // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback
-                String configProviderClassOrAlias = 
originalConfig.get(classPropertyName);
+                String classOrAlias = 
config.getClass(classPropertyName).getName();
                 try {
-                    klass = pluginClass(delegatingLoader, 
configProviderClassOrAlias, ConfigProvider.class);
+                    klass = pluginClass(delegatingLoader, classOrAlias, 
basePluginClass, range);
                 } catch (ClassNotFoundException e) {
                     throw new ConnectException(
-                            "Failed to find any class that implements 
ConfigProvider and which name matches "
-                                    + configProviderClassOrAlias + ", 
available ConfigProviders are: "
-                                    + pluginNames(scanResult.configProviders())
+                            "Failed to find any class that implements " + 
basePluginClassName + " and which name matches "
+                                    + classOrAlias + ", available plugins are: 
"
+                                    + pluginNames(availablePlugins)
                     );
                 }
                 break;
         }
         if (klass == null) {
-            throw new ConnectException("Unable to initialize the 
ConfigProvider specified in '" + classPropertyName + "'");
+            throw new ConnectException("Unable to initialize the '" + 
basePluginClassName
+                    + "' specified in '" + classPropertyName + "'");

Review Comment:
   nit: no single quotes around basePluginClassName.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -427,99 +503,121 @@ public Converter newInternalConverter(boolean isKey, 
String className, Map<Strin
      * If the given configuration defines a {@link HeaderConverter} using the 
named configuration property, return a new configured
      * instance.
      *
-     * @param config             the configuration containing the {@link 
Converter}'s configuration; may not be null
-     * @param classPropertyName  the name of the property that contains the 
name of the {@link Converter} class; may not be null
-     * @param classLoaderUsage   which classloader should be used
+     * @param config             the configuration containing the {@link 
HeaderConverter}'s configuration; may not be null
+     * @param classPropertyName  the name of the property that contains the 
name of the {@link HeaderConverter} class; may not be null
+     * @param classLoaderUsage   the name of the property that contains the 
version of the {@link HeaderConverter} class; may not be null
      * @return the instantiated and configured {@link HeaderConverter}; null 
if the configuration did not define the specified property
      * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
      */
     public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
-        Class<? extends HeaderConverter> klass = null;
-        switch (classLoaderUsage) {
-            case CURRENT_CLASSLOADER:
-                if (!config.originals().containsKey(classPropertyName)) {
-                    // This connector configuration does not define the header 
converter via the specified property name
-                    return null;
-                }
-                // Attempt to load first with the current classloader, and 
plugins as a fallback.
-                // Note: we can't use config.getConfiguredInstance because we 
have to remove the property prefixes
-                // before calling config(...)
-                klass = pluginClassFromConfig(config, classPropertyName, 
HeaderConverter.class, scanResult.headerConverters());
-                break;
-            case PLUGINS:
-                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback.
-                // Note that there will always be at least a default header 
converter for the worker
-                String converterClassOrAlias = 
config.getClass(classPropertyName).getName();
-                try {
-                    klass = pluginClass(
-                            delegatingLoader,
-                            converterClassOrAlias,
-                            HeaderConverter.class
-                    );
-                } catch (ClassNotFoundException e) {
-                    throw new ConnectException(
-                            "Failed to find any class that implements 
HeaderConverter and which name matches "
-                                    + converterClassOrAlias
-                                    + ", available header converters are: "
-                                    + 
pluginNames(scanResult.headerConverters())
-                    );
-                }
-        }
-        if (klass == null) {
-            throw new ConnectException("Unable to initialize the 
HeaderConverter specified in '" + classPropertyName + "'");
+        return newHeaderConverter(config, classPropertyName, null, 
classLoaderUsage);
+    }
+
+    /**
+     * If the given configuration defines a {@link HeaderConverter} using the 
named configuration property, return a new configured
+     * instance. If the version is specified, it will always use the plugins 
classloader.
+     *
+     * @param config                the configuration containing the {@link 
HeaderConverter}'s configuration; may not be null
+     * @param classPropertyName     the name of the property that contains the 
name of the {@link HeaderConverter} class; may not be null
+     * @param versionPropertyName   the config for the version for the header 
converter
+     * @return the instantiated and configured {@link HeaderConverter}; null 
if the configuration did not define the specified property
+     * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
+     */
+    public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) 
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;
+        return newHeaderConverter(config, classPropertyName, 
versionPropertyName, classLoader);
+    }
+
+    private HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName, ClassLoaderUsage 
classLoaderUsage) {
+        if (!config.originals().containsKey(classPropertyName) && 
classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+            // This configuration does not define the Header Converter via the 
specified property name
+            return null;
         }
+        HeaderConverter plugin = newVersionedPlugin(config, classPropertyName, 
versionPropertyName,
+                HeaderConverter.class, classLoaderUsage, 
scanResult.headerConverters());
 
         String configPrefix = classPropertyName + ".";
         Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
         converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
         log.debug("Configuring the header converter with configuration 
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
 
-        HeaderConverter plugin;
-        try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
-            plugin = newPlugin(klass);
+        try (LoaderSwap loaderSwap = 
withClassLoader(plugin.getClass().getClassLoader())) {
             plugin.configure(converterConfig);
         }
         return plugin;
     }
 
-    public ConfigProvider newConfigProvider(AbstractConfig config, String 
providerPrefix, ClassLoaderUsage classLoaderUsage) {
-        String classPropertyName = providerPrefix + ".class";
-        Map<String, String> originalConfig = config.originalsStrings();
-        if (!originalConfig.containsKey(classPropertyName)) {
-            // This configuration does not define the config provider via the 
specified property name
-            return null;
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <U> U newVersionedPlugin(
+            AbstractConfig config,
+            String classPropertyName,
+            String versionPropertyName,
+            Class basePluginClass,
+            ClassLoaderUsage classLoaderUsage,
+            SortedSet<PluginDesc<U>> availablePlugins
+    ) {
+
+        String version = versionPropertyName == null ? null : 
config.getString(versionPropertyName);
+        VersionRange range = null;
+        if (version != null) {
+            try {
+                range = 
PluginVersionUtils.connectorVersionRequirement(version);
+            } catch (InvalidVersionSpecificationException e) {
+                throw new ConnectException(String.format("Invalid version 
range for %s: %s %s", classPropertyName, version, e));
+            }
         }
-        Class<? extends ConfigProvider> klass = null;
+
+        assert range == null || classLoaderUsage == ClassLoaderUsage.PLUGINS;
+
+        Class<? extends U> klass = null;
+        String basePluginClassName = basePluginClass.getSimpleName();
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
                 // Attempt to load first with the current classloader, and 
plugins as a fallback.
-                klass = pluginClassFromConfig(config, classPropertyName, 
ConfigProvider.class, scanResult.configProviders());
+                // Note: we can't use config.getConfiguredInstance because 
Converter doesn't implement Configurable, and even if it did
+                // we have to remove the property prefixes before calling 
config(...) and we still always want to call Converter.config.

Review Comment:
   nit: stale comment; it still refers to Converter, but this method is now 
generic over plugin types.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -69,36 +75,117 @@ public DelegatingClassLoader() {
 
     /**
      * Retrieve the PluginClassLoader associated with a plugin class
+     *
      * @param name The fully qualified class name of the plugin
      * @return the PluginClassLoader that should be used to load this, or null 
if the plugin is not isolated.
      */
     // VisibleForTesting
-    PluginClassLoader pluginClassLoader(String name) {
+    PluginClassLoader pluginClassLoader(String name, VersionRange range) {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
             return null;
         }
+
         SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+        ClassLoader pluginLoader = findPluginLoader(inner, name, range);
         return pluginLoader instanceof PluginClassLoader
-               ? (PluginClassLoader) pluginLoader
-               : null;
+            ? (PluginClassLoader) pluginLoader
+            : null;
     }
 
-    ClassLoader connectorLoader(String connectorClassOrAlias) {
-        String fullName = aliases.getOrDefault(connectorClassOrAlias, 
connectorClassOrAlias);
-        ClassLoader classLoader = pluginClassLoader(fullName);
-        if (classLoader == null) classLoader = this;
+    PluginClassLoader pluginClassLoader(String name) {
+        return pluginClassLoader(name, null);
+    }
+
+    ClassLoader loader(String classOrAlias, VersionRange range) throws 
ClassNotFoundException {
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        ClassLoader classLoader = range == null ? pluginClassLoader(fullName) :
+                // load the plugin class when version is provided which will 
validate the correct version is loaded
+                loadVersionedPluginClass(fullName, range, 
false).getClassLoader();

Review Comment:
   This logic is incorrect, and I think you can just call 
`pluginClassLoader(fullName, range)` to let the null check internally handle 
both cases.
   
   This will also remove the ClassNotFoundException and the need to catch it 
higher up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to