snehashisp commented on code in PR #17741: URL: https://github.com/apache/kafka/pull/17741#discussion_r1871510609
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java: ########## @@ -411,13 +449,149 @@ protected ConfigDef config(Predicate<?> predicate) { return newDef; } + public static void updateDefaults(ConfigDef configDef, Plugins plugins, Map<String, String> connProps, Map<String, String> workerProps) { + updateAllConverterDefaults(configDef, plugins, connProps, workerProps); + updateConnectorVersionDefaults(configDef, plugins, connProps.get(CONNECTOR_CLASS_CONFIG)); + } + + public static void updateConnectorVersionDefaults(ConfigDef configDef, Plugins plugins, String connectorClass) { + // if provided connector version is null, the latest version is used + updateKeyDefault(configDef, ConnectorConfig.CONNECTOR_VERSION, plugins.latestVersion(connectorClass)); + } + + public static void updateAllConverterDefaults(ConfigDef configDef, Plugins plugins, + Map<String, String> connProps, Map<String, String> workerProps) { + updateKeyConverterDefault(configDef, plugins, connProps, workerProps); + updateValueConverterDefault(configDef, plugins, connProps, workerProps); + updateHeaderConverterDefault(configDef, plugins, connProps, workerProps); + } + + public static void updateKeyConverterDefault(ConfigDef configDef, Plugins plugins, + Map<String, String> connProps, Map<String, String> workerProps) { + updateConverterDefaults( + configDef, plugins, + KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, + KEY_CONVERTER_VERSION_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerProps + ); + } + + public static void updateValueConverterDefault(ConfigDef configDef, Plugins plugins, + Map<String, String> connProps, Map<String, String> workerProps) { + updateConverterDefaults( + configDef, plugins, + VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, + VALUE_CONVERTER_VERSION_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerProps + ); + } + + public static void updateHeaderConverterDefault(ConfigDef configDef, Plugins plugins, + Map<String, String> connProps, Map<String, String> workerProps) { + updateConverterDefaults( + configDef, plugins, + HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, + HEADER_CONVERTER_VERSION_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerProps + ); + } + + private static void updateConverterDefaults( Review Comment: Yeah, I completely agree that updating defaults in this way is not ideal. However, in practice trying I found that generating dynamic configuration keys is equally cumbersome and makes the code harder to read. The complexity arises due to configuration groups and maintaining correct order between them. Enriched plugins for transformations has each new transformation as part of a separate group, whereas converter configs all come under the same common group. We would need to break up the existing ConfigDef and keep a pointer to the start of the common group from the converter related configs are introduced, and maintain a separate configDef for non-defaults. It's doable but it ended up more complex and error prone than the simple update I do here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org