snehashisp commented on code in PR #17741:
URL: https://github.com/apache/kafka/pull/17741#discussion_r1871510609


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -411,13 +449,149 @@ protected ConfigDef config(Predicate<?> predicate) {
         return newDef;
     }
 
+    public static void updateDefaults(ConfigDef configDef, Plugins plugins, Map<String, String> connProps, Map<String, String> workerProps) {
+        updateAllConverterDefaults(configDef, plugins, connProps, workerProps);
+        updateConnectorVersionDefaults(configDef, plugins, connProps.get(CONNECTOR_CLASS_CONFIG));
+    }
+
+    public static void updateConnectorVersionDefaults(ConfigDef configDef, Plugins plugins, String connectorClass) {
+        // if provided connector version is null, the latest version is used
+        updateKeyDefault(configDef, ConnectorConfig.CONNECTOR_VERSION, plugins.latestVersion(connectorClass));
+    }
+
+    public static void updateAllConverterDefaults(ConfigDef configDef, Plugins plugins,
+                                                        Map<String, String> connProps, Map<String, String> workerProps) {
+        updateKeyConverterDefault(configDef, plugins, connProps, workerProps);
+        updateValueConverterDefault(configDef, plugins, connProps, workerProps);
+        updateHeaderConverterDefault(configDef, plugins, connProps, workerProps);
+    }
+
+    public static void updateKeyConverterDefault(ConfigDef configDef, Plugins plugins,
+                                                        Map<String, String> connProps, Map<String, String> workerProps) {
+        updateConverterDefaults(
+                configDef, plugins,
+                KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+                KEY_CONVERTER_VERSION_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerProps
+        );
+    }
+
+    public static void updateValueConverterDefault(ConfigDef configDef, Plugins plugins,
+                                                          Map<String, String> connProps, Map<String, String> workerProps) {
+        updateConverterDefaults(
+                configDef, plugins,
+                VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
+                VALUE_CONVERTER_VERSION_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerProps
+        );
+    }
+
+    public static void updateHeaderConverterDefault(ConfigDef configDef, Plugins plugins,
+                                                           Map<String, String> connProps, Map<String, String> workerProps) {
+        updateConverterDefaults(
+                configDef, plugins,
+                HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                HEADER_CONVERTER_VERSION_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerProps
+        );
+    }
+
+    private static void updateConverterDefaults(

Review Comment:
   Yeah, I completely agree that updating defaults in this way is not ideal. 
However, in practice I found that generating dynamic configuration keys is 
equally cumbersome and makes the code harder to read. The complexity arises 
from configuration groups and from maintaining the correct order between them. 
Enriched plugins for transformations put each new transformation in a separate 
group, whereas the converter configs all fall under the same common group. We 
would need to break up the existing ConfigDef, keep a pointer to the point in 
the common group from which the converter-related configs are introduced, and 
maintain a separate ConfigDef for non-defaults. It's doable, but it ended up 
more complex and error-prone than the simple update I do here. 
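
To make the trade-off concrete, here is a minimal sketch of the "update the default in place" idea. It does not use Kafka's `ConfigDef`: `Key`, `sampleKeys`, and this `updateKeyDefault` are hypothetical stand-ins, and the key names and version string are illustrative only. The point it tries to show is that replacing a default inside an ordered map leaves the group and the position of every key untouched, so no splitting or re-ordering of definitions is needed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DefaultUpdateSketch {

    // Hypothetical, simplified config key; NOT Kafka's ConfigDef.ConfigKey.
    public static final class Key {
        public final String name;
        public final String group;
        public final int orderInGroup;
        public final Object defaultValue;

        public Key(String name, String group, int orderInGroup, Object defaultValue) {
            this.name = name;
            this.group = group;
            this.orderInGroup = orderInGroup;
            this.defaultValue = defaultValue;
        }

        // Copy of this key with a different default; group and order are kept.
        public Key withDefault(Object newDefault) {
            return new Key(name, group, orderInGroup, newDefault);
        }
    }

    // Swap the default of an existing key. computeIfPresent replaces the value
    // for a key that is already present, so the LinkedHashMap's insertion
    // order is unchanged. Unknown key names are ignored.
    public static void updateKeyDefault(Map<String, Key> keys, String name, Object newDefault) {
        keys.computeIfPresent(name, (k, old) -> old.withDefault(newDefault));
    }

    // Illustrative definitions: all keys live in one "Common" group.
    public static Map<String, Key> sampleKeys() {
        Map<String, Key> keys = new LinkedHashMap<>();
        keys.put("connector.class", new Key("connector.class", "Common", 1, null));
        keys.put("key.converter", new Key("key.converter", "Common", 2, null));
        keys.put("key.converter.version", new Key("key.converter.version", "Common", 3, null));
        keys.put("tasks.max", new Key("tasks.max", "Common", 4, 1));
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Key> keys = sampleKeys();
        updateKeyDefault(keys, "key.converter.version", "1.2.3");
        for (Key key : keys.values()) {
            System.out.println(key.group + "/" + key.orderInGroup + " " + key.name + " = " + key.defaultValue);
        }
    }
}
```

The alternative described above would instead build the key list dynamically, which in this toy model would mean inserting new entries at the right position within the group rather than overwriting one value.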



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.