snehashisp commented on code in PR #17743: URL: https://github.com/apache/kafka/pull/17743#discussion_r1966255974
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -549,7 +549,7 @@ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPro
     }
     private HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) {
-        if (!config.originals().containsKey(classPropertyName) && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+        if (config.getClass(classPropertyName) == null && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {

Review Comment:
The header converter config has no default defined in [ConnectorConfig](https://github.com/apache/kafka/blob/84caaa6e9da06435411510a81fa321d4f99c351f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L106); it's only in WorkerConfig that SimpleHeaderConverter is set as the default. This method is called first with the connector config, and then with the worker config if the connector config does not define a header converter.

One thing I have changed is that I no longer pass the class loader usage (CURRENT or PLUGINS) from the worker during converter instantiation. If a version is provided, we should not use the current class loader, as that would always bring in the latest version of the converter packaged with the connector plugin. Checking whether a version is provided in the configs and passing the appropriate class loader was getting a bit messy in the worker code, and I think it's better abstracted away in Plugins (load with CURRENT if no version is provided, and with PLUGINS otherwise).

Right now, in the worker, when instantiating converters from the worker config, we explicitly use the PLUGINS loader. With this change, if no version for the converter is provided in the worker config, we will use the CURRENT class loader.
This should not change the behavior, since both class-loading flows go through the delegating class loader, which is swapped in before the worker config is instantiated.

With this change, the existing condition `!config.originals().containsKey(classPropertyName)` would cause the method to return null even if the worker config has a default, because `config.originals()` disregards the defaults specified in the config def.
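
To illustrate why the condition had to change, here is a minimal, self-contained sketch. It does not use the real Kafka classes: `MiniConfig` is a hypothetical stand-in for `AbstractConfig` that stores values as plain strings, and the two `skipsWith...` helpers are illustrative names for the old and new checks. The only behavior it assumes is the one described above, namely that `originals()` holds just the user-supplied entries while a resolved lookup falls back to the declared default.

```java
import java.util.HashMap;
import java.util.Map;

class HeaderConverterLookupSketch {

    /** Hypothetical stand-in for AbstractConfig: user-supplied originals plus declared defaults. */
    static final class MiniConfig {
        private final Map<String, String> defaults;
        private final Map<String, String> originals;

        MiniConfig(Map<String, String> defaults, Map<String, String> originals) {
            this.defaults = new HashMap<>(defaults);
            this.originals = new HashMap<>(originals);
        }

        /** Only what the user supplied; declared defaults are not included. */
        Map<String, String> originals() {
            return originals;
        }

        /** Resolved value: the user's value if present, else the declared default, else null. */
        String get(String key) {
            return originals.containsKey(key) ? originals.get(key) : defaults.get(key);
        }
    }

    /** Old check: treats the converter as undefined whenever the user did not set the key. */
    static boolean skipsWithOriginalsCheck(MiniConfig config, String key) {
        return !config.originals().containsKey(key);
    }

    /** New check: treats the converter as undefined only when no value and no default exist. */
    static boolean skipsWithResolvedCheck(MiniConfig config, String key) {
        return config.get(key) == null;
    }

    public static void main(String[] args) {
        String key = "header.converter";

        // Connector config: no default declared, nothing supplied by the user.
        MiniConfig connectorConfig = new MiniConfig(Map.of(), Map.of());
        // Worker config: a default is declared, but the user supplied nothing.
        MiniConfig workerConfig = new MiniConfig(Map.of(key, "SimpleHeaderConverter"), Map.of());

        System.out.println("connector, originals check skips: " + skipsWithOriginalsCheck(connectorConfig, key));
        System.out.println("connector, resolved check skips:  " + skipsWithResolvedCheck(connectorConfig, key));
        System.out.println("worker, originals check skips:    " + skipsWithOriginalsCheck(workerConfig, key));
        System.out.println("worker, resolved check skips:     " + skipsWithResolvedCheck(workerConfig, key));
    }
}
```

In this sketch both checks skip the connector config, so the fallback to the worker config still happens. They differ only for the worker config: the originals-based check skips it despite the default, while the resolved-value check finds the default converter.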