[ 
https://issues.apache.org/jira/browse/KAFKA-18119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17902428#comment-17902428
 ] 

Greg Harris commented on KAFKA-18119:
-------------------------------------

[~snehashisp] Thanks for the bug report.

> This call eventually ends up finding the class already loaded in the services 
> loading step 
> [here|https://github.com/apache/kafka/blob/b9a45546a7918799b6fb3c0fe63b56f47d8fcba9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java#L112].
>  In case service loading is not used the *findLoadedClass* does not find the 
> provided class (unless it's also packaged with the plugin) which then 
> delegates to parent ({*}DelegatingClassLoader{*}) and the current plugin 
> loading implementation in connect loads the latest class it found in all the 
> plugins paths.

Interesting. The difference arises because the DelegatingClassLoader is not 
fully initialized while scanning is happening, so it only delegates upwards 
to the parent/classpath. Calling 
DelegatingClassLoader#installDiscoveredPlugins finishes the initialization, 
and is intended to switch the DelegatingClassLoader to downwards-delegation. 
As you noticed, this has no effect for classes that exist on the classpath, 
because the PluginClassLoaders have already cached the result from the first 
initialization.

> One possible solution is to not do a parent lookup of resources in 
> *getResources*. I think the plugin class loader should not load the resources 
> from parent but wanted to understand if there is anything I am missing.

This was implemented previously, and removed as part of KAFKA-14789. It was a 
brittle solution, and inconsistently applied to the different plugins.

I think the real problem is that the JVM's load-once-cache-forever logic is 
incompatible with the mutable DelegatingClassLoader, and strategies to avoid 
caching the invalid data during initialization are going to be brittle. For 
example, consider a FooConnector that depends directly on JsonConverter but 
doesn't package it, expecting it to be present on the classpath. If you 
discover FooConnector, it will load JsonConverter and still populate the cache 
with the wrong result.
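
As a rough illustration of the FooConnector case, here is a toy model in plain 
Java. It is not Connect's actual code: ToyDelegatingLoader, ToyPluginLoader, 
finishInitialization, and the version strings are hypothetical stand-ins, and 
the cache is a plain map rather than the JVM's loaded-class table. It only 
sketches why a result cached before initialization survives the later switch 
to downwards-delegation.

```java
import java.util.HashMap;
import java.util.Map;

class StaleCacheSketch {

    /** Hypothetical stand-in for a mutable delegating loader. */
    static class ToyDelegatingLoader {
        final Map<String, String> classpath = new HashMap<>();
        final Map<String, String> pluginPath = new HashMap<>();
        private boolean initialized = false;

        /** Assumption: models the end of plugin scanning. */
        void finishInitialization() {
            initialized = true;
        }

        /** Before initialization only the classpath is consulted. */
        String resolve(String name) {
            if (initialized && pluginPath.containsKey(name)) {
                return pluginPath.get(name);
            }
            return classpath.get(name);
        }
    }

    /** Hypothetical stand-in for a per-plugin loader with a load-once cache. */
    static class ToyPluginLoader {
        private final ToyDelegatingLoader parent;
        private final Map<String, String> loaded = new HashMap<>();

        ToyPluginLoader(ToyDelegatingLoader parent) {
            this.parent = parent;
        }

        /** The first answer for a name is cached and never revisited. */
        String load(String name) {
            return loaded.computeIfAbsent(name, parent::resolve);
        }
    }

    static String[] demo() {
        ToyDelegatingLoader parent = new ToyDelegatingLoader();
        parent.classpath.put("JsonConverter", "classpath-version");
        parent.pluginPath.put("JsonConverter", "plugin-path-version");

        // Scanning FooConnector pulls in JsonConverter before initialization.
        ToyPluginLoader fooLoader = new ToyPluginLoader(parent);
        String duringScan = fooLoader.load("JsonConverter");

        parent.finishInitialization();

        // The same loader keeps its cached answer; a fresh loader does not.
        String afterInit = fooLoader.load("JsonConverter");
        String freshLoader = new ToyPluginLoader(parent).load("JsonConverter");
        return new String[] {duringScan, afterInit, freshLoader};
    }

    public static void main(String[] args) {
        for (String result : demo()) {
            System.out.println(result);
        }
    }
}
```

In this model the loader that scanned FooConnector reports classpath-version 
both before and after initialization, while a loader created afterwards 
reports plugin-path-version.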

> Service loading loads incorrect plugin version.
> -----------------------------------------------
>
>                 Key: KAFKA-18119
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18119
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>    Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2, 3.8.0, 3.7.1, 3.9.0, 3.8.1
>            Reporter: Snehashis Pal
>            Priority: Major
>
> Kafka Connect seems to load an incorrect version of Connect plugins (such 
> as connectors) when run in SERVICE_LOAD mode. If the worker is started with 
> only service loading enabled, it does not seem to load the latest 
> version of a plugin available in its plugins path. Rather, it always seems to 
> default to the plugin version provided on the classpath. 
> I observed this when I placed an updated json-converter in the plugins path 
> and an instantiated connector still defaulted to the one provided in 
> the Connect (Kafka) distribution. This does not happen when the older 
> reflections-based plugin scanner is used. It can be reproduced by following 
> the steps below:
>  * Set {*}plugin.discovery{*}=SERVICE_LOAD
>  * Install a newer version of json-converter 
> ({*}org.apache.kafka.connect.json.JsonConverter{*}) than the one provided in 
> the distribution. Usually, the bundled version is the same as the current 
> Kafka distribution. 
>  * Start a connector with either *key.converter* or *value.converter* set to 
> the json converter. Without this in the config, the latest version is loaded.
> It might be hard to judge which version of the converter is loaded. For 
> testing, it might be good to build a new json converter with some log lines 
> indicating the version in use. Another way would be to run Connect in debug 
> mode and add some breakpoints in the startTask method in Worker, where the 
> converters are initialised.



