snehashisp commented on code in PR #16984:
URL: https://github.com/apache/kafka/pull/16984#discussion_r1870818248


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -434,50 +516,93 @@ public Converter newInternalConverter(boolean isKey, 
String className, Map<Strin
      * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
      */
     public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
-        Class<? extends HeaderConverter> klass = null;
+        return getHeaderConverter(config, classPropertyName, null, 
classLoaderUsage);
+    }
+
+    public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) 
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;
+        return getHeaderConverter(config, classPropertyName, 
versionPropertyName, classLoader);
+    }
+
+    private HeaderConverter getHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName, ClassLoaderUsage 
classLoaderUsage) {
+        if (!config.originals().containsKey(classPropertyName)) {
+            // This configuration does not define the Header Converter via the 
specified property name
+            return null;
+        }
+
+        HeaderConverter plugin = getVersionedPlugin(config, classPropertyName, 
versionPropertyName,
+                HeaderConverter.class, classLoaderUsage, 
scanResult.headerConverters());
+
+        String configPrefix = classPropertyName + ".";
+        Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
+        converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
+        log.debug("Configuring the header converter with configuration 
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
+
+        try (LoaderSwap loaderSwap = 
withClassLoader(plugin.getClass().getClassLoader())) {
+            plugin.configure(converterConfig);
+        }
+        return plugin;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <U> U getVersionedPlugin(
+            AbstractConfig config,
+            String classPropertyName,
+            String versionPropertyName,
+            Class basePluginClass,
+            ClassLoaderUsage classLoaderUsage,
+            SortedSet<PluginDesc<U>> availablePlugins
+    ) {
+
+        String version = versionPropertyName == null ? null : 
config.getString(versionPropertyName);
+        VersionRange range = null;
+        if (version != null) {
+            try {
+                range = VersionRange.createFromVersionSpec(version);
+            } catch (InvalidVersionSpecificationException e) {
+                throw new ConnectException(String.format("Invalid version 
range for %s: %s %s", classPropertyName, version, e));
+            }
+        }
+
+        Class<? extends U> klass = null;
+        String basePluginClassName = basePluginClass.getSimpleName();
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
-                if (!config.originals().containsKey(classPropertyName)) {
-                    // This connector configuration does not define the header 
converter via the specified property name
-                    return null;
-                }

Review Comment:
   Good point, I think if the header converter is not defined in both connector 
config and worker config it will not be initialized.
   
   TBH, this initialization mechanism always seemed a bit brittle. Class loader 
usage seems to be strongly tied to whether we use the connector config or the 
worker config. Although we don't do it, there is nothing stopping us from 
passing a WorkerConfig with current class loader or vice versa. Also, 
CURRENT_CLASSLOADER seems to indicate that we are creating the plugin with the 
loader of the current thread. This is not exactly the case as its actually the 
loader in use when the AbstractConfig was initialized (which internally does 
Utils.loadclass for class type configs). This can be addressed separately but 
for now I have preserved the existing behavior to reduce potential edge cases. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to