snehashisp commented on code in PR #17741:
URL: https://github.com/apache/kafka/pull/17741#discussion_r1873340474


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -411,13 +449,149 @@ protected ConfigDef config(Predicate<?> predicate) {
         return newDef;
     }
 
+    public static void updateDefaults(ConfigDef configDef, Plugins plugins, Map<String, String> connProps, Map<String, String> workerProps) {
+        updateAllConverterDefaults(configDef, plugins, connProps, workerProps);
+        updateConnectorVersionDefaults(configDef, plugins, connProps.get(CONNECTOR_CLASS_CONFIG));
+    }
+
+    public static void updateConnectorVersionDefaults(ConfigDef configDef, Plugins plugins, String connectorClass) {
+        // if provided connector version is null, the latest version is used
+        updateKeyDefault(configDef, ConnectorConfig.CONNECTOR_VERSION, plugins.latestVersion(connectorClass));
+    }
+
+    public static void updateAllConverterDefaults(ConfigDef configDef, Plugins plugins,
+                                                        Map<String, String> connProps, Map<String, String> workerProps) {
+        updateKeyConverterDefault(configDef, plugins, connProps, workerProps);
+        updateValueConverterDefault(configDef, plugins, connProps, workerProps);
+        updateHeaderConverterDefault(configDef, plugins, connProps, workerProps);
+    }
+
+    public static void updateKeyConverterDefault(ConfigDef configDef, Plugins plugins,
+                                                        Map<String, String> connProps, Map<String, String> workerProps) {
+        updateConverterDefaults(
+                configDef, plugins,
+                KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+                KEY_CONVERTER_VERSION_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerProps
+        );
+    }
+
+    public static void updateValueConverterDefault(ConfigDef configDef, Plugins plugins,
+                                                          Map<String, String> connProps, Map<String, String> workerProps) {
+        updateConverterDefaults(
+                configDef, plugins,
+                VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
+                VALUE_CONVERTER_VERSION_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerProps
+        );
+    }
+
+    public static void updateHeaderConverterDefault(ConfigDef configDef, Plugins plugins,
+                                                           Map<String, String> connProps, Map<String, String> workerProps) {
+        updateConverterDefaults(
+                configDef, plugins,
+                HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                HEADER_CONVERTER_VERSION_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerProps
+        );
+    }
+
+    private static void updateConverterDefaults(

Review Comment:
   I changed this as per your suggestion. Making the plugins instance in 
`PluginVersionUtils` non-static meant that even the recommenders needed 
dynamic updates, which made things even more complicated.
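
   For illustration only, here is a minimal sketch of computing a converter 
version default at call time from a lookup that is passed in, rather than 
read from static state. It is not the PR's `updateConverterDefaults` body, 
which is not shown in this hunk. The class name, the method, the 
`converter.class` / `converter.version` keys, the version strings, and the 
fallback rule (connector class first, then the worker's class and version) 
are all assumptions made for the example.

```java
import java.util.Map;
import java.util.function.Function;

// Illustrative only: these names are hypothetical and are not part of Kafka Connect.
class ConverterDefaultSketch {

    /**
     * Picks a default converter version at call time.
     * Assumed rule: a converter class set on the connector defaults to the latest
     * version of that class; otherwise the worker's class and version (if any) apply.
     */
    static String defaultVersion(Map<String, String> connProps,
                                 Map<String, String> workerProps,
                                 String classKey,
                                 String versionKey,
                                 Function<String, String> latestVersion) {
        String connClass = connProps.get(classKey);
        if (connClass != null) {
            return latestVersion.apply(connClass);
        }
        String workerClass = workerProps.get(classKey);
        if (workerClass == null) {
            return null; // nothing configured, so there is no default to offer
        }
        String workerVersion = workerProps.get(versionKey);
        return workerVersion != null ? workerVersion : latestVersion.apply(workerClass);
    }

    public static void main(String[] args) {
        // Stand-in for a plugin registry lookup, passed in rather than held in a static field.
        Function<String, String> latest =
                cls -> cls.endsWith("JsonConverter") ? "3.9.0" : "1.0.0";

        // Placeholder keys and values, chosen only for this example.
        Map<String, String> worker = Map.of(
                "converter.class", "org.example.StringConverter",
                "converter.version", "0.9.0");
        Map<String, String> conn = Map.of("converter.class", "org.example.JsonConverter");

        // Connector overrides the class, so the latest version of that class is the default.
        System.out.println(defaultVersion(conn, worker, "converter.class", "converter.version", latest));
        // Connector sets nothing, so the worker's pinned version is the default.
        System.out.println(defaultVersion(Map.of(), worker, "converter.class", "converter.version", latest));
    }
}
```

   Because the lookup is an argument, each call sees whatever plugin state 
the caller hands it, which is the property the static helper methods in the 
hunk above get by taking `plugins` as a parameter.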



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

