snehashisp commented on code in PR #17743:
URL: https://github.com/apache/kafka/pull/17743#discussion_r1966255974


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -549,7 +549,7 @@ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPro
     }
 
     private HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) {
-        if (!config.originals().containsKey(classPropertyName) && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+        if (config.getClass(classPropertyName) == null && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {

Review Comment:
   The header converter config has no default defined in 
[ConnectorConfig](https://github.com/apache/kafka/blob/84caaa6e9da06435411510a81fa321d4f99c351f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L106);
 only WorkerConfig sets SimpleHeaderConverter as the default. 
This method is called first with the connector config and then, if the 
connector config does not define a header converter, with the worker config. 
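
   A minimal sketch of that two-step lookup, using plain maps as stand-ins for 
the resolved connector and worker configs. The class and method names here are 
hypothetical and are not part of `Plugins`; only the `header.converter` key and 
the `SimpleHeaderConverter` default come from Connect itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; the real logic lives in the worker and Plugins.
final class HeaderConverterFallbackSketch {
    static final String KEY = "header.converter";
    static final String WORKER_DEFAULT =
            "org.apache.kafka.connect.storage.SimpleHeaderConverter";

    // Stand-in for one call of the method: null means "this config does not define it".
    static String lookup(Map<String, String> resolvedConfig) {
        return resolvedConfig.get(KEY);
    }

    // Connector config first, worker config second.
    static String resolve(Map<String, String> connectorConfig,
                          Map<String, String> workerConfig) {
        String fromConnector = lookup(connectorConfig);
        return fromConnector != null ? fromConnector : lookup(workerConfig);
    }

    // Demo helper: the worker config always carries the default value.
    static String resolveWithConnectorValue(String connectorValue) {
        Map<String, String> connector = new HashMap<>();
        if (connectorValue != null) {
            connector.put(KEY, connectorValue);
        }
        Map<String, String> worker = new HashMap<>();
        worker.put(KEY, WORKER_DEFAULT);
        return resolve(connector, worker);
    }

    public static void main(String[] args) {
        // Connector config is silent, so the worker default is used.
        System.out.println(resolveWithConnectorValue(null));
        // Connector config wins when it defines a converter (made-up class name).
        System.out.println(resolveWithConnectorValue("com.example.MyHeaderConverter"));
    }
}
```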
   
   One thing I have changed is that the worker no longer passes the class 
loader usage (CURRENT or PLUGINS) when instantiating converters. If a version 
is provided, we should not use the current class loader, as that would always 
bring in the latest version of the converter packaged with the connector 
plugin. Checking whether a version is provided in the configs and passing the 
appropriate class loader was getting messy in the worker code, and I think 
it's better abstracted away in Plugins (load with CURRENT if no version is 
provided, and with PLUGINS otherwise). Right now the worker explicitly uses the 
PLUGINS loader when instantiating converters from the worker config. With this 
change, if the worker config provides no version for the converter, we will use 
the CURRENT class loader. This should not change behavior, because both 
class-loading flows go through the delegating class loader, which is swapped in 
before the worker config is instantiated. 
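
   The selection rule described above could be sketched roughly as follows. 
This is an assumption about the shape of the logic, not the code in this PR: 
the enum mirrors the two `ClassLoaderUsage` values, and `choose` is a 
hypothetical helper that receives the configured version string (or null).

```java
// Hypothetical sketch of "CURRENT if no version is provided, PLUGINS otherwise".
final class LoaderChoiceSketch {
    // Mirrors the two usages named in the comment; not Kafka's actual enum.
    enum LoaderUsage { CURRENT_CLASSLOADER, PLUGINS }

    // version is the value of the converter's version property, or null if unset.
    static LoaderUsage choose(String version) {
        return version == null ? LoaderUsage.CURRENT_CLASSLOADER : LoaderUsage.PLUGINS;
    }

    public static void main(String[] args) {
        System.out.println(choose(null));    // no version configured
        System.out.println(choose("1.2.3")); // illustrative version string
    }
}
```

   Keeping the decision in one place means callers only hand over the config 
and never need to know which loader applies.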
   
   With this change, the existing condition 
`!config.originals().containsKey(classPropertyName)` would make the method 
return null even when the worker config has a default, because 
`config.originals()` disregards the defaults specified in the config def. 
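
   To illustrate the difference between the two checks, here is a small 
self-contained sketch. `SketchConfig` is a hypothetical stand-in that only 
mimics the two views involved (user-supplied originals versus resolved values 
with defaults); it is not `AbstractConfig`, and `resolved` is a simplification 
of what `getClass` returns.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a config object with ConfigDef-style defaults.
final class SketchConfig {
    private final Map<String, String> defaults;
    private final Map<String, String> originals;

    SketchConfig(Map<String, String> defaults, Map<String, String> originals) {
        this.defaults = new HashMap<>(defaults);
        this.originals = new HashMap<>(originals);
    }

    // Only the values the user supplied; defaults are invisible here.
    Map<String, String> originals() {
        return new HashMap<>(originals);
    }

    // The user's value if present, otherwise the default (possibly null).
    String resolved(String key) {
        return originals.containsKey(key) ? originals.get(key) : defaults.get(key);
    }
}

final class OriginalsVsResolvedSketch {
    static final String KEY = "header.converter";
    static final String DEFAULT =
            "org.apache.kafka.connect.storage.SimpleHeaderConverter";

    // Worker-style config: a default exists, the user set nothing.
    static SketchConfig workerStyle() {
        Map<String, String> defaults = new HashMap<>();
        defaults.put(KEY, DEFAULT);
        return new SketchConfig(defaults, new HashMap<>());
    }

    // Connector-style config: no default, the user set nothing.
    static SketchConfig connectorStyle() {
        return new SketchConfig(new HashMap<>(), new HashMap<>());
    }

    // Old condition: looks only at user-supplied keys.
    static boolean oldCheckTreatsAsMissing(SketchConfig c) {
        return !c.originals().containsKey(KEY);
    }

    // New condition: looks at the resolved value, defaults included.
    static boolean newCheckTreatsAsMissing(SketchConfig c) {
        return c.resolved(KEY) == null;
    }

    public static void main(String[] args) {
        SketchConfig worker = workerStyle();
        System.out.println("old check, worker: " + oldCheckTreatsAsMissing(worker));
        System.out.println("new check, worker: " + newCheckTreatsAsMissing(worker));
    }
}
```

   For the connector-style config both checks agree that the converter is 
missing; they only diverge when a default exists but the user set nothing.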


