gharris1727 commented on code in PR #17741:
URL: https://github.com/apache/kafka/pull/17741#discussion_r1874983220


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
+import org.apache.maven.artifact.versioning.VersionRange;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CachedConnectors {
+
+    private final static String LATEST_VERSION = "latest";
+
+    private final Map<String, Map<String, Connector>> connectors;
+    private final Map<String, Exception> invalidConnectors;
+    private final Map<String, Map<String, Exception>> invalidVersions;
+    private final Plugins plugins;
+
+    public CachedConnectors(Plugins plugins) {
+        this.plugins = plugins;
+        this.connectors = new ConcurrentHashMap<>();
+        this.invalidConnectors = new ConcurrentHashMap<>();
+        this.invalidVersions = new ConcurrentHashMap<>();
+    }
+
+    private void validate(String connectorName, VersionRange range) throws Exception {
+        if (invalidConnectors.containsKey(connectorName)) {
+            throw invalidConnectors.get(connectorName);

Review Comment:
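   Rethrowing the same cached exception instance can produce confusing results, because an exception's stack trace is captured only once, when the exception is created. Every later rethrow reports the original call site rather than the current one.

   Instead, cache the exception from Plugins, and then create and throw a new exception on each method call (for example, with the cached exception as its cause).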
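
   A minimal sketch of that suggestion, assuming a simplified stand-in for `CachedConnectors`. The class name `FailureCache`, the `recordFailure` method, and the message text are hypothetical and only illustrate the pattern of wrapping the cached exception in a fresh one:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for CachedConnectors; only the failure cache is modelled.
class FailureCache {
    private final Map<String, Exception> invalidConnectors = new ConcurrentHashMap<>();

    // Remember the original failure (for example, one thrown by Plugins).
    void recordFailure(String connectorName, Exception cause) {
        invalidConnectors.put(connectorName, cause);
    }

    // Throws a new exception on every call; the cached exception is attached as the cause,
    // so the outer stack trace always reflects the current call site.
    void validate(String connectorName) throws Exception {
        Exception cached = invalidConnectors.get(connectorName);
        if (cached != null) {
            throw new Exception("Connector " + connectorName + " failed to load", cached);
        }
    }
}
```

   Reading the map once with `get` also avoids the separate `containsKey` lookup.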



##########
.gitignore:
##########
@@ -61,3 +61,6 @@ storage/kafka-tiered-storage/
 docker/test/report_*.html
 kafka.Kafka
 __pycache__
+/connect/runtime/src/main/java/org/apache/kafka/connect/testing/
+/connect/file/
+/connect/json/

Review Comment:
   I think these ignore patterns match real code (for example `/connect/file/` and `/connect/json/`), so they shouldn't be added.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -367,6 +371,13 @@ public Object newPlugin(String classOrAlias, VersionRange range) throws Versione
         return newPlugin(klass);
     }
 
+    public <T> Object newPlugin(String classOrAlias, Class<T> baseClass, VersionRange range) throws ClassNotFoundException {

Review Comment:
   This is a bad method addition:

   * The point of passing in a `Class<T>` is to return a `T`, which avoids a `(T)` cast in the caller.
   * This method returns `Object`, so it still requires a cast in the caller.
   * It does a null check on the version, which is already handled very reasonably within the rest of Plugins.
   * The method calls `Utils.newInstance`, which instantiates the plugin with the wrong TCCL (thread context class loader).

   Either change the caller to use `newPlugin(String, VersionRange)` and perform the blind cast, or change this method to actually handle the casting/type safety and not call `Utils.newInstance`.
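
   As a rough illustration of the second option, here is a sketch of a method that uses the `Class<T>` token to return a `T` and swaps the thread context class loader around instantiation. `TypedPluginFactory` and its signature are hypothetical, not the actual Plugins API; the real code would resolve the class and loader from the alias and version range:

```java
// Hypothetical helper; not the actual Plugins API.
final class TypedPluginFactory {
    private TypedPluginFactory() { }

    static <T> T newPlugin(Class<?> pluginClass, Class<T> baseClass, ClassLoader pluginLoader)
            throws ReflectiveOperationException {
        // Throws ClassCastException up front if pluginClass is not a subtype of baseClass,
        // so the caller never needs an unchecked (T) cast.
        Class<? extends T> typed = pluginClass.asSubclass(baseClass);
        Thread current = Thread.currentThread();
        ClassLoader saved = current.getContextClassLoader();
        // Instantiate with the plugin's loader as the TCCL, then always restore the old one.
        current.setContextClassLoader(pluginLoader);
        try {
            return typed.getDeclaredConstructor().newInstance();
        } finally {
            current.setContextClassLoader(saved);
        }
    }
}
```

   A caller could then write `CharSequence s = TypedPluginFactory.newPlugin(StringBuilder.class, CharSequence.class, loader);` with no cast.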



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -70,20 +70,30 @@ public class WorkerConfig extends AbstractConfig {
     public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
     public static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;
 
+    public static final String PLUGIN_VERSION_SUFFIX = "version";

Review Comment:
   I checked the KIP, and the suffix should be `plugin.version`.

   But the connector is still written to use "version". Is that a mistake or intentional? I would prefer it to be consistent so we can use the same constant everywhere.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -98,6 +118,12 @@ public class ConnectorConfig extends AbstractConfig {
             new InstantiableClassValidator()
     );
 
+    public static final String VALUE_CONVERTER_VERSION_CONFIG = WorkerConfig.VALUE_CONVERTER_VERSION;
+    private static final String VALUE_CONVERTER_VERSION_DEFAULT = null;
+    private static final String VALUE_CONVERTER_VERSION_DOC = "Version of the value converter.";
+    private static final String VALUE_CONVERTER_VERSION_DISPLAY = "Value converter version";
+    private static final ConfigDef.Validator VALUE_CONVERTER_VERSION_VALIDATOR = new PluginVersionValidator();
+
     public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
     public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
     public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class";

Review Comment:
   nit: this old constant HEADER_CONVERTER_CLASS_DEFAULT is unused



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -405,13 +471,102 @@ protected ConfigDef config(Predicate<?> predicate) {
         return newDef;
     }
 
+    private static <T> ConverterDefaults converterDefaults(
+            Plugins plugins,
+            String connectorConverterConfig,
+            String workerConverterConfig,
+            String workerConverterVersionConfig,
+            Map<String, String> connectorProps,
+            Map<String, String> workerProps,
+            Class<T> converterType
+    ) {
+        /*
+        if a converter is specified in the connector config it overrides the worker config for the corresponding converter
+        otherwise the worker config is used, hence if the converter is not provided in the connector config, the default
+        is the one provided in the worker config
+
+        for converters which version is used depends on a several factors with multi-versioning support
+        A. If the converter class is provided as part of the connector properties
+            1. if the version is not provided,
+                - if the converter is packaged with the connector then, the packaged version is used
+                - if the converter is not packaged with the connector, the latest version is used
+            2. if the version is provided, the provided version is used
+        B. If the converter class is not provided as part of the connector properties, but provided as part of the worker properties
+            1. if the version is not provided, the latest version is used
+            2. if the version is provided, the provided version is used
+        C. If the converter class is not provided as part of the connector properties and not provided as part of the worker properties,
+        the converter to use is unknown hence no default version can be determined (null)
+
+        Note: Connect when using service loading has an issue outlined in KAFKA-18119. The issue means that the above
+        logic does not hold currently for clusters using service loading when converters are defined in the connector.
+        However, the logic to determine the default should ideally follow the one outlined above, and the code here
+        should still show the correct default version regardless of the bug.
+        */
+        final String connectorConverter = connectorProps.get(connectorConverterConfig);
+        final String workerConverter = workerProps.get(workerConverterConfig);
+        final String connectorClass = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        final String connectorVersion = connectorProps.get(ConnectorConfig.CONNECTOR_VERSION);
+        if (connectorClass == null || (connectorConverter == null && workerConverter == null)) {
+            return new ConverterDefaults();
+        }
+
+        ConverterDefaults defaults = new ConverterDefaults();
+        // update the default of connector converter based on if the worker converter is provided
+        defaults.type = workerConverter;
+
+
+        String version = null;
+        if (connectorConverter != null) {
+            version = fetchPluginVersion(plugins, connectorClass, connectorVersion, connectorConverter, converterType);
+        } else {
+            version = workerProps.get(workerConverterVersionConfig);
+            if (version == null) {
+                version = plugins.latestVersion(workerConverter);
+            }
+        }
+        defaults.version = version;
+        return defaults;
+    }
+
+    private static void updateKeyDefault(ConfigDef configDef, String versionConfigKey, String versionDefault) {
+        ConfigDef.ConfigKey key = configDef.configKeys().get(versionConfigKey);
+        if (key == null) {
+            return;
+        }
+        configDef.configKeys().put(versionConfigKey, new ConfigDef.ConfigKey(
+                versionConfigKey, key.type, versionDefault, key.validator, key.importance, key.documentation, key.group, key.orderInGroup, key.width, key.displayName, key.dependents, key.recommender, false
+        ));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> String fetchPluginVersion(Plugins plugins, String connectorClass, String connectorVersion, String pluginName, Class<T> pluginClass) {
+        if (pluginName == null) {
+            return null;
+        }
+        try {
+            VersionRange range = PluginUtils.connectorVersionRequirement(connectorVersion);
+            ClassLoader connectorLoader = plugins.pluginLoader(connectorClass, range);
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                T plugin = (T) plugins.newPlugin(pluginName, pluginClass, null);
+                if (plugin instanceof Versioned) {
+                    return ((Versioned) plugin).version();
+                }
+            }
Review Comment:
   This can be a first-class operation in Plugins (and/or DelegatingClassLoader), because it can be computed from the information collected during scanning, without re-instantiating the plugin.

   Something like `Plugins#latestVersion(String, VersionRange)`.
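
   A sketch of the idea, under the assumption that scanning already yields the list of versions per plugin class. `ScannedVersions`, its map layout, and the `Predicate<String>` standing in for a `VersionRange` are all hypothetical, and the comparison only handles dotted numeric versions:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical index of scan results: plugin class name -> versions found during scanning.
final class ScannedVersions {
    private final Map<String, List<String>> versionsByClass;

    ScannedVersions(Map<String, List<String>> versionsByClass) {
        this.versionsByClass = versionsByClass;
    }

    // Compares dotted numeric versions such as "3.9.0"; qualifiers are not handled in this sketch.
    static int compareVersions(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    // Latest scanned version accepted by the range, computed without instantiating any plugin.
    Optional<String> latestVersion(String className, Predicate<String> range) {
        return versionsByClass.getOrDefault(className, List.of()).stream()
                .filter(range)
                .max(ScannedVersions::compareVersions);
    }
}
```

   The lookup only reads metadata, so no class loader swap or plugin construction is involved.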



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -647,25 +688,36 @@ ConfigInfos validateConnectorConfig(
             }
         }
         String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        VersionRange connVersion = null;
+        Connector connector;
+        ClassLoader connectorLoader;
+        try {
+            connVersion = PluginUtils.connectorVersionRequirement(connectorProps.get(CONNECTOR_VERSION));
+            connector = cachedConnectors.getConnector(connType, connVersion);

Review Comment:
   nit: move this after the connType null check. Otherwise this might change the error message reported when connType is null.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -98,6 +118,12 @@ public class ConnectorConfig extends AbstractConfig {
             new InstantiableClassValidator()

Review Comment:
   The KIP includes this detail:

   > * If the .version property is left empty, or if there are no installed plugins with a matching name, errors will only be attributed to the plugin class property, not to the .version property.
   > * If a plugin class name is valid (at least one version is installed) but the non-empty .version property doesn't include any of the installed versions, then the error will be attributed to both the .version property and the plugin class property.

   I don't think the validators for the class or version configs are doing this right now. Is that part of this PR or a later PR?
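
   One possible reading of the two KIP rules quoted above, written as a small decision function. `VersionErrorAttribution`, the `Target` enum, and the boolean inputs are hypothetical names for illustration; how the validators would actually obtain these inputs is not shown:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical helper expressing which properties a version-related error is attributed to.
final class VersionErrorAttribution {
    enum Target { PLUGIN_CLASS, PLUGIN_VERSION }

    private VersionErrorAttribution() { }

    static Set<Target> errorTargets(boolean anyVersionInstalled,
                                    String versionProperty,
                                    boolean rangeMatchesInstalled) {
        boolean versionEmpty = versionProperty == null || versionProperty.isEmpty();
        if (!anyVersionInstalled) {
            // No installed plugin with a matching name: only the class property is blamed.
            return EnumSet.of(Target.PLUGIN_CLASS);
        }
        if (!versionEmpty && !rangeMatchesInstalled) {
            // Class is valid but the requested version is not installed: blame both.
            return EnumSet.of(Target.PLUGIN_CLASS, Target.PLUGIN_VERSION);
        }
        // Class is valid and the version is empty or satisfiable: no version-related error.
        return EnumSet.noneOf(Target.class);
    }
}
```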



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##########
@@ -221,7 +221,7 @@ protected static String reflectiveErrorDescription(Throwable t) {
         }
     }
 
-    protected LoaderSwap withClassLoader(ClassLoader loader) {
+    protected static LoaderSwap withClassLoader(ClassLoader loader) {

Review Comment:
   nit: revert



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##########
@@ -141,6 +141,7 @@ public class PluginUtils {
     // regular expression pattern
     private static final Pattern INCLUDE = Pattern.compile("^org\\.apache\\.kafka\\.(?:connect\\.(?:"
             + "transforms\\.(?!Transformation|predicates\\.Predicate$).*"
+            + "|testing\\..*"

Review Comment:
   nit: leftover?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -357,6 +357,10 @@ private <T> Set<PluginDesc<T>> pluginsOfClass(String classNameOrAlias, Set<Plugi
         return plugins;
     }
 
+    public PluginsRecommenders recommender() {

Review Comment:
   I don't see the ROI for this method. Why can't ConnectorConfig call `new` directly?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java:
##########
@@ -244,4 +244,5 @@ public static Map<String, String> patchConfig(
         });
         return result;
     }
+

Review Comment:
   nit: unrelated change



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -187,7 +219,11 @@ public class ConnectorConfig extends AbstractConfig {
     public static final String CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX = "admin.override.";
     public static final String PREDICATES_PREFIX = "predicates.";
 
-    private final EnrichedConnectorConfig enrichedConfig;
+    private static final PluginsRecommenders EMPTY_RECOMMENDER = new PluginsRecommenders();
+    private static final ConverterDefaults CONVERTER_DEFAULTS = new ConverterDefaults();

Review Comment:
   Since you're going to use ConverterDefaults in a static field like this, you should make it immutable, and maybe even take advantage of the new Java record type.
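
   For illustration, an immutable version could be as small as the record below. This is a sketch that assumes a Java version with records (16 or later); the component names mirror the `defaults.type` and `defaults.version` fields in the diff, and the `EMPTY` constant is a hypothetical addition:

```java
// Hypothetical shape; component names mirror `defaults.type` and `defaults.version` in the diff.
record ConverterDefaults(String type, String version) {
    // Shared "no defaults" instance. It is safe in a static field because the record
    // has only final String components and no setters.
    static final ConverterDefaults EMPTY = new ConverterDefaults(null, null);
}
```

   With this shape, `converterDefaults(...)` would build the result in one expression, such as `new ConverterDefaults(workerConverter, version)`, instead of mutating fields after construction.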



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java:
##########
@@ -154,9 +158,18 @@ public List<PluginInfo> listConnectorPlugins(
     @GET
     @Path("/{pluginName}/config")
     @Operation(summary = "Get the configuration definition for the specified pluginName")
-    public List<ConfigKeyInfo> getConnectorConfigDef(final @PathParam("pluginName") String pluginName) {
+    public List<ConfigKeyInfo> getConnectorConfigDef(final @PathParam("pluginName") String pluginName,
+                                                     final @QueryParam("version") @DefaultValue("latest") String version) {
+
+        VersionRange range = null;
+        try {
+            range = PluginUtils.connectorVersionRequirement(version);
+        } catch (InvalidVersionSpecificationException e) {
+            throw new BadRequestException("Invalid version specification: " + version);

Review Comment:
   nit: There is a constructor of BadRequestException which takes a Throwable, but I don't think the cause surfaces on the API. Maybe it would be helpful if someone turns on request logging?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -98,6 +118,12 @@ public class ConnectorConfig extends AbstractConfig {
             new InstantiableClassValidator()
     );
 
+    public static final String VALUE_CONVERTER_VERSION_CONFIG = WorkerConfig.VALUE_CONVERTER_VERSION;
+    private static final String VALUE_CONVERTER_VERSION_DEFAULT = null;

Review Comment:
   nit: these version defaults are unused; they all seem to be covered by the single CONVERTER_DEFAULTS.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -405,13 +471,102 @@ protected ConfigDef config(Predicate<?> predicate) {
         return newDef;
     }
 
+    private static <T> ConverterDefaults converterDefaults(
+            Plugins plugins,
+            String connectorConverterConfig,
+            String workerConverterConfig,
+            String workerConverterVersionConfig,
+            Map<String, String> connectorProps,
+            Map<String, String> workerProps,
+            Class<T> converterType
+    ) {
+        /*
+        if a converter is specified in the connector config it overrides the worker config for the corresponding converter
+        otherwise the worker config is used, hence if the converter is not provided in the connector config, the default
+        is the one provided in the worker config
+
+        for converters which version is used depends on a several factors with multi-versioning support
+        A. If the converter class is provided as part of the connector properties
+            1. if the version is not provided,
+                - if the converter is packaged with the connector then, the packaged version is used
+                - if the converter is not packaged with the connector, the latest version is used
+            2. if the version is provided, the provided version is used
+        B. If the converter class is not provided as part of the connector properties, but provided as part of the worker properties
+            1. if the version is not provided, the latest version is used
+            2. if the version is provided, the provided version is used
+        C. If the converter class is not provided as part of the connector properties and not provided as part of the worker properties,
+        the converter to use is unknown hence no default version can be determined (null)

Review Comment:
   The HeaderConverter has a default in WorkerConfig. The key and value converters do not have defaults.

   ...I wonder where the error appears when the WorkerConfig doesn't specify a key/value converter...



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -137,12 +169,12 @@ public class ConnectorConfig extends AbstractConfig {
     public static final String CONFIG_RELOAD_ACTION_CONFIG = 
"config.action.reload";
     private static final String CONFIG_RELOAD_ACTION_DOC =
             "The action that Connect should take on the connector when changes in external " +
-            "configuration providers result in a change in the connector's configuration properties. " +

Review Comment:
   nit: There are a bunch of unrelated changes to indentation and such in ConnectorConfig. Are these intentional?

   They have the potential to cause merge conflicts with later PRs. Or maybe they leaked in?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java:
##########
@@ -61,10 +61,11 @@ public String toString() {
                 '}';
     }
     @JsonIgnore
-    DefaultArtifactVersion encodedVersion() {
+    public DefaultArtifactVersion encodedVersion() {
         return encodedVersion;
     }
 
+

Review Comment:
   nit: add a blank line before the method, and don't leave a double blank line after it


