snehashisp commented on code in PR #16984: URL: https://github.com/apache/kafka/pull/16984#discussion_r1870737937
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -434,50 +516,93 @@ public Converter newInternalConverter(boolean isKey, String className, Map<Strin * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found */ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) { - Class<? extends HeaderConverter> klass = null; + return getHeaderConverter(config, classPropertyName, null, classLoaderUsage); + } + + public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName) { + ClassLoaderUsage classLoader = config.getString(versionPropertyName) == null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS; + return getHeaderConverter(config, classPropertyName, versionPropertyName, classLoader); + } + + private HeaderConverter getHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) { + if (!config.originals().containsKey(classPropertyName)) { + // This configuration does not define the Header Converter via the specified property name + return null; + } + + HeaderConverter plugin = getVersionedPlugin(config, classPropertyName, versionPropertyName, + HeaderConverter.class, classLoaderUsage, scanResult.headerConverters()); + + String configPrefix = classPropertyName + "."; + Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); + converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()); + log.debug("Configuring the header converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet()); + + try (LoaderSwap loaderSwap = withClassLoader(plugin.getClass().getClassLoader())) { + plugin.configure(converterConfig); + } + return plugin; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private <U> U getVersionedPlugin( + AbstractConfig config, + String classPropertyName, + String versionPropertyName, + Class basePluginClass, + ClassLoaderUsage classLoaderUsage, + SortedSet<PluginDesc<U>> availablePlugins + ) { + + String version = versionPropertyName == null ? null : config.getString(versionPropertyName); + VersionRange range = null; + if (version != null) { + try { + range = VersionRange.createFromVersionSpec(version); + } catch (InvalidVersionSpecificationException e) { + throw new ConnectException(String.format("Invalid version range for %s: %s %s", classPropertyName, version, e)); + } + } + + Class<? extends U> klass = null; + String basePluginClassName = basePluginClass.getSimpleName(); switch (classLoaderUsage) { case CURRENT_CLASSLOADER: - if (!config.originals().containsKey(classPropertyName)) { - // This connector configuration does not define the header converter via the specified property name - return null; - } // Attempt to load first with the current classloader, and plugins as a fallback. - // Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes - // before calling config(...) - klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, scanResult.headerConverters()); + // Note: we can't use config.getConfiguredInstance because Converter doesn't implement Configurable, and even if it did + // we have to remove the property prefixes before calling config(...) and we still always want to call Converter.config. + klass = pluginClassFromConfig(config, classPropertyName, basePluginClass, availablePlugins); break; case PLUGINS: - // Attempt to load with the plugin class loader, which uses the current classloader as a fallback. - // Note that there will always be at least a default header converter for the worker - String converterClassOrAlias = config.getClass(classPropertyName).getName(); + // Attempt to load with the plugin class loader, which uses the current classloader as a fallback + String classOrAlias = config.getClass(classPropertyName).getName(); try { - klass = pluginClass( - delegatingLoader, - converterClassOrAlias, - HeaderConverter.class - ); + klass = pluginClass(delegatingLoader, classOrAlias, basePluginClass, range); } catch (ClassNotFoundException e) { throw new ConnectException( - "Failed to find any class that implements HeaderConverter and which name matches " - + converterClassOrAlias - + ", available header converters are: " - + pluginNames(scanResult.headerConverters()) + "Failed to find any class that implements " + basePluginClassName + " and which name matches " + + classOrAlias + ", available plugins are: " + + pluginNames(availablePlugins) ); } + break; } if (klass == null) { - throw new ConnectException("Unable to initialize the HeaderConverter specified in '" + classPropertyName + "'"); + throw new ConnectException("Unable to initialize the '" + basePluginClassName + + "' specified in '" + classPropertyName + "'"); } - String configPrefix = classPropertyName + "."; - Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); - converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()); - log.debug("Configuring the header converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet()); - - HeaderConverter plugin; + U plugin; try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) { plugin = newPlugin(klass); - plugin.configure(converterConfig); + DefaultArtifactVersion pluginVersion = new DefaultArtifactVersion(PluginScanner.versionFor(plugin)); + if (range != null && range.hasRestrictions() && !range.containsVersion(pluginVersion)) { + // this can happen if the current class loader is used + // if there are version restrictions then this should be captured, and we should load using the plugin class loader + if (classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) { + return getVersionedPlugin(config, classPropertyName, versionPropertyName, basePluginClass, ClassLoaderUsage.PLUGINS, availablePlugins); + } + } Review Comment: Yes, the situation you mentioned is exactly the reason why the public methods to create a converter with a version provided we don't have the option to pass in a class loader and defaults to using the plugins loader, otherwise it can potentially shadow a higher version of a plugin requirement. This code path should ideally never be executed. Will remove this and add the assertion suggestion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org