gharris1727 commented on code in PR #16984: URL: https://github.com/apache/kafka/pull/16984#discussion_r1868472074
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -69,36 +70,109 @@ public DelegatingClassLoader() { /** * Retrieve the PluginClassLoader associated with a plugin class + * * @param name The fully qualified class name of the plugin * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated. */ // VisibleForTesting - PluginClassLoader pluginClassLoader(String name) { + PluginClassLoader pluginClassLoader(String name, VersionRange range) { if (!PluginUtils.shouldLoadInIsolation(name)) { return null; } + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name); if (inner == null) { return null; } - ClassLoader pluginLoader = inner.get(inner.lastKey()); + + + ClassLoader pluginLoader = findPluginLoader(inner, name, range); return pluginLoader instanceof PluginClassLoader - ? (PluginClassLoader) pluginLoader - : null; + ? (PluginClassLoader) pluginLoader + : null; + } + + PluginClassLoader pluginClassLoader(String name) { + return pluginClassLoader(name, null); + } + + ClassLoader connectorLoader(String connectorClassOrAlias, VersionRange range) throws ClassNotFoundException { + String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); + // if the plugin is not loaded via the plugin classloader, it might still be available in the parent delegating + // classloader, in order to check if the version satisfies the requirement we need to load the plugin class here + ClassLoader classLoader = loadVersionedPluginClass(fullName, range, false).getClassLoader(); + log.debug( + "Got plugin class loader: '{}' for connector: {}", + classLoader, + connectorClassOrAlias + ); + return classLoader; } ClassLoader connectorLoader(String connectorClassOrAlias) { String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); ClassLoader classLoader = pluginClassLoader(fullName); if (classLoader == null) 
classLoader = this; log.debug( - "Getting plugin class loader: '{}' for connector: {}", - classLoader, - connectorClassOrAlias + "Getting plugin class loader: '{}' for connector: {}", + classLoader, + connectorClassOrAlias ); return classLoader; } + String resolveFullClassName(String classOrAlias) { + return aliases.getOrDefault(classOrAlias, classOrAlias); + } + + String latestVersion(String classOrAlias) { + if (classOrAlias == null) { + return null; + } + String fullName = aliases.getOrDefault(classOrAlias, classOrAlias); + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName); + if (inner == null) { + return null; + } + return inner.lastKey().version(); + } + + private ClassLoader findPluginLoader( + SortedMap<PluginDesc<?>, ClassLoader> loaders, + String pluginName, + VersionRange range + ) { + + if (range != null) { + + ArtifactVersion version = range.getRecommendedVersion(); + + if (range.hasRestrictions()) { + List<ArtifactVersion> versions = loaders.keySet().stream().map(PluginDesc::encodedVersion).collect(Collectors.toList()); + version = range.matchVersion(versions); + if (version == null) { + List<String> availableVersions = loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList()); + throw new VersionedPluginLoadingException(String.format( + "Plugin loader for %s not found that matches the version range %s, available versions: %s", + pluginName, + range, + availableVersions + ), availableVersions); + } + } + + if (version != null) { + for (Map.Entry<PluginDesc<?>, ClassLoader> entry : loaders.entrySet()) { Review Comment: Since this iterates forwards, when there are two PluginDesc objects with the same version, it will prefer the earlier one, which could be the classpath copy. 
We should prefer the later isolated one instead ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -69,36 +70,109 @@ public DelegatingClassLoader() { /** * Retrieve the PluginClassLoader associated with a plugin class + * * @param name The fully qualified class name of the plugin * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated. */ // VisibleForTesting - PluginClassLoader pluginClassLoader(String name) { + PluginClassLoader pluginClassLoader(String name, VersionRange range) { if (!PluginUtils.shouldLoadInIsolation(name)) { return null; } + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name); if (inner == null) { return null; } - ClassLoader pluginLoader = inner.get(inner.lastKey()); + + + ClassLoader pluginLoader = findPluginLoader(inner, name, range); return pluginLoader instanceof PluginClassLoader - ? (PluginClassLoader) pluginLoader - : null; + ? (PluginClassLoader) pluginLoader + : null; + } + + PluginClassLoader pluginClassLoader(String name) { + return pluginClassLoader(name, null); + } + + ClassLoader connectorLoader(String connectorClassOrAlias, VersionRange range) throws ClassNotFoundException { + String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); + // if the plugin is not loaded via the plugin classloader, it might still be available in the parent delegating + // classloader, in order to check if the version satisfies the requirement we need to load the plugin class here + ClassLoader classLoader = loadVersionedPluginClass(fullName, range, false).getClassLoader(); Review Comment: There's a slight difference here: If the plugin is non-isolated, `DelegatingClassLoader#connectorLoader(String)` returns `this` (the delegating loader), but `DelegatingClassLoader#connectorLoader(String, VersionRange)` returns `parent` (the classpath). 
For a similar reason, `connectorLoader(String)` doesn't throw `ClassNotFoundException`, it returns `this`. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -122,11 +196,54 @@ protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundE return super.loadClass(fullName, resolve); } + protected Class<?> loadVersionedPluginClass( + String name, + VersionRange range, + boolean resolve + ) throws VersionedPluginLoadingException, ClassNotFoundException { + + String fullName = aliases.getOrDefault(name, name); + PluginClassLoader pluginLoader = pluginClassLoader(fullName, range); + Class<?> plugin; + if (pluginLoader != null) { + log.trace("Retrieving loaded class '{}' from '{}'", name, pluginLoader); + plugin = pluginLoader.loadClass(fullName, resolve); + } else { + plugin = super.loadClass(fullName, resolve); + // if we are loading a plugin class from the parent classloader, we need to check if the version + // matches the range + String pluginVersion; + try (LoaderSwap classLoader = PluginScanner.withClassLoader(plugin.getClassLoader())) { + pluginVersion = PluginScanner.versionFor(plugin.getDeclaredConstructor().newInstance()); + } catch (ReflectiveOperationException | LinkageError e) { + throw new VersionedPluginLoadingException(String.format( + "Plugin %s was loaded with %s but failed to determine its version", + name, + plugin.getClassLoader() + ), e); + } Review Comment: This information should be computed ahead-of-time by the PluginScanner, not re-computed at runtime each time the class is requested. The parent loader of the DelegatingClassLoader will always be considered "classpath" in PluginDesc<> objects, which also include the version. 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -69,36 +70,109 @@ public DelegatingClassLoader() { /** * Retrieve the PluginClassLoader associated with a plugin class + * * @param name The fully qualified class name of the plugin * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated. */ // VisibleForTesting - PluginClassLoader pluginClassLoader(String name) { + PluginClassLoader pluginClassLoader(String name, VersionRange range) { if (!PluginUtils.shouldLoadInIsolation(name)) { return null; } + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name); if (inner == null) { return null; } - ClassLoader pluginLoader = inner.get(inner.lastKey()); + + + ClassLoader pluginLoader = findPluginLoader(inner, name, range); return pluginLoader instanceof PluginClassLoader - ? (PluginClassLoader) pluginLoader - : null; + ? (PluginClassLoader) pluginLoader + : null; + } + + PluginClassLoader pluginClassLoader(String name) { + return pluginClassLoader(name, null); + } + + ClassLoader connectorLoader(String connectorClassOrAlias, VersionRange range) throws ClassNotFoundException { + String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); + // if the plugin is not loaded via the plugin classloader, it might still be available in the parent delegating + // classloader, in order to check if the version satisfies the requirement we need to load the plugin class here + ClassLoader classLoader = loadVersionedPluginClass(fullName, range, false).getClassLoader(); + log.debug( + "Got plugin class loader: '{}' for connector: {}", + classLoader, + connectorClassOrAlias + ); + return classLoader; } ClassLoader connectorLoader(String connectorClassOrAlias) { String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); ClassLoader classLoader = pluginClassLoader(fullName); if (classLoader == null) 
classLoader = this; log.debug( - "Getting plugin class loader: '{}' for connector: {}", - classLoader, - connectorClassOrAlias + "Getting plugin class loader: '{}' for connector: {}", + classLoader, + connectorClassOrAlias ); return classLoader; } + String resolveFullClassName(String classOrAlias) { + return aliases.getOrDefault(classOrAlias, classOrAlias); + } + + String latestVersion(String classOrAlias) { + if (classOrAlias == null) { + return null; + } + String fullName = aliases.getOrDefault(classOrAlias, classOrAlias); + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName); + if (inner == null) { + return null; + } + return inner.lastKey().version(); + } + + private ClassLoader findPluginLoader( + SortedMap<PluginDesc<?>, ClassLoader> loaders, + String pluginName, + VersionRange range + ) { + + if (range != null) { + + ArtifactVersion version = range.getRecommendedVersion(); Review Comment: Since we're not implementing soft requirements, I think getRecommendedVersion should always return null, and hasRestrictions always returns true. Rather than implementing this case which we don't expect to be used, can we avoid calling range.getRecommendedVersion entirely? If a soft requirement somehow gets passed in here, it could either be a no-op (behaving like null) or throw an exception, up to you. 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -259,48 +299,91 @@ public Set<PluginDesc<SinkConnector>> sinkConnectors() { return scanResult.sinkConnectors(); } + public Set<PluginDesc<SinkConnector>> sinkConnectors(String connectorClassOrAlias) { + return pluginsOfClass(connectorClassOrAlias, scanResult.sinkConnectors()); + } + public Set<PluginDesc<SourceConnector>> sourceConnectors() { return scanResult.sourceConnectors(); } + public Set<PluginDesc<SourceConnector>> sourceConnectors(String connectorClassOrAlias) { + return pluginsOfClass(connectorClassOrAlias, scanResult.sourceConnectors()); + } + public Set<PluginDesc<Converter>> converters() { return scanResult.converters(); } + public Set<PluginDesc<Converter>> converters(String converterClassOrAlias) { + return pluginsOfClass(converterClassOrAlias, scanResult.converters()); + } + public Set<PluginDesc<HeaderConverter>> headerConverters() { return scanResult.headerConverters(); } + public Set<PluginDesc<HeaderConverter>> headerConverters(String headerConverterClassOrAlias) { + return pluginsOfClass(headerConverterClassOrAlias, scanResult.headerConverters()); + } + public Set<PluginDesc<Transformation<?>>> transformations() { return scanResult.transformations(); } + public Set<PluginDesc<Transformation<?>>> transformations(String transformationClassOrAlias) { + return pluginsOfClass(transformationClassOrAlias, scanResult.transformations()); + } + public Set<PluginDesc<Predicate<?>>> predicates() { return scanResult.predicates(); } + public Set<PluginDesc<Predicate<?>>> predicates(String predicateClassOrAlias) { + return pluginsOfClass(predicateClassOrAlias, scanResult.predicates()); + } + public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() { return scanResult.connectorClientConfigPolicies(); } + private <T> Set<PluginDesc<T>> pluginsOfClass(String classNameOrAlias, Set<PluginDesc<T>> allPluginsOfType) { 
Review Comment: This (and the methods calling it) are a lot of additional API surface area for the Plugins class, which is already quite large. What benefits does this "lookup versions of plugins by alias" functionality provide to compensate for the extra surface area? 1. Populate a map with null values in `AbstractHerder#getConnector` 2. Generate a list of valid versions in Recommender instances (e.g. ConnectorPluginVersionRecommender) I think (1) is very low value, and could be avoided with a null/contains check elsewhere, or could be performed for all plugins at once rather than lazily per-alias. (2) is a valid use-case, but could better be located within a common PluginVersionRecommender class, and utilize the existing Plugins API surface. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -69,36 +70,109 @@ public DelegatingClassLoader() { /** * Retrieve the PluginClassLoader associated with a plugin class + * * @param name The fully qualified class name of the plugin * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated. */ // VisibleForTesting - PluginClassLoader pluginClassLoader(String name) { + PluginClassLoader pluginClassLoader(String name, VersionRange range) { if (!PluginUtils.shouldLoadInIsolation(name)) { return null; } + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name); if (inner == null) { return null; } - ClassLoader pluginLoader = inner.get(inner.lastKey()); + + + ClassLoader pluginLoader = findPluginLoader(inner, name, range); return pluginLoader instanceof PluginClassLoader - ? (PluginClassLoader) pluginLoader - : null; + ? 
(PluginClassLoader) pluginLoader + : null; + } + + PluginClassLoader pluginClassLoader(String name) { + return pluginClassLoader(name, null); + } + + ClassLoader connectorLoader(String connectorClassOrAlias, VersionRange range) throws ClassNotFoundException { + String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); + // if the plugin is not loaded via the plugin classloader, it might still be available in the parent delegating + // classloader, in order to check if the version satisfies the requirement we need to load the plugin class here + ClassLoader classLoader = loadVersionedPluginClass(fullName, range, false).getClassLoader(); + log.debug( + "Got plugin class loader: '{}' for connector: {}", + classLoader, + connectorClassOrAlias + ); + return classLoader; } ClassLoader connectorLoader(String connectorClassOrAlias) { String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); ClassLoader classLoader = pluginClassLoader(fullName); if (classLoader == null) classLoader = this; log.debug( - "Getting plugin class loader: '{}' for connector: {}", - classLoader, - connectorClassOrAlias + "Getting plugin class loader: '{}' for connector: {}", + classLoader, + connectorClassOrAlias ); return classLoader; } + String resolveFullClassName(String classOrAlias) { + return aliases.getOrDefault(classOrAlias, classOrAlias); + } + + String latestVersion(String classOrAlias) { + if (classOrAlias == null) { + return null; + } + String fullName = aliases.getOrDefault(classOrAlias, classOrAlias); + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName); + if (inner == null) { + return null; + } + return inner.lastKey().version(); + } + + private ClassLoader findPluginLoader( + SortedMap<PluginDesc<?>, ClassLoader> loaders, + String pluginName, + VersionRange range + ) { + + if (range != null) { + + ArtifactVersion version = range.getRecommendedVersion(); + + if (range.hasRestrictions()) { + 
List<ArtifactVersion> versions = loaders.keySet().stream().map(PluginDesc::encodedVersion).collect(Collectors.toList()); + version = range.matchVersion(versions); Review Comment: nit: you can avoid iterating over the loaders twice if you use VersionRange#containsVersion instead. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -259,48 +299,91 @@ public Set<PluginDesc<SinkConnector>> sinkConnectors() { return scanResult.sinkConnectors(); } + public Set<PluginDesc<SinkConnector>> sinkConnectors(String connectorClassOrAlias) { + return pluginsOfClass(connectorClassOrAlias, scanResult.sinkConnectors()); + } + public Set<PluginDesc<SourceConnector>> sourceConnectors() { return scanResult.sourceConnectors(); } + public Set<PluginDesc<SourceConnector>> sourceConnectors(String connectorClassOrAlias) { + return pluginsOfClass(connectorClassOrAlias, scanResult.sourceConnectors()); + } + public Set<PluginDesc<Converter>> converters() { return scanResult.converters(); } + public Set<PluginDesc<Converter>> converters(String converterClassOrAlias) { + return pluginsOfClass(converterClassOrAlias, scanResult.converters()); + } + public Set<PluginDesc<HeaderConverter>> headerConverters() { return scanResult.headerConverters(); } + public Set<PluginDesc<HeaderConverter>> headerConverters(String headerConverterClassOrAlias) { + return pluginsOfClass(headerConverterClassOrAlias, scanResult.headerConverters()); + } + public Set<PluginDesc<Transformation<?>>> transformations() { return scanResult.transformations(); } + public Set<PluginDesc<Transformation<?>>> transformations(String transformationClassOrAlias) { + return pluginsOfClass(transformationClassOrAlias, scanResult.transformations()); + } + public Set<PluginDesc<Predicate<?>>> predicates() { return scanResult.predicates(); } + public Set<PluginDesc<Predicate<?>>> predicates(String predicateClassOrAlias) { + return pluginsOfClass(predicateClassOrAlias, 
scanResult.predicates()); + } + public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() { return scanResult.connectorClientConfigPolicies(); } + private <T> Set<PluginDesc<T>> pluginsOfClass(String classNameOrAlias, Set<PluginDesc<T>> allPluginsOfType) { + String className = delegatingLoader.resolveFullClassName(classNameOrAlias); + Set<PluginDesc<T>> plugins = new TreeSet<>(); + for (PluginDesc<T> desc : allPluginsOfType) { + if (desc.className().equals(className)) { + plugins.add(desc); + } + } + return plugins; + } + public Object newPlugin(String classOrAlias) throws ClassNotFoundException { Class<?> klass = pluginClass(delegatingLoader, classOrAlias, Object.class); return newPlugin(klass); } + public Object newPlugin(String classOrAlias, VersionRange range) throws VersionedPluginLoadingException, ClassNotFoundException { + Class<?> klass = pluginClass(delegatingLoader, classOrAlias, Object.class, range); + return newPlugin(klass); + } + public Connector newConnector(String connectorClassOrAlias) { Class<? extends Connector> klass = connectorClass(connectorClassOrAlias); return newPlugin(klass); } - public Class<? extends Connector> connectorClass(String connectorClassOrAlias) { + public Connector newConnector(String connectorClassOrAlias, VersionRange range) throws VersionedPluginLoadingException { + Class<? extends Connector> klass = connectorClass(connectorClassOrAlias, range); + return newPlugin(klass); + } + + public Class<? extends Connector> connectorClass(String connectorClassOrAlias, VersionRange range) throws VersionedPluginLoadingException { Class<? extends Connector> klass; try { - klass = pluginClass( - delegatingLoader, - connectorClassOrAlias, - Connector.class - ); + klass = range == null ? 
+ pluginClass(delegatingLoader, connectorClassOrAlias, Connector.class): + pluginClass(delegatingLoader, connectorClassOrAlias, Connector.class, range); Review Comment: The 4-argument `pluginClass` already handles null range, can you call that unconditionally? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -16,17 +16,18 @@ */ package org.apache.kafka.connect.runtime.isolation; +import org.apache.maven.artifact.versioning.ArtifactVersion; +import org.apache.maven.artifact.versioning.DefaultArtifactVersion; +import org.apache.maven.artifact.versioning.VersionRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URL; import java.net.URLClassLoader; -import java.util.HashMap; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; +import java.util.*; Review Comment: nit: no star imports ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -434,50 +516,93 @@ public Converter newInternalConverter(boolean isKey, String className, Map<Strin * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found */ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { - Class<? extends HeaderConverter> klass = null; + return getHeaderConverter(config, classPropertyName, null, classLoaderUsage); + } + + public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName) { + ClassLoaderUsage classLoader = config.getString(versionPropertyName) == null ? 
ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS; + return getHeaderConverter(config, classPropertyName, versionPropertyName, classLoader); + } + + private HeaderConverter getHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) { + if (!config.originals().containsKey(classPropertyName)) { + // This configuration does not define the Header Converter via the specified property name + return null; + } + + HeaderConverter plugin = getVersionedPlugin(config, classPropertyName, versionPropertyName, + HeaderConverter.class, classLoaderUsage, scanResult.headerConverters()); + + String configPrefix = classPropertyName + "."; + Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); + converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()); + log.debug("Configuring the header converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet()); + + try (LoaderSwap loaderSwap = withClassLoader(plugin.getClass().getClassLoader())) { + plugin.configure(converterConfig); + } + return plugin; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private <U> U getVersionedPlugin( + AbstractConfig config, + String classPropertyName, + String versionPropertyName, + Class basePluginClass, + ClassLoaderUsage classLoaderUsage, + SortedSet<PluginDesc<U>> availablePlugins + ) { + + String version = versionPropertyName == null ? null : config.getString(versionPropertyName); + VersionRange range = null; + if (version != null) { + try { + range = VersionRange.createFromVersionSpec(version); + } catch (InvalidVersionSpecificationException e) { + throw new ConnectException(String.format("Invalid version range for %s: %s %s", classPropertyName, version, e)); + } + } + + Class<? 
extends U> klass = null; + String basePluginClassName = basePluginClass.getSimpleName(); switch (classLoaderUsage) { case CURRENT_CLASSLOADER: - if (!config.originals().containsKey(classPropertyName)) { - // This connector configuration does not define the header converter via the specified property name - return null; - } Review Comment: Odd, the newHeaderConverter didn't perform this containsKey check when PLUGINS was specified, when the configuration is coming from the worker config. But since `header.converter` has a default, it was always valid to call getClass. Now the `getHeaderConverter` method performs this check unconditionally, does this mean that Worker#startTask will sometimes not instantiate a header converter? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginLoadingException.java: ########## @@ -0,0 +1,32 @@ +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.connect.errors.ConnectException; + +import java.util.List; + +public class VersionedPluginLoadingException extends ConnectException { Review Comment: Does this make more sense as a ConfigException? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -350,54 +437,49 @@ public Task newTask(Class<? extends Task> taskClass) { * @throws ConnectException if the {@link Converter} implementation class could not be found */ public Converter newConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { + return getConverter(config, classPropertyName, null, classLoaderUsage); + } + + /** + * Used to get a versioned converter. It will always try and get the converter from the set of plugin classloaders. 
+ * + * @param config the configuration containing the {@link Converter}'s configuration; may not be null + * @param classPropertyName the name of the property that contains the name of the {@link Converter} class; may not be null + * @param versionPropertyName the name of the property that contains the version of the {@link Converter} class; may not be null + * @return the instantiated and configured {@link Converter}; null if the configuration did not define the specified property + * @throws ConnectException if the {@link Converter} implementation class could not be found, + * @throws VersionedPluginLoadingException if the version requested is not found + */ + public Converter newConverter(AbstractConfig config, String classPropertyName, String versionPropertyName) { + ClassLoaderUsage classLoader = config.getString(versionPropertyName) == null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS; + return getConverter(config, classPropertyName, versionPropertyName, classLoader); + } + + private Converter getConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) { if (!config.originals().containsKey(classPropertyName)) { // This configuration does not define the converter via the specified property name return null; } - Review Comment: I think it was a good idea to deduplicate this switch-case. There's another copy in newConfigProvider that we can get rid of too, and just have it use a `null` `VersionRange`. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -69,36 +70,109 @@ public DelegatingClassLoader() { /** * Retrieve the PluginClassLoader associated with a plugin class + * * @param name The fully qualified class name of the plugin * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated. 
*/ // VisibleForTesting - PluginClassLoader pluginClassLoader(String name) { + PluginClassLoader pluginClassLoader(String name, VersionRange range) { if (!PluginUtils.shouldLoadInIsolation(name)) { return null; } + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name); if (inner == null) { return null; } - ClassLoader pluginLoader = inner.get(inner.lastKey()); + + + ClassLoader pluginLoader = findPluginLoader(inner, name, range); return pluginLoader instanceof PluginClassLoader - ? (PluginClassLoader) pluginLoader - : null; + ? (PluginClassLoader) pluginLoader + : null; + } + + PluginClassLoader pluginClassLoader(String name) { Review Comment: I like this style of deduplication. Can you apply it to `connectorLoader` and `loadClass`/`loadVersionedPluginClass`? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -240,6 +261,21 @@ public Runnable withClassLoader(ClassLoader classLoader, Runnable operation) { }; } + public static LoaderSwap swapLoader(ClassLoader loader) { Review Comment: I would prefer to keep this method (and the similar copy in PluginScanner) as instance methods, because they are simpler to mock during testing: #12817 ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -350,54 +437,49 @@ public Task newTask(Class<? extends Task> taskClass) { * @throws ConnectException if the {@link Converter} implementation class could not be found */ public Converter newConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { + return getConverter(config, classPropertyName, null, classLoaderUsage); + } + + /** + * Used to get a versioned converter. It will always try and get the converter from the set of plugin classloaders. 
+ * + * @param config the configuration containing the {@link Converter}'s configuration; may not be null + * @param classPropertyName the name of the property that contains the name of the {@link Converter} class; may not be null + * @param versionPropertyName the name of the property that contains the version of the {@link Converter} class; may not be null + * @return the instantiated and configured {@link Converter}; null if the configuration did not define the specified property + * @throws ConnectException if the {@link Converter} implementation class could not be found, + * @throws VersionedPluginLoadingException if the version requested is not found + */ + public Converter newConverter(AbstractConfig config, String classPropertyName, String versionPropertyName) { + ClassLoaderUsage classLoader = config.getString(versionPropertyName) == null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS; Review Comment: Is this top comment misleading? The current classloader is used if no version is specified. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -350,54 +437,49 @@ public Task newTask(Class<? extends Task> taskClass) { * @throws ConnectException if the {@link Converter} implementation class could not be found */ public Converter newConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { + return getConverter(config, classPropertyName, null, classLoaderUsage); + } + + /** + * Used to get a versioned converter. It will always try and get the converter from the set of plugin classloaders. 
+ * + * @param config the configuration containing the {@link Converter}'s configuration; may not be null + * @param classPropertyName the name of the property that contains the name of the {@link Converter} class; may not be null + * @param versionPropertyName the name of the property that contains the version of the {@link Converter} class; may not be null + * @return the instantiated and configured {@link Converter}; null if the configuration did not define the specified property + * @throws ConnectException if the {@link Converter} implementation class could not be found, + * @throws VersionedPluginLoadingException if the version requested is not found + */ + public Converter newConverter(AbstractConfig config, String classPropertyName, String versionPropertyName) { + ClassLoaderUsage classLoader = config.getString(versionPropertyName) == null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS; + return getConverter(config, classPropertyName, versionPropertyName, classLoader); + } + + private Converter getConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) { Review Comment: nit: Don't use the `get` prefix, I don't think it makes sense when the converter is being created. `new` is more appropriate here. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -434,50 +516,93 @@ public Converter newInternalConverter(boolean isKey, String className, Map<Strin * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found */ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { - Class<? 
extends HeaderConverter> klass = null; + return getHeaderConverter(config, classPropertyName, null, classLoaderUsage); + } + + public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName) { + ClassLoaderUsage classLoader = config.getString(versionPropertyName) == null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS; + return getHeaderConverter(config, classPropertyName, versionPropertyName, classLoader); + } + + private HeaderConverter getHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) { + if (!config.originals().containsKey(classPropertyName)) { + // This configuration does not define the Header Converter via the specified property name + return null; + } + + HeaderConverter plugin = getVersionedPlugin(config, classPropertyName, versionPropertyName, + HeaderConverter.class, classLoaderUsage, scanResult.headerConverters()); + + String configPrefix = classPropertyName + "."; + Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); + converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()); + log.debug("Configuring the header converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet()); + + try (LoaderSwap loaderSwap = withClassLoader(plugin.getClass().getClassLoader())) { + plugin.configure(converterConfig); + } + return plugin; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private <U> U getVersionedPlugin( + AbstractConfig config, + String classPropertyName, + String versionPropertyName, + Class basePluginClass, + ClassLoaderUsage classLoaderUsage, + SortedSet<PluginDesc<U>> availablePlugins + ) { + + String version = versionPropertyName == null ? 
null : config.getString(versionPropertyName); + VersionRange range = null; + if (version != null) { + try { + range = VersionRange.createFromVersionSpec(version); + } catch (InvalidVersionSpecificationException e) { + throw new ConnectException(String.format("Invalid version range for %s: %s %s", classPropertyName, version, e)); + } + } + + Class<? extends U> klass = null; + String basePluginClassName = basePluginClass.getSimpleName(); switch (classLoaderUsage) { case CURRENT_CLASSLOADER: - if (!config.originals().containsKey(classPropertyName)) { - // This connector configuration does not define the header converter via the specified property name - return null; - } // Attempt to load first with the current classloader, and plugins as a fallback. - // Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes - // before calling config(...) - klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, scanResult.headerConverters()); + // Note: we can't use config.getConfiguredInstance because Converter doesn't implement Configurable, and even if it did + // we have to remove the property prefixes before calling config(...) and we still always want to call Converter.config. + klass = pluginClassFromConfig(config, classPropertyName, basePluginClass, availablePlugins); break; case PLUGINS: - // Attempt to load with the plugin class loader, which uses the current classloader as a fallback. 
- // Note that there will always be at least a default header converter for the worker - String converterClassOrAlias = config.getClass(classPropertyName).getName(); + // Attempt to load with the plugin class loader, which uses the current classloader as a fallback + String classOrAlias = config.getClass(classPropertyName).getName(); try { - klass = pluginClass( - delegatingLoader, - converterClassOrAlias, - HeaderConverter.class - ); + klass = pluginClass(delegatingLoader, classOrAlias, basePluginClass, range); } catch (ClassNotFoundException e) { throw new ConnectException( - "Failed to find any class that implements HeaderConverter and which name matches " - + converterClassOrAlias - + ", available header converters are: " - + pluginNames(scanResult.headerConverters()) + "Failed to find any class that implements " + basePluginClassName + " and which name matches " + + classOrAlias + ", available plugins are: " + + pluginNames(availablePlugins) ); } + break; } if (klass == null) { - throw new ConnectException("Unable to initialize the HeaderConverter specified in '" + classPropertyName + "'"); + throw new ConnectException("Unable to initialize the '" + basePluginClassName + + "' specified in '" + classPropertyName + "'"); } - String configPrefix = classPropertyName + "."; - Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); - converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()); - log.debug("Configuring the header converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet()); - - HeaderConverter plugin; + U plugin; try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) { plugin = newPlugin(klass); - plugin.configure(converterConfig); + DefaultArtifactVersion pluginVersion = new DefaultArtifactVersion(PluginScanner.versionFor(plugin)); + if (range != null && range.hasRestrictions() && !range.containsVersion(pluginVersion)) { + // this can happen if the current 
class loader is used + // if there are version restrictions then this should be captured, and we should load using the plugin class loader + if (classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) { + return getVersionedPlugin(config, classPropertyName, versionPropertyName, basePluginClass, ClassLoaderUsage.PLUGINS, availablePlugins); + } + } Review Comment: Can we assert `range == null || classLoaderUsage == PLUGINS` earlier, to ensure that we never get into the situation where we have loaded a class with an invalid version? I think having this depend on `range.containsVersion(pluginVersion)` could lead to some strange behavior in the following situation:

* key.converter.version = [1.0, 2.0)
* Converter v1.0 is on the classpath
* Converter v1.2 is in a different plugin
* Converter v1.1 is in the local plugin

If you don't have 1.1 installed, 1.2 is selected. If you later install 1.1 (an older version), you get rolled back to 1.1. But if you had specified `[1.2]`, it wouldn't get rolled back.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org