gharris1727 commented on code in PR #16984:
URL: https://github.com/apache/kafka/pull/16984#discussion_r1874057246


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -69,36 +75,117 @@ public DelegatingClassLoader() {
 
     /**
      * Retrieve the PluginClassLoader associated with a plugin class
+     *
      * @param name The fully qualified class name of the plugin
      * @return the PluginClassLoader that should be used to load this, or null 
if the plugin is not isolated.
      */
     // VisibleForTesting
-    PluginClassLoader pluginClassLoader(String name) {
+    PluginClassLoader pluginClassLoader(String name, VersionRange range) {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
             return null;
         }
+
         SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+        ClassLoader pluginLoader = findPluginLoader(inner, name, range);
         return pluginLoader instanceof PluginClassLoader
-               ? (PluginClassLoader) pluginLoader
-               : null;
+            ? (PluginClassLoader) pluginLoader
+            : null;
     }
 
-    ClassLoader connectorLoader(String connectorClassOrAlias) {
-        String fullName = aliases.getOrDefault(connectorClassOrAlias, 
connectorClassOrAlias);
-        ClassLoader classLoader = pluginClassLoader(fullName);
-        if (classLoader == null) classLoader = this;
+    PluginClassLoader pluginClassLoader(String name) {
+        return pluginClassLoader(name, null);
+    }
+
+    ClassLoader loader(String classOrAlias, VersionRange range) throws 
ClassNotFoundException {
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        ClassLoader classLoader = range == null ? pluginClassLoader(fullName) :
+                // load the plugin class when version is provided which will 
validate the correct version is loaded
+                loadVersionedPluginClass(fullName, range, 
false).getClassLoader();

Review Comment:
   > So the reason why I have this check in place is because the classloader 
instantiation for versioned plugins should ideally make sure that the correct 
one is returned
   
   I'm not really sure I understand this justification.
   1. If code is calling `#loader(String, VersionRange)` to get a ClassLoader, 
it's going to be to interact with a plugin with a specific version 
(instantiate, call methods, etc).
   2. In order to get a plugin class or instance of a specific version, the 
caller will need to use `#loadVersionedPluginClass(String, VersionRange, 
boolean)` which will perform the verification.
   3. If the other verification wasn't sufficient, then they must have used a 
different VersionRange.
   
   So i think this exception would only get thrown if the caller made a mistake 
and used two different VersionRange objects (or a null one).
   I do want to make this sort of mistake less likely, but I think throwing an 
exception is not the way to do that.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to