snehashisp commented on code in PR #17741:
URL: https://github.com/apache/kafka/pull/17741#discussion_r1871510609


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -411,13 +449,149 @@ protected ConfigDef config(Predicate<?> predicate) {
         return newDef;
     }
 
+    public static void updateDefaults(ConfigDef configDef, Plugins plugins, 
Map<String, String> connProps, Map<String, String> workerProps) {
+        updateAllConverterDefaults(configDef, plugins, connProps, workerProps);
+        updateConnectorVersionDefaults(configDef, plugins, 
connProps.get(CONNECTOR_CLASS_CONFIG));
+    }
+
+    public static void updateConnectorVersionDefaults(ConfigDef configDef, 
Plugins plugins, String connectorClass) {
+        // if provided connector version is null, the latest version is used
+        updateKeyDefault(configDef, ConnectorConfig.CONNECTOR_VERSION, 
plugins.latestVersion(connectorClass));
+    }
+
+    public static void updateAllConverterDefaults(ConfigDef configDef, Plugins 
plugins,
+                                                        Map<String, String> 
connProps, Map<String, String> workerProps) {
+        updateKeyConverterDefault(configDef, plugins, connProps, workerProps);
+        updateValueConverterDefault(configDef, plugins, connProps, 
workerProps);
+        updateHeaderConverterDefault(configDef, plugins, connProps, 
workerProps);
+    }
+
+    public static void updateKeyConverterDefault(ConfigDef configDef, Plugins 
plugins,
+                                                        Map<String, String> 
connProps, Map<String, String> workerProps) {
+        updateConverterDefaults(
+                configDef, plugins,
+                KEY_CONVERTER_CLASS_CONFIG, 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+                KEY_CONVERTER_VERSION_CONFIG, 
WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerProps
+        );
+    }
+
+    public static void updateValueConverterDefault(ConfigDef configDef, 
Plugins plugins,
+                                                          Map<String, String> 
connProps, Map<String, String> workerProps) {
+        updateConverterDefaults(
+                configDef, plugins,
+                VALUE_CONVERTER_CLASS_CONFIG, 
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
+                VALUE_CONVERTER_VERSION_CONFIG, 
WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerProps
+        );
+    }
+
+    public static void updateHeaderConverterDefault(ConfigDef configDef, 
Plugins plugins,
+                                                           Map<String, String> 
connProps, Map<String, String> workerProps) {
+        updateConverterDefaults(
+                configDef, plugins,
+                HEADER_CONVERTER_CLASS_CONFIG, 
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                HEADER_CONVERTER_VERSION_CONFIG, 
WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerProps
+        );
+    }
+
+    private static void updateConverterDefaults(

Review Comment:
   Yeah, I completely agree that updating defaults in this way is not ideal. 
However, in practice trying I found that generating dynamic configuration keys 
is equally cumbersome and makes the code harder to read. The complexity arises 
due to configuration groups and maintaining correct order between them. 
Enriched plugins for transformations has each new transformation as part of a 
separate group, whereas converter configs all come under the same common group. 
We would need to break up the existing ConfigDef and keep a pointer to the 
start of the common group from the converter related configs are introduced, 
and maintain a separate configDef for non-defaults. It's doable but it ended up 
more complex and error prone than the simple update I do here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to