snehashisp commented on code in PR #16984:
URL: https://github.com/apache/kafka/pull/16984#discussion_r1870737937


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -434,50 +516,93 @@ public Converter newInternalConverter(boolean isKey, 
String className, Map<Strin
      * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
      */
     public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
-        Class<? extends HeaderConverter> klass = null;
+        return getHeaderConverter(config, classPropertyName, null, 
classLoaderUsage);
+    }
+
+    public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) 
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;
+        return getHeaderConverter(config, classPropertyName, 
versionPropertyName, classLoader);
+    }
+
+    private HeaderConverter getHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName, ClassLoaderUsage 
classLoaderUsage) {
+        if (!config.originals().containsKey(classPropertyName)) {
+            // This configuration does not define the Header Converter via the 
specified property name
+            return null;
+        }
+
+        HeaderConverter plugin = getVersionedPlugin(config, classPropertyName, 
versionPropertyName,
+                HeaderConverter.class, classLoaderUsage, 
scanResult.headerConverters());
+
+        String configPrefix = classPropertyName + ".";
+        Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
+        converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
+        log.debug("Configuring the header converter with configuration 
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
+
+        try (LoaderSwap loaderSwap = 
withClassLoader(plugin.getClass().getClassLoader())) {
+            plugin.configure(converterConfig);
+        }
+        return plugin;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <U> U getVersionedPlugin(
+            AbstractConfig config,
+            String classPropertyName,
+            String versionPropertyName,
+            Class basePluginClass,
+            ClassLoaderUsage classLoaderUsage,
+            SortedSet<PluginDesc<U>> availablePlugins
+    ) {
+
+        String version = versionPropertyName == null ? null : 
config.getString(versionPropertyName);
+        VersionRange range = null;
+        if (version != null) {
+            try {
+                range = VersionRange.createFromVersionSpec(version);
+            } catch (InvalidVersionSpecificationException e) {
+                throw new ConnectException(String.format("Invalid version 
range for %s: %s %s", classPropertyName, version, e));
+            }
+        }
+
+        Class<? extends U> klass = null;
+        String basePluginClassName = basePluginClass.getSimpleName();
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
-                if (!config.originals().containsKey(classPropertyName)) {
-                    // This connector configuration does not define the header 
converter via the specified property name
-                    return null;
-                }
                 // Attempt to load first with the current classloader, and 
plugins as a fallback.
-                // Note: we can't use config.getConfiguredInstance because we 
have to remove the property prefixes
-                // before calling config(...)
-                klass = pluginClassFromConfig(config, classPropertyName, 
HeaderConverter.class, scanResult.headerConverters());
+                // Note: we can't use config.getConfiguredInstance because 
Converter doesn't implement Configurable, and even if it did
+                // we have to remove the property prefixes before calling 
config(...) and we still always want to call Converter.config.
+                klass = pluginClassFromConfig(config, classPropertyName, 
basePluginClass, availablePlugins);
                 break;
             case PLUGINS:
-                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback.
-                // Note that there will always be at least a default header 
converter for the worker
-                String converterClassOrAlias = 
config.getClass(classPropertyName).getName();
+                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback
+                String classOrAlias = 
config.getClass(classPropertyName).getName();
                 try {
-                    klass = pluginClass(
-                            delegatingLoader,
-                            converterClassOrAlias,
-                            HeaderConverter.class
-                    );
+                    klass = pluginClass(delegatingLoader, classOrAlias, 
basePluginClass, range);
                 } catch (ClassNotFoundException e) {
                     throw new ConnectException(
-                            "Failed to find any class that implements 
HeaderConverter and which name matches "
-                                    + converterClassOrAlias
-                                    + ", available header converters are: "
-                                    + 
pluginNames(scanResult.headerConverters())
+                            "Failed to find any class that implements " + 
basePluginClassName + " and which name matches "
+                                    + classOrAlias + ", available plugins are: 
"
+                                    + pluginNames(availablePlugins)
                     );
                 }
+                break;
         }
         if (klass == null) {
-            throw new ConnectException("Unable to initialize the 
HeaderConverter specified in '" + classPropertyName + "'");
+            throw new ConnectException("Unable to initialize the '" + 
basePluginClassName
+                    + "' specified in '" + classPropertyName + "'");
         }
 
-        String configPrefix = classPropertyName + ".";
-        Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
-        converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
-        log.debug("Configuring the header converter with configuration 
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
-
-        HeaderConverter plugin;
+        U plugin;
         try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
             plugin = newPlugin(klass);
-            plugin.configure(converterConfig);
+            DefaultArtifactVersion pluginVersion = new 
DefaultArtifactVersion(PluginScanner.versionFor(plugin));
+            if (range != null && range.hasRestrictions() && 
!range.containsVersion(pluginVersion)) {
+                // this can happen if the current class loader is used
+                // if there are version restrictions then this should be 
captured, and we should load using the plugin class loader
+                if (classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+                    return getVersionedPlugin(config, classPropertyName, 
versionPropertyName, basePluginClass, ClassLoaderUsage.PLUGINS, 
availablePlugins);
+                }
+            }

Review Comment:
   Yes, the situation you mentioned is exactly the reason why the public 
methods to create a converter with a version provided we don't have the option 
to pass in a class loader and defaults to using the plugins loader, otherwise 
it can potentially shadow a higher version of a plugin requirement. This code 
path should ideally never be executed. Will remove this and add the assertion 
suggestion. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to