snehashisp commented on code in PR #17741:
URL: https://github.com/apache/kafka/pull/17741#discussion_r1876087461


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -405,13 +471,102 @@ protected ConfigDef config(Predicate<?> predicate) {
         return newDef;
     }
 
+    private static <T> ConverterDefaults converterDefaults(
+            Plugins plugins,
+            String connectorConverterConfig,
+            String workerConverterConfig,
+            String workerConverterVersionConfig,
+            Map<String, String> connectorProps,
+            Map<String, String> workerProps,
+            Class<T> converterType
+    ) {
+        /*
+        if a converter is specified in the connector config it overrides the worker config for the corresponding converter
+        otherwise the worker config is used, hence if the converter is not provided in the connector config, the default
+        is the one provided in the worker config
+
+        for converters which version is used depends on a several factors with multi-versioning support
+        A. If the converter class is provided as part of the connector properties
+            1. if the version is not provided,
+                - if the converter is packaged with the connector then, the packaged version is used
+                - if the converter is not packaged with the connector, the latest version is used
+            2. if the version is provided, the provided version is used
+        B. If the converter class is not provided as part of the connector properties, but provided as part of the worker properties
+            1. if the version is not provided, the latest version is used
+            2. if the version is provided, the provided version is used
+        C. If the converter class is not provided as part of the connector properties and not provided as part of the worker properties,
+        the converter to use is unknown hence no default version can be determined (null)
+
+        Note: Connect when using service loading has an issue outlined in KAFKA-18119. The issue means that the above
+        logic does not hold currently for clusters using service loading when converters are defined in the connector.
+        However, the logic to determine the default should ideally follow the one outlined above, and the code here
+        should still show the correct default version regardless of the bug.
+        */
+        final String connectorConverter = connectorProps.get(connectorConverterConfig);
+        final String workerConverter = workerProps.get(workerConverterConfig);
+        final String connectorClass = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        final String connectorVersion = connectorProps.get(ConnectorConfig.CONNECTOR_VERSION);
+        if (connectorClass == null || (connectorConverter == null && workerConverter == null)) {
+            return new ConverterDefaults();
+        }
+
+        ConverterDefaults defaults = new ConverterDefaults();
+        // update the default of connector converter based on if the worker converter is provided
+        defaults.type = workerConverter;
+
+
+        String version = null;
+        if (connectorConverter != null) {
+            version = fetchPluginVersion(plugins, connectorClass, connectorVersion, connectorConverter, converterType);
+        } else {
+            version = workerProps.get(workerConverterVersionConfig);
+            if (version == null) {
+                version = plugins.latestVersion(workerConverter);
+            }
+        }
+        defaults.version = version;
+        return defaults;
+    }
+
+    private static void updateKeyDefault(ConfigDef configDef, String versionConfigKey, String versionDefault) {
+        ConfigDef.ConfigKey key = configDef.configKeys().get(versionConfigKey);
+        if (key == null) {
+            return;
+        }
+        configDef.configKeys().put(versionConfigKey, new ConfigDef.ConfigKey(
+                versionConfigKey, key.type, versionDefault, key.validator, key.importance, key.documentation, key.group, key.orderInGroup, key.width, key.displayName, key.dependents, key.recommender, false
+        ));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> String fetchPluginVersion(Plugins plugins, String connectorClass, String connectorVersion, String pluginName, Class<T> pluginClass) {
+        if (pluginName == null) {
+            return null;
+        }
+        try {
+            VersionRange range = PluginUtils.connectorVersionRequirement(connectorVersion);
+            ClassLoader connectorLoader = plugins.pluginLoader(connectorClass, range);
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                T plugin = (T) plugins.newPlugin(pluginName, pluginClass, null);
+                if (plugin instanceof Versioned) {
+                    return ((Versioned) plugin).version();
+                }
+            }

Review Comment:
   On further thought, I think `plugins.latestVersion` can be incorrect in 
some edge cases. If two different versions of the same plugin are bundled 
under the same plugin classloader, `latestVersion` will return the higher 
version, but the actual classloading will use whichever one it finds first. 
Although this is not the recommended way to install plugins, we do not check 
for it and do not fail during plugin scanning. I feel the only reliable way 
to get the version is to actually instantiate the plugin and read the version 
it reports.
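
   To illustrate the edge case, here is a minimal standalone sketch. It does 
not use the real Connect classes: `BundledPlugin`, `highestScannedVersion`, 
and `firstLoadedVersion` are hypothetical names, and the list order stands in 
for the order in which a classloader would encounter the bundled copies.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class VersionLookupSketch {

    // Hypothetical stand-in for one copy of a plugin bundled in a plugin location.
    static final class BundledPlugin {
        final String className;
        final String version;

        BundledPlugin(String className, String version) {
            this.className = className;
            this.version = version;
        }
    }

    // Parses a purely numeric dotted version such as "1.0.0" (an assumption of this sketch).
    static int[] parse(String version) {
        return Arrays.stream(version.split("\\.")).mapToInt(Integer::parseInt).toArray();
    }

    // Models a scan-based lookup: report the highest version among all copies found.
    static Optional<String> highestScannedVersion(List<BundledPlugin> location, String className) {
        return location.stream()
                .filter(p -> p.className.equals(className))
                .map(p -> p.version)
                .max((a, b) -> Arrays.compare(parse(a), parse(b)));
    }

    // Models classloading: the first copy encountered wins, regardless of its version.
    static Optional<String> firstLoadedVersion(List<BundledPlugin> location, String className) {
        return location.stream()
                .filter(p -> p.className.equals(className))
                .map(p -> p.version)
                .findFirst();
    }

    public static void main(String[] args) {
        // Two versions of the same (made-up) converter bundled in one location.
        List<BundledPlugin> location = Arrays.asList(
                new BundledPlugin("com.example.MyConverter", "1.0.0"),
                new BundledPlugin("com.example.MyConverter", "2.0.0"));
        String name = "com.example.MyConverter";
        System.out.println("scan reports: " + highestScannedVersion(location, name).get());
        System.out.println("loader uses:  " + firstLoadedVersion(location, name).get());
    }
}
```

   In this model the two lookups disagree whenever the higher version is not 
the first copy found, which is why instantiating the plugin and calling 
`version()` on it, as `fetchPluginVersion` does, looks like the safer source 
of truth.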



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.