gharris1727 commented on code in PR #17741: URL: https://github.com/apache/kafka/pull/17741#discussion_r1868543459
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -661,25 +682,35 @@ ConfigInfos validateConnectorConfig( } } String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + VersionRange connVersion = null; + try { + connVersion = PluginVersionUtils.connectorVersionRequirement(connectorProps.get(CONNECTOR_VERSION)); + } catch (Exception e) { + throw new BadRequestException(e.getMessage()); + } + if (connType == null) throw new BadRequestException("Connector config " + connectorProps + " contains no connector type"); - Connector connector = getConnector(connType); - ClassLoader connectorLoader = plugins().connectorLoader(connType); + Connector connector = getConnector(connType, connVersion); + ClassLoader connectorLoader = connector.getClass().getClassLoader(); Review Comment: This gets the incorrect loader, see https://github.com/apache/kafka/pull/16984#discussion_r1868461401 ########## connect/runtime/src/main/java/org/apache/kafka/connect/util/PluginVersionUtils.java: ########## @@ -0,0 +1,186 @@ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; +import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException; +import org.apache.maven.artifact.versioning.VersionRange; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class PluginVersionUtils { + + private static Plugins plugins = null; Review Comment: You need to find a way to make this non-static; There are routinely multiple Plugins instances in use within the same JVM, which would immediately collide on this field. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java: ########## @@ -199,33 +234,37 @@ public Object get(String key) { } } + public static ConfigDef BASE_CONFIGS = new ConfigDef() Review Comment: Don't make this a static field, it leaves opportunities for it to get mutated accidentally. ########## connect/runtime/src/main/java/org/apache/kafka/connect/util/PluginVersionUtils.java: ########## @@ -0,0 +1,186 @@ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; +import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException; +import org.apache.maven.artifact.versioning.VersionRange; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class PluginVersionUtils { + + private static Plugins plugins = null; + + public static void setPlugins(Plugins plugins) { + PluginVersionUtils.plugins = plugins; + } + + public static VersionRange connectorVersionRequirement(String version) throws InvalidVersionSpecificationException { + if (version == null || version.equals("latest")) { + return null; + } + version = version.trim(); + + // check first if the given version is valid + VersionRange.createFromVersionSpec(version); + + // now if the version is not enclosed we consider it as a hard requirement and enclose it in [] + if (!version.startsWith("[") && !version.startsWith("(")) { + version = "[" + version + "]"; + } Review Comment: I think a more robust check for this would be to check if the recommendedVersion is non-null. (or i think equivalently, hasRestrictions) ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java: ########## @@ -411,13 +449,149 @@ protected ConfigDef config(Predicate<?> predicate) { return newDef; } + public static void updateDefaults(ConfigDef configDef, Plugins plugins, Map<String, String> connProps, Map<String, String> workerProps) { + updateAllConverterDefaults(configDef, plugins, connProps, workerProps); + updateConnectorVersionDefaults(configDef, plugins, connProps.get(CONNECTOR_CLASS_CONFIG)); + } + + public static void updateConnectorVersionDefaults(ConfigDef configDef, Plugins plugins, String connectorClass) { + // if provided connector version is null, the latest version is used + updateKeyDefault(configDef, ConnectorConfig.CONNECTOR_VERSION, plugins.latestVersion(connectorClass)); + } + + public static void updateAllConverterDefaults(ConfigDef configDef, Plugins plugins, + Map<String, String> connProps, Map<String, String> workerProps) { + updateKeyConverterDefault(configDef, plugins, connProps, workerProps); + updateValueConverterDefault(configDef, plugins, connProps, workerProps); + updateHeaderConverterDefault(configDef, plugins, connProps, workerProps); + } + + public static void updateKeyConverterDefault(ConfigDef configDef, Plugins plugins, + Map<String, String> connProps, Map<String, String> workerProps) { + updateConverterDefaults( + configDef, plugins, + KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, + KEY_CONVERTER_VERSION_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerProps + ); + } + + public static void updateValueConverterDefault(ConfigDef configDef, Plugins plugins, + Map<String, String> connProps, Map<String, String> workerProps) { + updateConverterDefaults( + configDef, plugins, + VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, + VALUE_CONVERTER_VERSION_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerProps + ); + } + + public static void updateHeaderConverterDefault(ConfigDef configDef, Plugins plugins, + Map<String, String> connProps, Map<String, String> workerProps) { + updateConverterDefaults( + configDef, plugins, + HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, + HEADER_CONVERTER_VERSION_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerProps + ); + } + + private static void updateConverterDefaults( Review Comment: I think this "updating defaults" pattern is messy. I would much rather the ConnectorConfig ConfigDef depend on Plugins (like EnrichedConnectorConfig) and either define the no-default or with-default variants separately. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -462,11 +459,26 @@ private <T> ConfigInfos validateConverterConfig( T pluginInstance; String stageDescription = "instantiating the connector's " + pluginName + " for validation"; try (TemporaryStage stage = reportStage.apply(stageDescription)) { - pluginInstance = Utils.newInstance(pluginClass, pluginInterface); + VersionRange range = PluginVersionUtils.connectorVersionRequirement(pluginVersion); + // utils.newInstance is done when no version is provided to preserve older behaviour prior to multi-versioning support + // utils.newInstance will try and load the class from the current classloader (which is the connector classloader) before + // delegating plugin loading mechanism of the worker, while plugins().newPlugin will always use the worker's plugin loading mechanism + // in cases where the converter is packaged with the connector package and no version is provided , utils.newInstance + // will choose the converter from the connector, while plugins().newPlugin will choose the latest converter version found while plugin loading. + pluginInstance = range == null ? Utils.newInstance(pluginClass, pluginInterface): (T) plugins().newPlugin(pluginClass, range); Review Comment: I think this detail is inappropriate in the AbstractHerder, and the classloader choosing logic and etc is more appropriate to handle in the Plugins class. You can make range == null request the CURRENT_CLASSLOADER for legacy compatibility. Also i've been trying to eliminate Utils.newInstance in Connect for a while, so don't be afraid of removing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org