snehashisp commented on code in PR #16984:
URL: https://github.com/apache/kafka/pull/16984#discussion_r1873998808


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -69,36 +75,117 @@ public DelegatingClassLoader() {
 
     /**
      * Retrieve the PluginClassLoader associated with a plugin class
+     *
      * @param name The fully qualified class name of the plugin
      * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated.
      */
     // VisibleForTesting
-    PluginClassLoader pluginClassLoader(String name) {
+    PluginClassLoader pluginClassLoader(String name, VersionRange range) {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
             return null;
         }
+
         SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+        ClassLoader pluginLoader = findPluginLoader(inner, name, range);
         return pluginLoader instanceof PluginClassLoader
-               ? (PluginClassLoader) pluginLoader
-               : null;
+            ? (PluginClassLoader) pluginLoader
+            : null;
     }
 
-    ClassLoader connectorLoader(String connectorClassOrAlias) {
-        String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias);
-        ClassLoader classLoader = pluginClassLoader(fullName);
-        if (classLoader == null) classLoader = this;
+    PluginClassLoader pluginClassLoader(String name) {
+        return pluginClassLoader(name, null);
+    }
+
+    ClassLoader loader(String classOrAlias, VersionRange range) throws ClassNotFoundException {
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        ClassLoader classLoader = range == null ? pluginClassLoader(fullName) :
+                // load the plugin class when version is provided which will validate the correct version is loaded
+                loadVersionedPluginClass(fullName, range, false).getClassLoader();

Review Comment:
   The reason I have this check in place is that resolving the classloader
for a versioned plugin should ideally guarantee that the correct one is
returned. I think returning the delegating loader without this validation
would be incorrect, since we would not be checking whether a different version
of the plugin (or multiple versions) is present on the classpath. As long as
the actual plugin instantiation is done appropriately (as opposed to something
like a `Utils.newInstance` after swapping to the returned classloader), we
should be fine. Let me know your thoughts on this.
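
To illustrate the concern, here is a minimal sketch, not the code in this PR. All names in it (`VersionedLoaderSketch`, `unvalidated`, `validated`, the `"delegating"` label, integer versions, and the use of `IllegalStateException`) are hypothetical stand-ins chosen for illustration; loaders are modeled as plain strings and a version range as an `IntPredicate`.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.IntPredicate;

class VersionedLoaderSketch {

    // Hypothetical label standing in for the delegating (classpath) loader.
    static final String DELEGATING = "delegating";

    // Unvalidated fallback: use the newest isolated loader if there is one,
    // otherwise hand back the delegating loader without checking any version.
    static String unvalidated(NavigableMap<Integer, String> isolated) {
        return isolated.isEmpty() ? DELEGATING : isolated.lastEntry().getValue();
    }

    // Validated lookup: only return a loader whose plugin version is in range.
    // classpathVersion is the (assumed) version visible to the delegating
    // loader, or null if the plugin is not on the classpath at all.
    static String validated(NavigableMap<Integer, String> isolated,
                            Integer classpathVersion,
                            IntPredicate range) {
        // Prefer the highest isolated version that satisfies the range.
        for (Map.Entry<Integer, String> e : isolated.descendingMap().entrySet()) {
            if (range.test(e.getKey())) {
                return e.getValue();
            }
        }
        // Fall back to the delegating loader only if its version also matches.
        if (classpathVersion != null && range.test(classpathVersion)) {
            return DELEGATING;
        }
        throw new IllegalStateException("no plugin version in the requested range");
    }

    public static void main(String[] args) {
        NavigableMap<Integer, String> isolated = new TreeMap<>(); // no isolated copies
        IntPredicate atLeastTwo = v -> v >= 2;

        // The classpath only has version 1, but the caller asked for >= 2.
        System.out.println("unvalidated: " + unvalidated(isolated));
        try {
            System.out.println("validated: " + validated(isolated, 1, atLeastTwo));
        } catch (IllegalStateException e) {
            System.out.println("validated: rejected, " + e.getMessage());
        }
    }
}
```

In this sketch the unvalidated path silently hands back the delegating loader even though the classpath copy is outside the requested range, while the validated path fails instead. That is the behavior I am trying to preserve by going through `loadVersionedPluginClass` when a range is provided.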


