gharris1727 commented on code in PR #16984:
URL: https://github.com/apache/kafka/pull/16984#discussion_r1868472074


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -69,36 +70,109 @@ public DelegatingClassLoader() {
 
     /**
      * Retrieve the PluginClassLoader associated with a plugin class
+     *
      * @param name The fully qualified class name of the plugin
      * @return the PluginClassLoader that should be used to load this, or null 
if the plugin is not isolated.
      */
     // VisibleForTesting
-    PluginClassLoader pluginClassLoader(String name) {
+    PluginClassLoader pluginClassLoader(String name, VersionRange range) {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
             return null;
         }
+
         SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+        ClassLoader pluginLoader = findPluginLoader(inner, name, range);
         return pluginLoader instanceof PluginClassLoader
-               ? (PluginClassLoader) pluginLoader
-               : null;
+            ? (PluginClassLoader) pluginLoader
+            : null;
+    }
+
+    PluginClassLoader pluginClassLoader(String name) {
+        return pluginClassLoader(name, null);
+    }
+
+    ClassLoader connectorLoader(String connectorClassOrAlias, VersionRange 
range) throws ClassNotFoundException {
+        String fullName = aliases.getOrDefault(connectorClassOrAlias, 
connectorClassOrAlias);
+        // if the plugin is not loaded via the plugin classloader, it might 
still be available in the parent delegating
+        // classloader, in order to check if the version satisfies the 
requirement we need to load the plugin class here
+        ClassLoader classLoader = loadVersionedPluginClass(fullName, range, 
false).getClassLoader();
+        log.debug(
+                "Got plugin class loader: '{}' for connector: {}",
+                classLoader,
+                connectorClassOrAlias
+        );
+        return classLoader;
     }
 
     ClassLoader connectorLoader(String connectorClassOrAlias) {
         String fullName = aliases.getOrDefault(connectorClassOrAlias, 
connectorClassOrAlias);
         ClassLoader classLoader = pluginClassLoader(fullName);
         if (classLoader == null) classLoader = this;
         log.debug(
-            "Getting plugin class loader: '{}' for connector: {}",
-            classLoader,
-            connectorClassOrAlias
+                "Getting plugin class loader: '{}' for connector: {}",
+                classLoader,
+                connectorClassOrAlias
         );
         return classLoader;
     }
 
+    String resolveFullClassName(String classOrAlias) {
+        return aliases.getOrDefault(classOrAlias, classOrAlias);
+    }
+
+    String latestVersion(String classOrAlias) {
+        if (classOrAlias == null) {
+            return null;
+        }
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        SortedMap<PluginDesc<?>, ClassLoader> inner = 
pluginLoaders.get(fullName);
+        if (inner == null) {
+            return null;
+        }
+        return inner.lastKey().version();
+    }
+
+    private ClassLoader findPluginLoader(
+        SortedMap<PluginDesc<?>, ClassLoader> loaders,
+        String pluginName,
+        VersionRange range
+    ) {
+
+        if (range != null) {
+
+            ArtifactVersion version = range.getRecommendedVersion();
+
+            if (range.hasRestrictions()) {
+                List<ArtifactVersion> versions = 
loaders.keySet().stream().map(PluginDesc::encodedVersion).collect(Collectors.toList());
+                version = range.matchVersion(versions);
+                if (version == null) {
+                    List<String> availableVersions = 
loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList());
+                    throw new VersionedPluginLoadingException(String.format(
+                        "Plugin loader for %s not found that matches the 
version range %s, available versions: %s",
+                        pluginName,
+                        range,
+                        availableVersions
+                    ), availableVersions);
+                }
+            }
+
+            if (version != null) {
+                for (Map.Entry<PluginDesc<?>, ClassLoader> entry : 
loaders.entrySet()) {

Review Comment:
   Since this iterates forwards, when there are two PluginDesc objects with the 
same version, it will prefer the earlier one, which could be the classpath 
copy. We should prefer the later isolated one instead.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -69,36 +70,109 @@ public DelegatingClassLoader() {
 
     /**
      * Retrieve the PluginClassLoader associated with a plugin class
+     *
      * @param name The fully qualified class name of the plugin
      * @return the PluginClassLoader that should be used to load this, or null 
if the plugin is not isolated.
      */
     // VisibleForTesting
-    PluginClassLoader pluginClassLoader(String name) {
+    PluginClassLoader pluginClassLoader(String name, VersionRange range) {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
             return null;
         }
+
         SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+        ClassLoader pluginLoader = findPluginLoader(inner, name, range);
         return pluginLoader instanceof PluginClassLoader
-               ? (PluginClassLoader) pluginLoader
-               : null;
+            ? (PluginClassLoader) pluginLoader
+            : null;
+    }
+
+    PluginClassLoader pluginClassLoader(String name) {
+        return pluginClassLoader(name, null);
+    }
+
+    ClassLoader connectorLoader(String connectorClassOrAlias, VersionRange 
range) throws ClassNotFoundException {
+        String fullName = aliases.getOrDefault(connectorClassOrAlias, 
connectorClassOrAlias);
+        // if the plugin is not loaded via the plugin classloader, it might 
still be available in the parent delegating
+        // classloader, in order to check if the version satisfies the 
requirement we need to load the plugin class here
+        ClassLoader classLoader = loadVersionedPluginClass(fullName, range, 
false).getClassLoader();

Review Comment:
   There's a slight difference here: If the plugin is non-isolated, 
`DelegatingClassLoader#connectorLoader(String)` returns `this` (the delegating 
loader), but `DelegatingClassLoader#connectorLoader(String, VersionRange)` 
returns `parent` (the classpath).
   
   For a similar reason, `connectorLoader(String)` doesn't throw 
`ClassNotFoundException`; it returns `this` instead.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -122,11 +196,54 @@ protected Class<?> loadClass(String name, boolean 
resolve) throws ClassNotFoundE
         return super.loadClass(fullName, resolve);
     }
 
+    protected Class<?> loadVersionedPluginClass(
+        String name,
+        VersionRange range,
+        boolean resolve
+    ) throws VersionedPluginLoadingException, ClassNotFoundException {
+
+        String fullName = aliases.getOrDefault(name, name);
+        PluginClassLoader pluginLoader = pluginClassLoader(fullName, range);
+        Class<?> plugin;
+        if (pluginLoader != null) {
+            log.trace("Retrieving loaded class '{}' from '{}'", name, 
pluginLoader);
+            plugin = pluginLoader.loadClass(fullName, resolve);
+        } else {
+            plugin = super.loadClass(fullName, resolve);
+            // if we are loading a plugin class from the parent classloader, 
we need to check if the version
+            // matches the range
+            String pluginVersion;
+            try (LoaderSwap classLoader = 
PluginScanner.withClassLoader(plugin.getClassLoader())) {
+                pluginVersion = 
PluginScanner.versionFor(plugin.getDeclaredConstructor().newInstance());
+            } catch (ReflectiveOperationException | LinkageError e) {
+                throw new VersionedPluginLoadingException(String.format(
+                        "Plugin %s was loaded with %s but failed to determine 
its version",
+                        name,
+                        plugin.getClassLoader()
+                ), e);
+            }

Review Comment:
   This information should be computed ahead-of-time by the PluginScanner, not 
re-computed at runtime each time the class is requested. The parent loader of 
the DelegatingClassLoader will always be considered "classpath" in PluginDesc<> 
objects, which also include the version.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -69,36 +70,109 @@ public DelegatingClassLoader() {
 
     /**
      * Retrieve the PluginClassLoader associated with a plugin class
+     *
      * @param name The fully qualified class name of the plugin
      * @return the PluginClassLoader that should be used to load this, or null 
if the plugin is not isolated.
      */
     // VisibleForTesting
-    PluginClassLoader pluginClassLoader(String name) {
+    PluginClassLoader pluginClassLoader(String name, VersionRange range) {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
             return null;
         }
+
         SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+        ClassLoader pluginLoader = findPluginLoader(inner, name, range);
         return pluginLoader instanceof PluginClassLoader
-               ? (PluginClassLoader) pluginLoader
-               : null;
+            ? (PluginClassLoader) pluginLoader
+            : null;
+    }
+
+    PluginClassLoader pluginClassLoader(String name) {
+        return pluginClassLoader(name, null);
+    }
+
+    ClassLoader connectorLoader(String connectorClassOrAlias, VersionRange 
range) throws ClassNotFoundException {
+        String fullName = aliases.getOrDefault(connectorClassOrAlias, 
connectorClassOrAlias);
+        // if the plugin is not loaded via the plugin classloader, it might 
still be available in the parent delegating
+        // classloader, in order to check if the version satisfies the 
requirement we need to load the plugin class here
+        ClassLoader classLoader = loadVersionedPluginClass(fullName, range, 
false).getClassLoader();
+        log.debug(
+                "Got plugin class loader: '{}' for connector: {}",
+                classLoader,
+                connectorClassOrAlias
+        );
+        return classLoader;
     }
 
     ClassLoader connectorLoader(String connectorClassOrAlias) {
         String fullName = aliases.getOrDefault(connectorClassOrAlias, 
connectorClassOrAlias);
         ClassLoader classLoader = pluginClassLoader(fullName);
         if (classLoader == null) classLoader = this;
         log.debug(
-            "Getting plugin class loader: '{}' for connector: {}",
-            classLoader,
-            connectorClassOrAlias
+                "Getting plugin class loader: '{}' for connector: {}",
+                classLoader,
+                connectorClassOrAlias
         );
         return classLoader;
     }
 
+    String resolveFullClassName(String classOrAlias) {
+        return aliases.getOrDefault(classOrAlias, classOrAlias);
+    }
+
+    String latestVersion(String classOrAlias) {
+        if (classOrAlias == null) {
+            return null;
+        }
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        SortedMap<PluginDesc<?>, ClassLoader> inner = 
pluginLoaders.get(fullName);
+        if (inner == null) {
+            return null;
+        }
+        return inner.lastKey().version();
+    }
+
+    private ClassLoader findPluginLoader(
+        SortedMap<PluginDesc<?>, ClassLoader> loaders,
+        String pluginName,
+        VersionRange range
+    ) {
+
+        if (range != null) {
+
+            ArtifactVersion version = range.getRecommendedVersion();

Review Comment:
   Since we're not implementing soft requirements, I think 
getRecommendedVersion should always return null, and hasRestrictions should 
always return true. Rather than implementing this case, which we don't expect 
to be used, can we avoid calling range.getRecommendedVersion entirely?
   
   If a soft requirement somehow gets passed in here, it could either be a 
no-op (behaving like null) or throw an exception, up to you.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -259,48 +299,91 @@ public Set<PluginDesc<SinkConnector>> sinkConnectors() {
         return scanResult.sinkConnectors();
     }
 
+    public Set<PluginDesc<SinkConnector>> sinkConnectors(String 
connectorClassOrAlias) {
+        return pluginsOfClass(connectorClassOrAlias, 
scanResult.sinkConnectors());
+    }
+
     public Set<PluginDesc<SourceConnector>> sourceConnectors() {
         return scanResult.sourceConnectors();
     }
 
+    public Set<PluginDesc<SourceConnector>> sourceConnectors(String 
connectorClassOrAlias) {
+        return pluginsOfClass(connectorClassOrAlias, 
scanResult.sourceConnectors());
+    }
+
     public Set<PluginDesc<Converter>> converters() {
         return scanResult.converters();
     }
 
+    public Set<PluginDesc<Converter>> converters(String converterClassOrAlias) 
{
+        return pluginsOfClass(converterClassOrAlias, scanResult.converters());
+    }
+
     public Set<PluginDesc<HeaderConverter>> headerConverters() {
         return scanResult.headerConverters();
     }
 
+    public Set<PluginDesc<HeaderConverter>> headerConverters(String 
headerConverterClassOrAlias) {
+        return pluginsOfClass(headerConverterClassOrAlias, 
scanResult.headerConverters());
+    }
+
     public Set<PluginDesc<Transformation<?>>> transformations() {
         return scanResult.transformations();
     }
 
+    public Set<PluginDesc<Transformation<?>>> transformations(String 
transformationClassOrAlias) {
+        return pluginsOfClass(transformationClassOrAlias, 
scanResult.transformations());
+    }
+
     public Set<PluginDesc<Predicate<?>>> predicates() {
         return scanResult.predicates();
     }
 
+    public Set<PluginDesc<Predicate<?>>> predicates(String 
predicateClassOrAlias) {
+        return pluginsOfClass(predicateClassOrAlias, scanResult.predicates());
+    }
+
     public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies() {
         return scanResult.connectorClientConfigPolicies();
     }
 
+    private <T> Set<PluginDesc<T>> pluginsOfClass(String classNameOrAlias, 
Set<PluginDesc<T>> allPluginsOfType) {

Review Comment:
   This (and the methods calling it) are a lot of additional API surface area 
for the Plugins class, which is already quite large. What benefits does this 
"lookup versions of plugins by alias" functionality provide to compensate for 
the extra surface area?
   
   As far as I can tell, it is currently used to:
   1. Populate a map with null values in `AbstractHerder#getConnector`
   2. Generate a list of valid versions in Recommender instances (e.g. 
ConnectorPluginVersionRecommender)
   
   I think (1) is very low value, and could be avoided with a null/contains 
check elsewhere, or could be performed for all plugins at once rather than 
lazily per-alias.
   (2) is a valid use-case, but could better be located within a common 
PluginVersionRecommender class, and utilize the existing Plugins API surface.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -69,36 +70,109 @@ public DelegatingClassLoader() {
 
     /**
      * Retrieve the PluginClassLoader associated with a plugin class
+     *
      * @param name The fully qualified class name of the plugin
      * @return the PluginClassLoader that should be used to load this, or null 
if the plugin is not isolated.
      */
     // VisibleForTesting
-    PluginClassLoader pluginClassLoader(String name) {
+    PluginClassLoader pluginClassLoader(String name, VersionRange range) {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
             return null;
         }
+
         SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+        ClassLoader pluginLoader = findPluginLoader(inner, name, range);
         return pluginLoader instanceof PluginClassLoader
-               ? (PluginClassLoader) pluginLoader
-               : null;
+            ? (PluginClassLoader) pluginLoader
+            : null;
+    }
+
+    PluginClassLoader pluginClassLoader(String name) {
+        return pluginClassLoader(name, null);
+    }
+
+    ClassLoader connectorLoader(String connectorClassOrAlias, VersionRange 
range) throws ClassNotFoundException {
+        String fullName = aliases.getOrDefault(connectorClassOrAlias, 
connectorClassOrAlias);
+        // if the plugin is not loaded via the plugin classloader, it might 
still be available in the parent delegating
+        // classloader, in order to check if the version satisfies the 
requirement we need to load the plugin class here
+        ClassLoader classLoader = loadVersionedPluginClass(fullName, range, 
false).getClassLoader();
+        log.debug(
+                "Got plugin class loader: '{}' for connector: {}",
+                classLoader,
+                connectorClassOrAlias
+        );
+        return classLoader;
     }
 
     ClassLoader connectorLoader(String connectorClassOrAlias) {
         String fullName = aliases.getOrDefault(connectorClassOrAlias, 
connectorClassOrAlias);
         ClassLoader classLoader = pluginClassLoader(fullName);
         if (classLoader == null) classLoader = this;
         log.debug(
-            "Getting plugin class loader: '{}' for connector: {}",
-            classLoader,
-            connectorClassOrAlias
+                "Getting plugin class loader: '{}' for connector: {}",
+                classLoader,
+                connectorClassOrAlias
         );
         return classLoader;
     }
 
+    String resolveFullClassName(String classOrAlias) {
+        return aliases.getOrDefault(classOrAlias, classOrAlias);
+    }
+
+    String latestVersion(String classOrAlias) {
+        if (classOrAlias == null) {
+            return null;
+        }
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        SortedMap<PluginDesc<?>, ClassLoader> inner = 
pluginLoaders.get(fullName);
+        if (inner == null) {
+            return null;
+        }
+        return inner.lastKey().version();
+    }
+
+    private ClassLoader findPluginLoader(
+        SortedMap<PluginDesc<?>, ClassLoader> loaders,
+        String pluginName,
+        VersionRange range
+    ) {
+
+        if (range != null) {
+
+            ArtifactVersion version = range.getRecommendedVersion();
+
+            if (range.hasRestrictions()) {
+                List<ArtifactVersion> versions = 
loaders.keySet().stream().map(PluginDesc::encodedVersion).collect(Collectors.toList());
+                version = range.matchVersion(versions);

Review Comment:
   nit: you can avoid iterating over the loaders twice if you use 
VersionRange#containsVersion instead.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -259,48 +299,91 @@ public Set<PluginDesc<SinkConnector>> sinkConnectors() {
         return scanResult.sinkConnectors();
     }
 
+    public Set<PluginDesc<SinkConnector>> sinkConnectors(String 
connectorClassOrAlias) {
+        return pluginsOfClass(connectorClassOrAlias, 
scanResult.sinkConnectors());
+    }
+
     public Set<PluginDesc<SourceConnector>> sourceConnectors() {
         return scanResult.sourceConnectors();
     }
 
+    public Set<PluginDesc<SourceConnector>> sourceConnectors(String 
connectorClassOrAlias) {
+        return pluginsOfClass(connectorClassOrAlias, 
scanResult.sourceConnectors());
+    }
+
     public Set<PluginDesc<Converter>> converters() {
         return scanResult.converters();
     }
 
+    public Set<PluginDesc<Converter>> converters(String converterClassOrAlias) 
{
+        return pluginsOfClass(converterClassOrAlias, scanResult.converters());
+    }
+
     public Set<PluginDesc<HeaderConverter>> headerConverters() {
         return scanResult.headerConverters();
     }
 
+    public Set<PluginDesc<HeaderConverter>> headerConverters(String 
headerConverterClassOrAlias) {
+        return pluginsOfClass(headerConverterClassOrAlias, 
scanResult.headerConverters());
+    }
+
     public Set<PluginDesc<Transformation<?>>> transformations() {
         return scanResult.transformations();
     }
 
+    public Set<PluginDesc<Transformation<?>>> transformations(String 
transformationClassOrAlias) {
+        return pluginsOfClass(transformationClassOrAlias, 
scanResult.transformations());
+    }
+
     public Set<PluginDesc<Predicate<?>>> predicates() {
         return scanResult.predicates();
     }
 
+    public Set<PluginDesc<Predicate<?>>> predicates(String 
predicateClassOrAlias) {
+        return pluginsOfClass(predicateClassOrAlias, scanResult.predicates());
+    }
+
     public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies() {
         return scanResult.connectorClientConfigPolicies();
     }
 
+    private <T> Set<PluginDesc<T>> pluginsOfClass(String classNameOrAlias, 
Set<PluginDesc<T>> allPluginsOfType) {
+        String className = 
delegatingLoader.resolveFullClassName(classNameOrAlias);
+        Set<PluginDesc<T>> plugins = new TreeSet<>();
+        for (PluginDesc<T> desc : allPluginsOfType) {
+            if (desc.className().equals(className)) {
+                plugins.add(desc);
+            }
+        }
+        return plugins;
+    }
+
     public Object newPlugin(String classOrAlias) throws ClassNotFoundException 
{
         Class<?> klass = pluginClass(delegatingLoader, classOrAlias, 
Object.class);
         return newPlugin(klass);
     }
 
+    public Object newPlugin(String classOrAlias, VersionRange range) throws 
VersionedPluginLoadingException, ClassNotFoundException {
+        Class<?> klass = pluginClass(delegatingLoader, classOrAlias, 
Object.class, range);
+        return newPlugin(klass);
+    }
+
     public Connector newConnector(String connectorClassOrAlias) {
         Class<? extends Connector> klass = 
connectorClass(connectorClassOrAlias);
         return newPlugin(klass);
     }
 
-    public Class<? extends Connector> connectorClass(String 
connectorClassOrAlias) {
+    public Connector newConnector(String connectorClassOrAlias, VersionRange 
range) throws VersionedPluginLoadingException {
+        Class<? extends Connector> klass = 
connectorClass(connectorClassOrAlias, range);
+        return newPlugin(klass);
+    }
+
+    public Class<? extends Connector> connectorClass(String 
connectorClassOrAlias, VersionRange range) throws 
VersionedPluginLoadingException {
         Class<? extends Connector> klass;
         try {
-            klass = pluginClass(
-                    delegatingLoader,
-                    connectorClassOrAlias,
-                    Connector.class
-            );
+            klass = range == null ?
+                pluginClass(delegatingLoader, connectorClassOrAlias, 
Connector.class):
+                pluginClass(delegatingLoader, connectorClassOrAlias, 
Connector.class, range);

Review Comment:
   The 4-argument `pluginClass` already handles a null range. Can you call 
that unconditionally?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -16,17 +16,18 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.maven.artifact.versioning.ArtifactVersion;
+import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
+import org.apache.maven.artifact.versioning.VersionRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.*;

Review Comment:
   nit: no star imports



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -434,50 +516,93 @@ public Converter newInternalConverter(boolean isKey, 
String className, Map<Strin
      * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
      */
     public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
-        Class<? extends HeaderConverter> klass = null;
+        return getHeaderConverter(config, classPropertyName, null, 
classLoaderUsage);
+    }
+
+    public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) 
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;
+        return getHeaderConverter(config, classPropertyName, 
versionPropertyName, classLoader);
+    }
+
+    private HeaderConverter getHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName, ClassLoaderUsage 
classLoaderUsage) {
+        if (!config.originals().containsKey(classPropertyName)) {
+            // This configuration does not define the Header Converter via the 
specified property name
+            return null;
+        }
+
+        HeaderConverter plugin = getVersionedPlugin(config, classPropertyName, 
versionPropertyName,
+                HeaderConverter.class, classLoaderUsage, 
scanResult.headerConverters());
+
+        String configPrefix = classPropertyName + ".";
+        Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
+        converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
+        log.debug("Configuring the header converter with configuration 
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
+
+        try (LoaderSwap loaderSwap = 
withClassLoader(plugin.getClass().getClassLoader())) {
+            plugin.configure(converterConfig);
+        }
+        return plugin;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <U> U getVersionedPlugin(
+            AbstractConfig config,
+            String classPropertyName,
+            String versionPropertyName,
+            Class basePluginClass,
+            ClassLoaderUsage classLoaderUsage,
+            SortedSet<PluginDesc<U>> availablePlugins
+    ) {
+
+        String version = versionPropertyName == null ? null : 
config.getString(versionPropertyName);
+        VersionRange range = null;
+        if (version != null) {
+            try {
+                range = VersionRange.createFromVersionSpec(version);
+            } catch (InvalidVersionSpecificationException e) {
+                throw new ConnectException(String.format("Invalid version 
range for %s: %s %s", classPropertyName, version, e));
+            }
+        }
+
+        Class<? extends U> klass = null;
+        String basePluginClassName = basePluginClass.getSimpleName();
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
-                if (!config.originals().containsKey(classPropertyName)) {
-                    // This connector configuration does not define the header 
converter via the specified property name
-                    return null;
-                }

Review Comment:
   Odd, newHeaderConverter didn't perform this containsKey check when 
PLUGINS was specified, which is the case when the configuration is coming from 
the worker config.
   
   But since `header.converter` has a default, it was always valid to call 
getClass. Now the `getHeaderConverter` method performs this check 
unconditionally. Does this mean that Worker#startTask will sometimes not 
instantiate a header converter?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginLoadingException.java:
##########
@@ -0,0 +1,32 @@
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.List;
+
+public class VersionedPluginLoadingException extends ConnectException {

Review Comment:
   Does this make more sense as a ConfigException?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -350,54 +437,49 @@ public Task newTask(Class<? extends Task> taskClass) {
      * @throws ConnectException if the {@link Converter} implementation class 
could not be found
      */
     public Converter newConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
+        return getConverter(config, classPropertyName, null, classLoaderUsage);
+    }
+
+    /**
+     * Used to get a versioned converter. It will always try and get the 
converter from the set of plugin classloaders.
+     *
+     * @param config              the configuration containing the {@link 
Converter}'s configuration; may not be null
+     * @param classPropertyName   the name of the property that contains the 
name of the {@link Converter} class; may not be null
+     * @param versionPropertyName the name of the property that contains the 
version of the {@link Converter} class; may not be null
+     * @return the instantiated and configured {@link Converter}; null if the 
configuration did not define the specified property
+     * @throws ConnectException if the {@link Converter} implementation class 
could not be found,
+     * @throws VersionedPluginLoadingException if the version requested is not 
found
+     */
+    public Converter newConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) 
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;
+        return getConverter(config, classPropertyName, versionPropertyName, 
classLoader);
+    }
+
+    private Converter getConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName, ClassLoaderUsage 
classLoaderUsage) {
         if (!config.originals().containsKey(classPropertyName)) {
             // This configuration does not define the converter via the 
specified property name
             return null;
         }
-

Review Comment:
   I think it was a good idea to deduplicate this switch-case. There's another 
copy in newConfigProvider that we can get rid of too, and just have it use a 
`null` `VersionRange`.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -69,36 +70,109 @@ public DelegatingClassLoader() {
 
     /**
      * Retrieve the PluginClassLoader associated with a plugin class
+     *
      * @param name The fully qualified class name of the plugin
      * @return the PluginClassLoader that should be used to load this, or null 
if the plugin is not isolated.
      */
     // VisibleForTesting
-    PluginClassLoader pluginClassLoader(String name) {
+    PluginClassLoader pluginClassLoader(String name, VersionRange range) {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
             return null;
         }
+
         SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+        ClassLoader pluginLoader = findPluginLoader(inner, name, range);
         return pluginLoader instanceof PluginClassLoader
-               ? (PluginClassLoader) pluginLoader
-               : null;
+            ? (PluginClassLoader) pluginLoader
+            : null;
+    }
+
+    PluginClassLoader pluginClassLoader(String name) {

Review Comment:
   I like this style of deduplication. Can you apply it to `connectorLoader` 
and `loadClass`/`loadVersionedPluginClass`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -240,6 +261,21 @@ public Runnable withClassLoader(ClassLoader classLoader, 
Runnable operation) {
         };
     }
 
+    public static LoaderSwap swapLoader(ClassLoader loader) {

Review Comment:
   I would prefer to keep this method (and the similar copy in PluginScanner) 
as instance methods, because they are simpler to mock during testing: #12817



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -350,54 +437,49 @@ public Task newTask(Class<? extends Task> taskClass) {
      * @throws ConnectException if the {@link Converter} implementation class 
could not be found
      */
     public Converter newConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
+        return getConverter(config, classPropertyName, null, classLoaderUsage);
+    }
+
+    /**
+     * Used to get a versioned converter. It will always try and get the 
converter from the set of plugin classloaders.
+     *
+     * @param config              the configuration containing the {@link 
Converter}'s configuration; may not be null
+     * @param classPropertyName   the name of the property that contains the 
name of the {@link Converter} class; may not be null
+     * @param versionPropertyName the name of the property that contains the 
version of the {@link Converter} class; may not be null
+     * @return the instantiated and configured {@link Converter}; null if the 
configuration did not define the specified property
+     * @throws ConnectException if the {@link Converter} implementation class 
could not be found,
+     * @throws VersionedPluginLoadingException if the version requested is not 
found
+     */
+    public Converter newConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) 
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;

Review Comment:
   Is the first sentence of this Javadoc misleading? It says the converter is 
always loaded from the plugin classloaders, but the current classloader is 
used if no version is specified.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -350,54 +437,49 @@ public Task newTask(Class<? extends Task> taskClass) {
      * @throws ConnectException if the {@link Converter} implementation class 
could not be found
      */
     public Converter newConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
+        return getConverter(config, classPropertyName, null, classLoaderUsage);
+    }
+
+    /**
+     * Used to get a versioned converter. It will always try and get the 
converter from the set of plugin classloaders.
+     *
+     * @param config              the configuration containing the {@link 
Converter}'s configuration; may not be null
+     * @param classPropertyName   the name of the property that contains the 
name of the {@link Converter} class; may not be null
+     * @param versionPropertyName the name of the property that contains the 
version of the {@link Converter} class; may not be null
+     * @return the instantiated and configured {@link Converter}; null if the 
configuration did not define the specified property
+     * @throws ConnectException if the {@link Converter} implementation class 
could not be found,
+     * @throws VersionedPluginLoadingException if the version requested is not 
found
+     */
+    public Converter newConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) 
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;
+        return getConverter(config, classPropertyName, versionPropertyName, 
classLoader);
+    }
+
+    private Converter getConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName, ClassLoaderUsage 
classLoaderUsage) {

Review Comment:
   nit: Don't use the `get` prefix; I don't think it makes sense when the 
converter is being created. `new` is more appropriate here.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -434,50 +516,93 @@ public Converter newInternalConverter(boolean isKey, 
String className, Map<Strin
      * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
      */
     public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
-        Class<? extends HeaderConverter> klass = null;
+        return getHeaderConverter(config, classPropertyName, null, 
classLoaderUsage);
+    }
+
+    public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) 
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;
+        return getHeaderConverter(config, classPropertyName, 
versionPropertyName, classLoader);
+    }
+
+    private HeaderConverter getHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName, ClassLoaderUsage 
classLoaderUsage) {
+        if (!config.originals().containsKey(classPropertyName)) {
+            // This configuration does not define the Header Converter via the 
specified property name
+            return null;
+        }
+
+        HeaderConverter plugin = getVersionedPlugin(config, classPropertyName, 
versionPropertyName,
+                HeaderConverter.class, classLoaderUsage, 
scanResult.headerConverters());
+
+        String configPrefix = classPropertyName + ".";
+        Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
+        converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
+        log.debug("Configuring the header converter with configuration 
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
+
+        try (LoaderSwap loaderSwap = 
withClassLoader(plugin.getClass().getClassLoader())) {
+            plugin.configure(converterConfig);
+        }
+        return plugin;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <U> U getVersionedPlugin(
+            AbstractConfig config,
+            String classPropertyName,
+            String versionPropertyName,
+            Class basePluginClass,
+            ClassLoaderUsage classLoaderUsage,
+            SortedSet<PluginDesc<U>> availablePlugins
+    ) {
+
+        String version = versionPropertyName == null ? null : 
config.getString(versionPropertyName);
+        VersionRange range = null;
+        if (version != null) {
+            try {
+                range = VersionRange.createFromVersionSpec(version);
+            } catch (InvalidVersionSpecificationException e) {
+                throw new ConnectException(String.format("Invalid version 
range for %s: %s %s", classPropertyName, version, e));
+            }
+        }
+
+        Class<? extends U> klass = null;
+        String basePluginClassName = basePluginClass.getSimpleName();
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
-                if (!config.originals().containsKey(classPropertyName)) {
-                    // This connector configuration does not define the header 
converter via the specified property name
-                    return null;
-                }
                 // Attempt to load first with the current classloader, and 
plugins as a fallback.
-                // Note: we can't use config.getConfiguredInstance because we 
have to remove the property prefixes
-                // before calling config(...)
-                klass = pluginClassFromConfig(config, classPropertyName, 
HeaderConverter.class, scanResult.headerConverters());
+                // Note: we can't use config.getConfiguredInstance because 
Converter doesn't implement Configurable, and even if it did
+                // we have to remove the property prefixes before calling 
config(...) and we still always want to call Converter.config.
+                klass = pluginClassFromConfig(config, classPropertyName, 
basePluginClass, availablePlugins);
                 break;
             case PLUGINS:
-                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback.
-                // Note that there will always be at least a default header 
converter for the worker
-                String converterClassOrAlias = 
config.getClass(classPropertyName).getName();
+                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback
+                String classOrAlias = 
config.getClass(classPropertyName).getName();
                 try {
-                    klass = pluginClass(
-                            delegatingLoader,
-                            converterClassOrAlias,
-                            HeaderConverter.class
-                    );
+                    klass = pluginClass(delegatingLoader, classOrAlias, 
basePluginClass, range);
                 } catch (ClassNotFoundException e) {
                     throw new ConnectException(
-                            "Failed to find any class that implements 
HeaderConverter and which name matches "
-                                    + converterClassOrAlias
-                                    + ", available header converters are: "
-                                    + 
pluginNames(scanResult.headerConverters())
+                            "Failed to find any class that implements " + 
basePluginClassName + " and which name matches "
+                                    + classOrAlias + ", available plugins are: 
"
+                                    + pluginNames(availablePlugins)
                     );
                 }
+                break;
         }
         if (klass == null) {
-            throw new ConnectException("Unable to initialize the 
HeaderConverter specified in '" + classPropertyName + "'");
+            throw new ConnectException("Unable to initialize the '" + 
basePluginClassName
+                    + "' specified in '" + classPropertyName + "'");
         }
 
-        String configPrefix = classPropertyName + ".";
-        Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
-        converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
-        log.debug("Configuring the header converter with configuration 
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
-
-        HeaderConverter plugin;
+        U plugin;
         try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
             plugin = newPlugin(klass);
-            plugin.configure(converterConfig);
+            DefaultArtifactVersion pluginVersion = new 
DefaultArtifactVersion(PluginScanner.versionFor(plugin));
+            if (range != null && range.hasRestrictions() && 
!range.containsVersion(pluginVersion)) {
+                // this can happen if the current class loader is used
+                // if there are version restrictions then this should be 
captured, and we should load using the plugin class loader
+                if (classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+                    return getVersionedPlugin(config, classPropertyName, 
versionPropertyName, basePluginClass, ClassLoaderUsage.PLUGINS, 
availablePlugins);
+                }
+            }

Review Comment:
   Can we assert `range == null || classLoaderUsage == PLUGINS` earlier, to 
ensure that we never get into this situation where we loaded a class of an 
invalid version?
   
   I think having this depend on `range.containsVersion(pluginVersion)` could 
lead to some strange behavior in the following situation:
   * key.converter.version = [1.0, 2.0)
   * Converter v1.0 is on the classpath
   * Converter v1.2 is in a different plugin
   * Converter v1.1 is in the local plugin
   
   If you don't have 1.1 installed, 1.2 is selected. If you later install 1.1 
(an older version), you get rolled back to 1.1.
   But if you had specified `[1.2]`, you wouldn't get rolled back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to