gharris1727 commented on code in PR #16984: URL: https://github.com/apache/kafka/pull/16984#discussion_r1873720403
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -112,21 +199,70 @@ public void installDiscoveredPlugins(PluginScanResult scanResult) { @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + return loadVersionedPluginClass(name, null, resolve); + } + + protected Class<?> loadVersionedPluginClass( + String name, + VersionRange range, + boolean resolve + ) throws VersionedPluginLoadingException, ClassNotFoundException { + String fullName = aliases.getOrDefault(name, name); - PluginClassLoader pluginLoader = pluginClassLoader(fullName); + PluginClassLoader pluginLoader = pluginClassLoader(fullName, range); + Class<?> plugin; if (pluginLoader != null) { - log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader); - return pluginLoader.loadClass(fullName, resolve); + log.trace("Retrieving loaded class '{}' from '{}'", name, pluginLoader); + plugin = pluginLoader.loadClass(fullName, resolve); + } else { + plugin = super.loadClass(fullName, resolve); + if (range == null) { + return plugin; + } + + String pluginVersion; + SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = pluginLoaders.get(fullName); + + if (scannedPlugin == null) { + throw new VersionedPluginLoadingException(String.format( + "Plugin %s is not part of Connect's plugin loading mechanism (ClassPath or Plugin Path)", + fullName + )); + } + + List<PluginDesc<?>> classpathPlugins = scannedPlugin.keySet().stream() + .filter(pluginDesc -> pluginDesc.location().equals("classpath")) + .collect(Collectors.toList()); + + if (classpathPlugins.size() > 1) { + throw new VersionedPluginLoadingException(String.format( + "Plugin %s has multiple versions specified in class path, " + + "only one version is allowed in class path for loading a plugin with version range", + fullName + )); + } else { Review Comment: There's also the case of classpathPlugins.isEmpty(), in case loadClass 
returns a valid class, but it wasn't discovered by scanning, like if the class is abstract or not migrated. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -69,36 +75,117 @@ public DelegatingClassLoader() { /** * Retrieve the PluginClassLoader associated with a plugin class + * * @param name The fully qualified class name of the plugin * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated. */ // VisibleForTesting - PluginClassLoader pluginClassLoader(String name) { + PluginClassLoader pluginClassLoader(String name, VersionRange range) { if (!PluginUtils.shouldLoadInIsolation(name)) { return null; } + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name); if (inner == null) { return null; } - ClassLoader pluginLoader = inner.get(inner.lastKey()); + + + ClassLoader pluginLoader = findPluginLoader(inner, name, range); return pluginLoader instanceof PluginClassLoader - ? (PluginClassLoader) pluginLoader - : null; + ? (PluginClassLoader) pluginLoader + : null; } - ClassLoader connectorLoader(String connectorClassOrAlias) { - String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); - ClassLoader classLoader = pluginClassLoader(fullName); - if (classLoader == null) classLoader = this; + PluginClassLoader pluginClassLoader(String name) { + return pluginClassLoader(name, null); + } + + ClassLoader loader(String classOrAlias, VersionRange range) throws ClassNotFoundException { + String fullName = aliases.getOrDefault(classOrAlias, classOrAlias); + ClassLoader classLoader = range == null ? 
pluginClassLoader(fullName) : + // load the plugin class when version is provided which will validate the correct version is loaded + loadVersionedPluginClass(fullName, range, false).getClassLoader(); + // the classloader returned can be the classpath loader in which case we should return the delegating classloader + if (!(classLoader instanceof PluginClassLoader)) { + classLoader = this; + } log.debug( - "Getting plugin class loader: '{}' for connector: {}", - classLoader, - connectorClassOrAlias + "Got plugin class loader: '{}' for connector: {}", + classLoader, + classOrAlias ); return classLoader; } + ClassLoader loader(String classOrAlias) { + try { + return loader(classOrAlias, null); + } catch (ClassNotFoundException e) { + // class not found should not happen here as version is not provided + } + return this; + } + + ClassLoader connectorLoader(String connectorClassOrAlias) { + return loader(connectorClassOrAlias); + } + + String resolveFullClassName(String classOrAlias) { + return aliases.getOrDefault(classOrAlias, classOrAlias); + } + + String latestVersion(String classOrAlias) { + if (classOrAlias == null) { + return null; + } + String fullName = aliases.getOrDefault(classOrAlias, classOrAlias); + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName); + if (inner == null) { + return null; + } + return inner.lastKey().version(); + } + + private ClassLoader findPluginLoader( + SortedMap<PluginDesc<?>, ClassLoader> loaders, + String pluginName, + VersionRange range + ) { + + if (range != null) { + + if (null != range.getRecommendedVersion()) { + throw new VersionedPluginLoadingException(String.format("A soft version range is not supported for plugin loading, " + + "this is an internal error as connect should automatically convert soft ranges to hard ranges. 
" + + "Provided soft version: %s ", range)); + } + + ArtifactVersion version = null; + ClassLoader loader = null; + for (Map.Entry<PluginDesc<?>, ClassLoader> entry : loaders.entrySet()) { + // the entries should be in sorted order of versions so this should end up picking the latest version which matches the range + if (range.containsVersion(entry.getKey().encodedVersion()) && entry.getValue() instanceof PluginClassLoader) { Review Comment: nit: this `instanceof` check can be removed because it duplicates the check in the caller. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -112,21 +199,70 @@ public void installDiscoveredPlugins(PluginScanResult scanResult) { @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + return loadVersionedPluginClass(name, null, resolve); + } + + protected Class<?> loadVersionedPluginClass( + String name, + VersionRange range, + boolean resolve + ) throws VersionedPluginLoadingException, ClassNotFoundException { + String fullName = aliases.getOrDefault(name, name); - PluginClassLoader pluginLoader = pluginClassLoader(fullName); + PluginClassLoader pluginLoader = pluginClassLoader(fullName, range); + Class<?> plugin; if (pluginLoader != null) { - log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader); - return pluginLoader.loadClass(fullName, resolve); + log.trace("Retrieving loaded class '{}' from '{}'", name, pluginLoader); + plugin = pluginLoader.loadClass(fullName, resolve); + } else { + plugin = super.loadClass(fullName, resolve); + if (range == null) { + return plugin; + } + + String pluginVersion; + SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = pluginLoaders.get(fullName); + + if (scannedPlugin == null) { + throw new VersionedPluginLoadingException(String.format( + "Plugin %s is not part of Connect's plugin loading mechanism (ClassPath or Plugin Path)", + fullName + )); + } + 
+ List<PluginDesc<?>> classpathPlugins = scannedPlugin.keySet().stream() + .filter(pluginDesc -> pluginDesc.location().equals("classpath")) + .collect(Collectors.toList()); + + if (classpathPlugins.size() > 1) { + throw new VersionedPluginLoadingException(String.format( + "Plugin %s has multiple versions specified in class path, " + + "only one version is allowed in class path for loading a plugin with version range", + fullName + )); + } else { + pluginVersion = classpathPlugins.get(0).version(); + if (!range.containsVersion(new DefaultArtifactVersion(pluginVersion))) { + throw new VersionedPluginLoadingException(String.format( + "Plugin %s has version %s which does not match the required version range %s", + fullName, + pluginVersion, + range + ), Collections.singletonList(pluginVersion)); + } + } Review Comment: nit: All of the verification steps after the range == null check can be moved to a separate method, maybe `ArtifactVersion classpathPluginVersion(String)` or `boolean classpathPluginVersionInRange(String, VersionRange)`. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ########## @@ -222,7 +222,7 @@ protected static String reflectiveErrorDescription(Throwable t) { } } - protected LoaderSwap withClassLoader(ClassLoader loader) { + protected static LoaderSwap withClassLoader(ClassLoader loader) { Review Comment: This can be non-static now that DelegatingClassLoader isn't using it. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -69,36 +75,117 @@ public DelegatingClassLoader() { /** * Retrieve the PluginClassLoader associated with a plugin class + * * @param name The fully qualified class name of the plugin * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated. 
*/ // VisibleForTesting - PluginClassLoader pluginClassLoader(String name) { + PluginClassLoader pluginClassLoader(String name, VersionRange range) { if (!PluginUtils.shouldLoadInIsolation(name)) { return null; } + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name); if (inner == null) { return null; } - ClassLoader pluginLoader = inner.get(inner.lastKey()); + + + ClassLoader pluginLoader = findPluginLoader(inner, name, range); return pluginLoader instanceof PluginClassLoader - ? (PluginClassLoader) pluginLoader - : null; + ? (PluginClassLoader) pluginLoader + : null; } - ClassLoader connectorLoader(String connectorClassOrAlias) { - String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); - ClassLoader classLoader = pluginClassLoader(fullName); - if (classLoader == null) classLoader = this; + PluginClassLoader pluginClassLoader(String name) { + return pluginClassLoader(name, null); + } + + ClassLoader loader(String classOrAlias, VersionRange range) throws ClassNotFoundException { + String fullName = aliases.getOrDefault(classOrAlias, classOrAlias); + ClassLoader classLoader = range == null ? 
pluginClassLoader(fullName) : + // load the plugin class when version is provided which will validate the correct version is loaded + loadVersionedPluginClass(fullName, range, false).getClassLoader(); + // the classloader returned can be the classpath loader in which case we should return the delegating classloader + if (!(classLoader instanceof PluginClassLoader)) { + classLoader = this; + } log.debug( - "Getting plugin class loader: '{}' for connector: {}", - classLoader, - connectorClassOrAlias + "Got plugin class loader: '{}' for connector: {}", + classLoader, + classOrAlias ); return classLoader; } + ClassLoader loader(String classOrAlias) { + try { + return loader(classOrAlias, null); + } catch (ClassNotFoundException e) { + // class not found should not happen here as version is not provided + } + return this; + } + + ClassLoader connectorLoader(String connectorClassOrAlias) { + return loader(connectorClassOrAlias); + } + + String resolveFullClassName(String classOrAlias) { + return aliases.getOrDefault(classOrAlias, classOrAlias); + } + + String latestVersion(String classOrAlias) { + if (classOrAlias == null) { + return null; + } + String fullName = aliases.getOrDefault(classOrAlias, classOrAlias); + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName); + if (inner == null) { + return null; + } + return inner.lastKey().version(); + } + + private ClassLoader findPluginLoader( + SortedMap<PluginDesc<?>, ClassLoader> loaders, + String pluginName, + VersionRange range + ) { + + if (range != null) { + + if (null != range.getRecommendedVersion()) { + throw new VersionedPluginLoadingException(String.format("A soft version range is not supported for plugin loading, " + + "this is an internal error as connect should automatically convert soft ranges to hard ranges. 
" + + "Provided soft version: %s ", range)); + } + + ArtifactVersion version = null; + ClassLoader loader = null; + for (Map.Entry<PluginDesc<?>, ClassLoader> entry : loaders.entrySet()) { + // the entries should be in sorted order of versions so this should end up picking the latest version which matches the range + if (range.containsVersion(entry.getKey().encodedVersion()) && entry.getValue() instanceof PluginClassLoader) { + version = entry.getKey().encodedVersion(); + loader = entry.getValue(); + } + } + + if (version == null) { Review Comment: nit: If you change this to loader == null you can eliminate the `version` variable entirely. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginLoadingException.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.common.config.ConfigException; + +import java.util.List; + +public class VersionedPluginLoadingException extends ConfigException { + + private List<String> availableVersions = null; + + public VersionedPluginLoadingException(String message) { + super(message); + } + + public VersionedPluginLoadingException(String message, List<String> availableVersions) { + super(message); + this.availableVersions = availableVersions; + } + + public VersionedPluginLoadingException(String message, Throwable cause) { + super(message, cause); + } + + public VersionedPluginLoadingException(String message, Throwable cause, List<String> availableVersions) { + super(message, cause); Review Comment: Hmm this is not a helpful constructor, it doesn't set the `Throwable cause`. Since these constructors appear unused, and VersionedPluginLoadingExceptions are from invalid constraints and not other operations failing, WDYT about removing these constructors? ########## connect/runtime/src/main/java/org/apache/kafka/connect/util/PluginVersionUtils.java: ########## @@ -0,0 +1,30 @@ +package org.apache.kafka.connect.util; + +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException; +import org.apache.maven.artifact.versioning.VersionRange; + +public class PluginVersionUtils { + + private static Plugins plugins = null; + + public static void setPlugins(Plugins plugins) { + PluginVersionUtils.plugins = plugins; Review Comment: Did this leak in from the other PR? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -427,99 +510,121 @@ public Converter newInternalConverter(boolean isKey, String className, Map<Strin * If the given configuration defines a {@link HeaderConverter} using the named configuration property, return a new configured * instance. 
* - * @param config the configuration containing the {@link Converter}'s configuration; may not be null - * @param classPropertyName the name of the property that contains the name of the {@link Converter} class; may not be null - * @param classLoaderUsage which classloader should be used + * @param config the configuration containing the {@link HeaderConverter}'s configuration; may not be null + * @param classPropertyName the name of the property that contains the name of the {@link HeaderConverter} class; may not be null + * @param classLoaderUsage the name of the property that contains the version of the {@link HeaderConverter} class; may not be null * @return the instantiated and configured {@link HeaderConverter}; null if the configuration did not define the specified property * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found */ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { - Class<? extends HeaderConverter> klass = null; - switch (classLoaderUsage) { - case CURRENT_CLASSLOADER: - if (!config.originals().containsKey(classPropertyName)) { - // This connector configuration does not define the header converter via the specified property name - return null; - } - // Attempt to load first with the current classloader, and plugins as a fallback. - // Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes - // before calling config(...) - klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, scanResult.headerConverters()); - break; - case PLUGINS: - // Attempt to load with the plugin class loader, which uses the current classloader as a fallback. 
- // Note that there will always be at least a default header converter for the worker - String converterClassOrAlias = config.getClass(classPropertyName).getName(); - try { - klass = pluginClass( - delegatingLoader, - converterClassOrAlias, - HeaderConverter.class - ); - } catch (ClassNotFoundException e) { - throw new ConnectException( - "Failed to find any class that implements HeaderConverter and which name matches " - + converterClassOrAlias - + ", available header converters are: " - + pluginNames(scanResult.headerConverters()) - ); - } - } - if (klass == null) { - throw new ConnectException("Unable to initialize the HeaderConverter specified in '" + classPropertyName + "'"); + return newHeaderConverter(config, classPropertyName, null, classLoaderUsage); + } + + /** + * If the given configuration defines a {@link HeaderConverter} using the named configuration property, return a new configured + * instance. If the version is specified, it will always use the plugins classloader. + * + * @param config the configuration containing the {@link HeaderConverter}'s configuration; may not be null + * @param classPropertyName the name of the property that contains the name of the {@link HeaderConverter} class; may not be null + * @param versionPropertyName the config for the version for the header converter + * @return the instantiated and configured {@link HeaderConverter}; null if the configuration did not define the specified property + * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found + */ + public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName) { + ClassLoaderUsage classLoader = config.getString(versionPropertyName) == null ? 
ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS; + return newHeaderConverter(config, classPropertyName, versionPropertyName, classLoader); + } + + private HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) { + if (!config.originals().containsKey(classPropertyName) && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) { + // This configuration does not define the Header Converter via the specified property name + return null; } + HeaderConverter plugin = newVersionedPlugin(config, classPropertyName, versionPropertyName, + HeaderConverter.class, classLoaderUsage, scanResult.headerConverters()); String configPrefix = classPropertyName + "."; Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()); log.debug("Configuring the header converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet()); - HeaderConverter plugin; - try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) { - plugin = newPlugin(klass); + try (LoaderSwap loaderSwap = withClassLoader(plugin.getClass().getClassLoader())) { plugin.configure(converterConfig); } return plugin; } - public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) { - String classPropertyName = providerPrefix + ".class"; - Map<String, String> originalConfig = config.originalsStrings(); - if (!originalConfig.containsKey(classPropertyName)) { - // This configuration does not define the config provider via the specified property name - return null; + @SuppressWarnings({"unchecked", "rawtypes"}) + private <U> U newVersionedPlugin( + AbstractConfig config, + String classPropertyName, + String versionPropertyName, + Class basePluginClass, + ClassLoaderUsage classLoaderUsage, + SortedSet<PluginDesc<U>> 
availablePlugins + ) { + + String version = versionPropertyName == null ? null : config.getString(versionPropertyName); + VersionRange range = null; + if (version != null) { + try { + range = PluginVersionUtils.connectorVersionRequirement(version); + } catch (InvalidVersionSpecificationException e) { + throw new ConnectException(String.format("Invalid version range for %s: %s %s", classPropertyName, version, e)); Review Comment: nit: move the exception to the `cause` argument. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -112,21 +199,70 @@ public void installDiscoveredPlugins(PluginScanResult scanResult) { @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + return loadVersionedPluginClass(name, null, resolve); + } + + protected Class<?> loadVersionedPluginClass( + String name, + VersionRange range, + boolean resolve + ) throws VersionedPluginLoadingException, ClassNotFoundException { + String fullName = aliases.getOrDefault(name, name); - PluginClassLoader pluginLoader = pluginClassLoader(fullName); + PluginClassLoader pluginLoader = pluginClassLoader(fullName, range); + Class<?> plugin; if (pluginLoader != null) { - log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader); - return pluginLoader.loadClass(fullName, resolve); + log.trace("Retrieving loaded class '{}' from '{}'", name, pluginLoader); + plugin = pluginLoader.loadClass(fullName, resolve); + } else { + plugin = super.loadClass(fullName, resolve); + if (range == null) { + return plugin; + } + + String pluginVersion; + SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = pluginLoaders.get(fullName); + + if (scannedPlugin == null) { + throw new VersionedPluginLoadingException(String.format( + "Plugin %s is not part of Connect's plugin loading mechanism (ClassPath or Plugin Path)", + fullName + )); + } + + List<PluginDesc<?>> classpathPlugins = 
scannedPlugin.keySet().stream() + .filter(pluginDesc -> pluginDesc.location().equals("classpath")) + .collect(Collectors.toList()); + + if (classpathPlugins.size() > 1) { + throw new VersionedPluginLoadingException(String.format( + "Plugin %s has multiple versions specified in class path, " + + "only one version is allowed in class path for loading a plugin with version range", + fullName + )); + } else { + pluginVersion = classpathPlugins.get(0).version(); + if (!range.containsVersion(new DefaultArtifactVersion(pluginVersion))) { + throw new VersionedPluginLoadingException(String.format( + "Plugin %s has version %s which does not match the required version range %s", + fullName, + pluginVersion, + range + ), Collections.singletonList(pluginVersion)); + } + } } - return super.loadClass(fullName, resolve); + return plugin; } + + Review Comment: nit: extra lines ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -427,99 +503,121 @@ public Converter newInternalConverter(boolean isKey, String className, Map<Strin * If the given configuration defines a {@link HeaderConverter} using the named configuration property, return a new configured * instance. 
* - * @param config the configuration containing the {@link Converter}'s configuration; may not be null - * @param classPropertyName the name of the property that contains the name of the {@link Converter} class; may not be null - * @param classLoaderUsage which classloader should be used + * @param config the configuration containing the {@link HeaderConverter}'s configuration; may not be null + * @param classPropertyName the name of the property that contains the name of the {@link HeaderConverter} class; may not be null + * @param classLoaderUsage the name of the property that contains the version of the {@link HeaderConverter} class; may not be null * @return the instantiated and configured {@link HeaderConverter}; null if the configuration did not define the specified property * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found */ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { - Class<? extends HeaderConverter> klass = null; - switch (classLoaderUsage) { - case CURRENT_CLASSLOADER: - if (!config.originals().containsKey(classPropertyName)) { - // This connector configuration does not define the header converter via the specified property name - return null; - } - // Attempt to load first with the current classloader, and plugins as a fallback. - // Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes - // before calling config(...) - klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, scanResult.headerConverters()); - break; - case PLUGINS: - // Attempt to load with the plugin class loader, which uses the current classloader as a fallback. 
- // Note that there will always be at least a default header converter for the worker - String converterClassOrAlias = config.getClass(classPropertyName).getName(); - try { - klass = pluginClass( - delegatingLoader, - converterClassOrAlias, - HeaderConverter.class - ); - } catch (ClassNotFoundException e) { - throw new ConnectException( - "Failed to find any class that implements HeaderConverter and which name matches " - + converterClassOrAlias - + ", available header converters are: " - + pluginNames(scanResult.headerConverters()) - ); - } - } - if (klass == null) { - throw new ConnectException("Unable to initialize the HeaderConverter specified in '" + classPropertyName + "'"); + return newHeaderConverter(config, classPropertyName, null, classLoaderUsage); + } + + /** + * If the given configuration defines a {@link HeaderConverter} using the named configuration property, return a new configured + * instance. If the version is specified, it will always use the plugins classloader. + * + * @param config the configuration containing the {@link HeaderConverter}'s configuration; may not be null + * @param classPropertyName the name of the property that contains the name of the {@link HeaderConverter} class; may not be null + * @param versionPropertyName the config for the version for the header converter + * @return the instantiated and configured {@link HeaderConverter}; null if the configuration did not define the specified property + * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found + */ + public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName) { + ClassLoaderUsage classLoader = config.getString(versionPropertyName) == null ? 
ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS; + return newHeaderConverter(config, classPropertyName, versionPropertyName, classLoader); + } + + private HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) { + if (!config.originals().containsKey(classPropertyName) && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) { + // This configuration does not define the Header Converter via the specified property name + return null; } + HeaderConverter plugin = newVersionedPlugin(config, classPropertyName, versionPropertyName, + HeaderConverter.class, classLoaderUsage, scanResult.headerConverters()); String configPrefix = classPropertyName + "."; Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()); log.debug("Configuring the header converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet()); - HeaderConverter plugin; - try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) { - plugin = newPlugin(klass); + try (LoaderSwap loaderSwap = withClassLoader(plugin.getClass().getClassLoader())) { plugin.configure(converterConfig); } return plugin; } - public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) { - String classPropertyName = providerPrefix + ".class"; - Map<String, String> originalConfig = config.originalsStrings(); - if (!originalConfig.containsKey(classPropertyName)) { - // This configuration does not define the config provider via the specified property name - return null; + @SuppressWarnings({"unchecked", "rawtypes"}) + private <U> U newVersionedPlugin( + AbstractConfig config, + String classPropertyName, + String versionPropertyName, + Class basePluginClass, + ClassLoaderUsage classLoaderUsage, + SortedSet<PluginDesc<U>> 
availablePlugins + ) { + + String version = versionPropertyName == null ? null : config.getString(versionPropertyName); + VersionRange range = null; + if (version != null) { + try { + range = PluginVersionUtils.connectorVersionRequirement(version); + } catch (InvalidVersionSpecificationException e) { + throw new ConnectException(String.format("Invalid version range for %s: %s %s", classPropertyName, version, e)); + } } - Class<? extends ConfigProvider> klass = null; + + assert range == null || classLoaderUsage == ClassLoaderUsage.PLUGINS; + + Class<? extends U> klass = null; + String basePluginClassName = basePluginClass.getSimpleName(); switch (classLoaderUsage) { case CURRENT_CLASSLOADER: // Attempt to load first with the current classloader, and plugins as a fallback. - klass = pluginClassFromConfig(config, classPropertyName, ConfigProvider.class, scanResult.configProviders()); + // Note: we can't use config.getConfiguredInstance because Converter doesn't implement Configurable, and even if it did + // we have to remove the property prefixes before calling config(...) and we still always want to call Converter.config. 
+ klass = pluginClassFromConfig(config, classPropertyName, basePluginClass, availablePlugins); break; case PLUGINS: // Attempt to load with the plugin class loader, which uses the current classloader as a fallback - String configProviderClassOrAlias = originalConfig.get(classPropertyName); + String classOrAlias = config.getClass(classPropertyName).getName(); try { - klass = pluginClass(delegatingLoader, configProviderClassOrAlias, ConfigProvider.class); + klass = pluginClass(delegatingLoader, classOrAlias, basePluginClass, range); } catch (ClassNotFoundException e) { throw new ConnectException( - "Failed to find any class that implements ConfigProvider and which name matches " - + configProviderClassOrAlias + ", available ConfigProviders are: " - + pluginNames(scanResult.configProviders()) + "Failed to find any class that implements " + basePluginClassName + " and which name matches " + + classOrAlias + ", available plugins are: " + + pluginNames(availablePlugins) ); } break; } if (klass == null) { - throw new ConnectException("Unable to initialize the ConfigProvider specified in '" + classPropertyName + "'"); + throw new ConnectException("Unable to initialize the '" + basePluginClassName + + "' specified in '" + classPropertyName + "'"); Review Comment: nit: no single quotes around basePluginClassName. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -427,99 +503,121 @@ public Converter newInternalConverter(boolean isKey, String className, Map<Strin * If the given configuration defines a {@link HeaderConverter} using the named configuration property, return a new configured * instance. 
* - * @param config the configuration containing the {@link Converter}'s configuration; may not be null - * @param classPropertyName the name of the property that contains the name of the {@link Converter} class; may not be null - * @param classLoaderUsage which classloader should be used + * @param config the configuration containing the {@link HeaderConverter}'s configuration; may not be null + * @param classPropertyName the name of the property that contains the name of the {@link HeaderConverter} class; may not be null + * @param classLoaderUsage the name of the property that contains the version of the {@link HeaderConverter} class; may not be null * @return the instantiated and configured {@link HeaderConverter}; null if the configuration did not define the specified property * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found */ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { - Class<? extends HeaderConverter> klass = null; - switch (classLoaderUsage) { - case CURRENT_CLASSLOADER: - if (!config.originals().containsKey(classPropertyName)) { - // This connector configuration does not define the header converter via the specified property name - return null; - } - // Attempt to load first with the current classloader, and plugins as a fallback. - // Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes - // before calling config(...) - klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, scanResult.headerConverters()); - break; - case PLUGINS: - // Attempt to load with the plugin class loader, which uses the current classloader as a fallback. 
- // Note that there will always be at least a default header converter for the worker - String converterClassOrAlias = config.getClass(classPropertyName).getName(); - try { - klass = pluginClass( - delegatingLoader, - converterClassOrAlias, - HeaderConverter.class - ); - } catch (ClassNotFoundException e) { - throw new ConnectException( - "Failed to find any class that implements HeaderConverter and which name matches " - + converterClassOrAlias - + ", available header converters are: " - + pluginNames(scanResult.headerConverters()) - ); - } - } - if (klass == null) { - throw new ConnectException("Unable to initialize the HeaderConverter specified in '" + classPropertyName + "'"); + return newHeaderConverter(config, classPropertyName, null, classLoaderUsage); + } + + /** + * If the given configuration defines a {@link HeaderConverter} using the named configuration property, return a new configured + * instance. If the version is specified, it will always use the plugins classloader. + * + * @param config the configuration containing the {@link HeaderConverter}'s configuration; may not be null + * @param classPropertyName the name of the property that contains the name of the {@link HeaderConverter} class; may not be null + * @param versionPropertyName the config for the version for the header converter + * @return the instantiated and configured {@link HeaderConverter}; null if the configuration did not define the specified property + * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found + */ + public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName) { + ClassLoaderUsage classLoader = config.getString(versionPropertyName) == null ? 
ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS; + return newHeaderConverter(config, classPropertyName, versionPropertyName, classLoader); + } + + private HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) { + if (!config.originals().containsKey(classPropertyName) && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) { + // This configuration does not define the Header Converter via the specified property name + return null; } + HeaderConverter plugin = newVersionedPlugin(config, classPropertyName, versionPropertyName, + HeaderConverter.class, classLoaderUsage, scanResult.headerConverters()); String configPrefix = classPropertyName + "."; Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()); log.debug("Configuring the header converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet()); - HeaderConverter plugin; - try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) { - plugin = newPlugin(klass); + try (LoaderSwap loaderSwap = withClassLoader(plugin.getClass().getClassLoader())) { plugin.configure(converterConfig); } return plugin; } - public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) { - String classPropertyName = providerPrefix + ".class"; - Map<String, String> originalConfig = config.originalsStrings(); - if (!originalConfig.containsKey(classPropertyName)) { - // This configuration does not define the config provider via the specified property name - return null; + @SuppressWarnings({"unchecked", "rawtypes"}) + private <U> U newVersionedPlugin( + AbstractConfig config, + String classPropertyName, + String versionPropertyName, + Class basePluginClass, + ClassLoaderUsage classLoaderUsage, + SortedSet<PluginDesc<U>> 
availablePlugins + ) { + + String version = versionPropertyName == null ? null : config.getString(versionPropertyName); + VersionRange range = null; + if (version != null) { + try { + range = PluginVersionUtils.connectorVersionRequirement(version); + } catch (InvalidVersionSpecificationException e) { + throw new ConnectException(String.format("Invalid version range for %s: %s %s", classPropertyName, version, e)); + } } - Class<? extends ConfigProvider> klass = null; + + assert range == null || classLoaderUsage == ClassLoaderUsage.PLUGINS; + + Class<? extends U> klass = null; + String basePluginClassName = basePluginClass.getSimpleName(); switch (classLoaderUsage) { case CURRENT_CLASSLOADER: // Attempt to load first with the current classloader, and plugins as a fallback. - klass = pluginClassFromConfig(config, classPropertyName, ConfigProvider.class, scanResult.configProviders()); + // Note: we can't use config.getConfiguredInstance because Converter doesn't implement Configurable, and even if it did + // we have to remove the property prefixes before calling config(...) and we still always want to call Converter.config. Review Comment: nit: stale comment ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -69,36 +75,117 @@ public DelegatingClassLoader() { /** * Retrieve the PluginClassLoader associated with a plugin class + * * @param name The fully qualified class name of the plugin * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated. 
*/ // VisibleForTesting - PluginClassLoader pluginClassLoader(String name) { + PluginClassLoader pluginClassLoader(String name, VersionRange range) { if (!PluginUtils.shouldLoadInIsolation(name)) { return null; } + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name); if (inner == null) { return null; } - ClassLoader pluginLoader = inner.get(inner.lastKey()); + + + ClassLoader pluginLoader = findPluginLoader(inner, name, range); return pluginLoader instanceof PluginClassLoader - ? (PluginClassLoader) pluginLoader - : null; + ? (PluginClassLoader) pluginLoader + : null; } - ClassLoader connectorLoader(String connectorClassOrAlias) { - String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); - ClassLoader classLoader = pluginClassLoader(fullName); - if (classLoader == null) classLoader = this; + PluginClassLoader pluginClassLoader(String name) { + return pluginClassLoader(name, null); + } + + ClassLoader loader(String classOrAlias, VersionRange range) throws ClassNotFoundException { + String fullName = aliases.getOrDefault(classOrAlias, classOrAlias); + ClassLoader classLoader = range == null ? pluginClassLoader(fullName) : + // load the plugin class when version is provided which will validate the correct version is loaded + loadVersionedPluginClass(fullName, range, false).getClassLoader(); Review Comment: This logic is incorrect, and I think you can just call `pluginClassLoader(fullName, range)` to let the null check internally handle both cases. This will also remove the ClassNotFoundException and the need to catch it higher up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org