snehashisp commented on code in PR #17741: URL: https://github.com/apache/kafka/pull/17741#discussion_r1873340474
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java: ########## @@ -411,13 +449,149 @@ protected ConfigDef config(Predicate<?> predicate) { return newDef; } + public static void updateDefaults(ConfigDef configDef, Plugins plugins, Map<String, String> connProps, Map<String, String> workerProps) { + updateAllConverterDefaults(configDef, plugins, connProps, workerProps); + updateConnectorVersionDefaults(configDef, plugins, connProps.get(CONNECTOR_CLASS_CONFIG)); + } + + public static void updateConnectorVersionDefaults(ConfigDef configDef, Plugins plugins, String connectorClass) { + // if provided connector version is null, the latest version is used + updateKeyDefault(configDef, ConnectorConfig.CONNECTOR_VERSION, plugins.latestVersion(connectorClass)); + } + + public static void updateAllConverterDefaults(ConfigDef configDef, Plugins plugins, + Map<String, String> connProps, Map<String, String> workerProps) { + updateKeyConverterDefault(configDef, plugins, connProps, workerProps); + updateValueConverterDefault(configDef, plugins, connProps, workerProps); + updateHeaderConverterDefault(configDef, plugins, connProps, workerProps); + } + + public static void updateKeyConverterDefault(ConfigDef configDef, Plugins plugins, + Map<String, String> connProps, Map<String, String> workerProps) { + updateConverterDefaults( + configDef, plugins, + KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, + KEY_CONVERTER_VERSION_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerProps + ); + } + + public static void updateValueConverterDefault(ConfigDef configDef, Plugins plugins, + Map<String, String> connProps, Map<String, String> workerProps) { + updateConverterDefaults( + configDef, plugins, + VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, + VALUE_CONVERTER_VERSION_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerProps + ); + } + + public static void updateHeaderConverterDefault(ConfigDef configDef, Plugins plugins, + Map<String, String> connProps, Map<String, String> workerProps) { + updateConverterDefaults( + configDef, plugins, + HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, + HEADER_CONVERTER_VERSION_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerProps + ); + } + + private static void updateConverterDefaults( Review Comment: I changed this as per your suggestion. The changes needed to make the plugins instance in `PluginVersionUtils` non static made it so that even the recommenders needed dynamic updates which was making things even more complicated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org