gharris1727 commented on code in PR #17741:
URL: https://github.com/apache/kafka/pull/17741#discussion_r1876326896


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSinkConnector.java:
##########
@@ -25,7 +25,7 @@
 
 public class SampleSinkConnector extends SinkConnector {
 
-    public static final String VERSION = "some great version";
+    public static final String VERSION = "latest";

Review Comment:
   Plugins shouldn't return "latest" as their version; this is nonsense.
   
   Maybe something that we should guard against.
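   A rough sketch of what such a guard could look like (hypothetical helper, not part of this PR; the reserved-token set and where to wire the check in are both assumptions):
   
   ```java
   import java.util.Locale;
   import java.util.Set;
   
   // Hypothetical guard: reject placeholder strings that a plugin might
   // return from Versioned#version() instead of a real version.
   public final class PluginVersionGuard {
       private static final Set<String> RESERVED_VERSIONS = Set.of("latest");
   
       public static void validate(String pluginName, String version) {
           if (version == null || RESERVED_VERSIONS.contains(version.trim().toLowerCase(Locale.ROOT))) {
               throw new IllegalArgumentException(
                       "Plugin " + pluginName + " reported a reserved or missing version: " + version);
           }
       }
   }
   ```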



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -237,6 +237,8 @@ private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, Versio
                     name
             ));
         } else if (classpathPlugins.isEmpty()) {
+

Review Comment:
   nit: extra newlines



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -357,6 +357,10 @@ private <T> Set<PluginDesc<T>> pluginsOfClass(String classNameOrAlias, Set<Plugi
         return plugins;
     }
 
+    public PluginsRecommenders recommender() {

Review Comment:
   > If we keep this as part of Plugins, then we can avoid the new PluginsRecommenders(plugins) call on every validate. Since the plugin information, once scanned, remains static, we don't really need to initialize all the recommenders again and again.
   
   This is premature optimization; please don't do it. When someone has a benchmark that shows the benefit of this, it is trivial to implement.
   
   The constructor of PluginsRecommenders does not perform any substantial pre-computation.
   Validation already constructs hundreds of objects (ConfigValues, etc.) each time it is called.
   
   > Also, if Plugins can be made mutable in the future, it would be easier to revoke the current instance of recommenders in Plugins and generate the new recommender, and have it reflected across all the uses.
   
   YAGNI. When/if we get around to mutable Plugins, this class will look very different.
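   For reference, a minimal JMH sketch of the kind of benchmark that would justify caching (the `PluginsRecommenders(Plugins)` constructor is assumed from the quoted snippet; a realistic `Plugins` setup with a scanned plugin path is elided):
   
   ```java
   import java.util.Collections;
   
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;
   
   // Sketch only: measures the per-validate() construction cost that a
   // cached PluginsRecommenders instance would have to beat.
   @State(Scope.Benchmark)
   public class PluginsRecommendersBench {
       private Plugins plugins;
   
       @Setup
       public void setup() {
           // Empty worker config; a real run would point plugin.path at actual plugins.
           plugins = new Plugins(Collections.emptyMap());
       }
   
       @Benchmark
       public PluginsRecommenders constructPerValidate() {
           return new PluginsRecommenders(plugins);
       }
   }
   ```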
   



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -405,13 +471,102 @@ protected ConfigDef config(Predicate<?> predicate) {
         return newDef;
     }
 
+    private static <T> ConverterDefaults converterDefaults(
+            Plugins plugins,
+            String connectorConverterConfig,
+            String workerConverterConfig,
+            String workerConverterVersionConfig,
+            Map<String, String> connectorProps,
+            Map<String, String> workerProps,
+            Class<T> converterType
+    ) {
+        /*
+        if a converter is specified in the connector config it overrides the worker config for the corresponding converter;
+        otherwise the worker config is used, hence if the converter is not provided in the connector config, the default
+        is the one provided in the worker config
+
+        for converters, which version is used depends on several factors with multi-versioning support
+        A. If the converter class is provided as part of the connector properties
+            1. if the version is not provided,
+                - if the converter is packaged with the connector, the packaged version is used
+                - if the converter is not packaged with the connector, the latest version is used
+            2. if the version is provided, the provided version is used
+        B. If the converter class is not provided as part of the connector properties, but provided as part of the worker properties
+            1. if the version is not provided, the latest version is used
+            2. if the version is provided, the provided version is used
+        C. If the converter class is not provided as part of the connector properties and not provided as part of the worker properties,
+        the converter to use is unknown, hence no default version can be determined (null)
+
+        Note: Connect, when using service loading, has an issue outlined in KAFKA-18119. Because of that issue, the above
+        logic does not currently hold for clusters using service loading when converters are defined in the connector.
+        However, the logic to determine the default should ideally follow the one outlined above, and the code here
+        should still show the correct default version regardless of the bug.
+        */
+        final String connectorConverter = connectorProps.get(connectorConverterConfig);
+        final String workerConverter = workerProps.get(workerConverterConfig);
+        final String connectorClass = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        final String connectorVersion = connectorProps.get(ConnectorConfig.CONNECTOR_VERSION);
+        if (connectorClass == null || (connectorConverter == null && workerConverter == null)) {
+            return new ConverterDefaults();
+        }
+
+        ConverterDefaults defaults = new ConverterDefaults();
+        // update the default of the connector converter based on whether the worker converter is provided
+        defaults.type = workerConverter;
+
+        String version = null;
+        if (connectorConverter != null) {
+            version = fetchPluginVersion(plugins, connectorClass, connectorVersion, connectorConverter, converterType);
+        } else {
+            version = workerProps.get(workerConverterVersionConfig);
+            if (version == null) {
+                version = plugins.latestVersion(workerConverter);
+            }
+        }
+        defaults.version = version;
+        return defaults;
+    }
+
+    private static void updateKeyDefault(ConfigDef configDef, String versionConfigKey, String versionDefault) {
+        ConfigDef.ConfigKey key = configDef.configKeys().get(versionConfigKey);
+        if (key == null) {
+            return;
+        }
+        configDef.configKeys().put(versionConfigKey, new ConfigDef.ConfigKey(
+                versionConfigKey, key.type, versionDefault, key.validator, key.importance, key.documentation, key.group, key.orderInGroup, key.width, key.displayName, key.dependents, key.recommender, false
+        ));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> String fetchPluginVersion(Plugins plugins, String connectorClass, String connectorVersion, String pluginName, Class<T> pluginClass) {
+        if (pluginName == null) {
+            return null;
+        }
+        try {
+            VersionRange range = PluginUtils.connectorVersionRequirement(connectorVersion);
+            ClassLoader connectorLoader = plugins.pluginLoader(connectorClass, range);
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                T plugin = (T) plugins.newPlugin(pluginName, pluginClass, null);
+                if (plugin instanceof Versioned) {
+                    return ((Versioned) plugin).version();
+                }
+            }

Review Comment:
   > If two different versions of the same plugin are bundled under the same plugin classloader
   
   This is impossible from Connect's perspective. Once a ClassLoader returns a Class, it is cached and cannot be redefined. If two versions of a plugin are both present within the same ClassLoader, one will be picked arbitrarily, and the same one will always be picked (on the local machine). Effectively, Connect can't even know the second class file is there, which is why we don't try to detect it.
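   A small self-contained illustration of that caching behavior (the jar path and class name are placeholders):
   
   ```java
   import java.net.URL;
   import java.net.URLClassLoader;
   
   public class ClassCachingDemo {
       public static void main(String[] args) throws Exception {
           // Placeholder: any jar containing com.example.SomePlugin will do.
           URL[] pluginPath = { new URL("file:///tmp/plugin.jar") };
           try (URLClassLoader loader = new URLClassLoader(pluginPath)) {
               Class<?> first = loader.loadClass("com.example.SomePlugin");
               Class<?> second = loader.loadClass("com.example.SomePlugin");
               // The first definition wins and is cached; a duplicate class file
               // later on the path is never consulted again by this loader.
               System.out.println(first == second); // always true
           }
       }
   }
   ```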


