snehashisp commented on code in PR #17741: URL: https://github.com/apache/kafka/pull/17741#discussion_r1876087461
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java: ########## @@ -405,13 +471,102 @@ protected ConfigDef config(Predicate<?> predicate) { return newDef; } + private static <T> ConverterDefaults converterDefaults( + Plugins plugins, + String connectorConverterConfig, + String workerConverterConfig, + String workerConverterVersionConfig, + Map<String, String> connectorProps, + Map<String, String> workerProps, + Class<T> converterType + ) { + /* + if a converter is specified in the connector config it overrides the worker config for the corresponding converter + otherwise the worker config is used, hence if the converter is not provided in the connector config, the default + is the one provided in the worker config + + for converters which version is used depends on a several factors with multi-versioning support + A. If the converter class is provided as part of the connector properties + 1. if the version is not provided, + - if the converter is packaged with the connector then, the packaged version is used + - if the converter is not packaged with the connector, the latest version is used + 2. if the version is provided, the provided version is used + B. If the converter class is not provided as part of the connector properties, but provided as part of the worker properties + 1. if the version is not provided, the latest version is used + 2. if the version is provided, the provided version is used + C. If the converter class is not provided as part of the connector properties and not provided as part of the worker properties, + the converter to use is unknown hence no default version can be determined (null) + + Note: Connect when using service loading has an issue outlined in KAFKA-18119. The issue means that the above + logic does not hold currently for clusters using service loading when converters are defined in the connector. + However, the logic to determine the default should ideally follow the one outlined above, and the code here + should still show the correct default version regardless of the bug. + */ + final String connectorConverter = connectorProps.get(connectorConverterConfig); + final String workerConverter = workerProps.get(workerConverterConfig); + final String connectorClass = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + final String connectorVersion = connectorProps.get(ConnectorConfig.CONNECTOR_VERSION); + if (connectorClass == null || (connectorConverter == null && workerConverter == null)) { + return new ConverterDefaults(); + } + + ConverterDefaults defaults = new ConverterDefaults(); + // update the default of connector converter based on if the worker converter is provided + defaults.type = workerConverter; + + + String version = null; + if (connectorConverter != null) { + version = fetchPluginVersion(plugins, connectorClass, connectorVersion, connectorConverter, converterType); + } else { + version = workerProps.get(workerConverterVersionConfig); + if (version == null) { + version = plugins.latestVersion(workerConverter); + } + } + defaults.version = version; + return defaults; + } + + private static void updateKeyDefault(ConfigDef configDef, String versionConfigKey, String versionDefault) { + ConfigDef.ConfigKey key = configDef.configKeys().get(versionConfigKey); + if (key == null) { + return; + } + configDef.configKeys().put(versionConfigKey, new ConfigDef.ConfigKey( + versionConfigKey, key.type, versionDefault, key.validator, key.importance, key.documentation, key.group, key.orderInGroup, key.width, key.displayName, key.dependents, key.recommender, false + )); + } + + @SuppressWarnings("unchecked") + private static <T> String fetchPluginVersion(Plugins plugins, String connectorClass, String connectorVersion, String pluginName, Class<T> pluginClass) { + if (pluginName == null) { + return null; + } + try { + VersionRange range = PluginUtils.connectorVersionRequirement(connectorVersion); + ClassLoader connectorLoader = plugins.pluginLoader(connectorClass, range); + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + T plugin = (T) plugins.newPlugin(pluginName, pluginClass, null); + if (plugin instanceof Versioned) { + return ((Versioned) plugin).version(); + } + } Review Comment: I think `plugin.latestVersion` can be incorrect under some edge cases. If two different version of the same plugin is bundled under the same plugin classloader, latest version will return the higher version. However, the actual classloading will use whichever one it finds earlier. Although this is not the recommended way to install plugins, we do not check for this and don't fail during plugin scanning. I feel that the only real way of getting the version is to actually instantiate the plugin and get its version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org