rhauch commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616119723



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -196,8 +214,10 @@ private static PluginClassLoader newPluginClassLoader(
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+                allAddedPlugins.put(pluginClassName, new ArrayList<>());
             }
             inner.put(plugin, loader);
+            allAddedPlugins.get(pluginClassName).add(plugin);

Review comment:
       We can use `computeIfAbsent(...)` here, which makes the newly added `put(...)` line above unnecessary:
   ```suggestion
               allAddedPlugins.computeIfAbsent(pluginClassName, n -> new ArrayList<>()).add(plugin);
   ```
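
For illustration, here is a minimal standalone sketch of the idiom. It is not the PR's code: the class and method names are hypothetical, plain strings stand in for `PluginDesc<?>`, and the two-step form uses a `containsKey` check where the PR relies on the surrounding `if` branch. It only shows that the one-step form leaves the map in the same state as the two-step form.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the allAddedPlugins bookkeeping in the diff.
class ComputeIfAbsentSketch {

    // Two-step form: create the list the first time a class name is seen, then append.
    static void addTwoStep(Map<String, List<String>> added, String className, String plugin) {
        if (!added.containsKey(className)) {
            added.put(className, new ArrayList<>());
        }
        added.get(className).add(plugin);
    }

    // One-step form: computeIfAbsent returns the existing list, or creates,
    // stores, and returns a new one, so a single statement is enough.
    static void addOneStep(Map<String, List<String>> added, String className, String plugin) {
        added.computeIfAbsent(className, n -> new ArrayList<>()).add(plugin);
    }

    public static void main(String[] args) {
        Map<String, List<String>> a = new HashMap<>();
        Map<String, List<String>> b = new HashMap<>();
        for (String plugin : new String[] {"copy-1", "copy-2"}) {
            addTwoStep(a, "com.example.MyConnector", plugin);
            addOneStep(b, "com.example.MyConnector", plugin);
        }
        System.out.println(a.equals(b));
    }
}
```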

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -153,13 +157,26 @@ public PluginClassLoader pluginClassLoader(String name) {
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+        ClassLoader pluginLoader = inner.get(pluginDescInUse(inner));
         return pluginLoader instanceof PluginClassLoader
                ? (PluginClassLoader) pluginLoader
                : null;
     }
 
-    public ClassLoader connectorLoader(Connector connector) {
+    //visible for testing
+    PluginDesc<?> pluginDescInUse(String name) {

Review comment:
       I realize that `pluginDesc` in this name simply mirrors the `PluginDesc` type, but we generally try to avoid abbreviations. That matters even more here, because the combined name reads as even more mangled:
   ```suggestion
       PluginDesc<?> usedPluginDesc(String name) {
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -153,13 +157,26 @@ public PluginClassLoader pluginClassLoader(String name) {
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+        ClassLoader pluginLoader = inner.get(pluginDescInUse(inner));
         return pluginLoader instanceof PluginClassLoader
                ? (PluginClassLoader) pluginLoader
                : null;
     }
 
-    public ClassLoader connectorLoader(Connector connector) {
+    //visible for testing
+    PluginDesc<?> pluginDescInUse(String name) {
+        SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
+        if (inner == null) {
+            return null;
+        }
+        return pluginDescInUse(inner);
+    }
+
+    private PluginDesc<?> pluginDescInUse(SortedMap<PluginDesc<?>, ClassLoader> inner) {
+        return inner.lastKey();
+    }
+
+        public ClassLoader connectorLoader(Connector connector) {

Review comment:
       ```suggestion
       public ClassLoader connectorLoader(Connector connector) {
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -208,6 +228,22 @@ protected void initLoaders() {
         // Finally add parent/system loader.
         initPluginLoader(CLASSPATH_NAME);
         addAllAliases();
+        reportPluginConflicts();
+    }
+
+    //visible for testing
+    Set<String> reportPluginConflicts() {
+        Set<String> conflictPluginClasses = new HashSet<>();
+        for (Map.Entry<String, List<PluginDesc<?>>> entry : allAddedPlugins.entrySet()) {
+            String pluginClassName = entry.getKey();
+            List<PluginDesc<?>> pluginDescriptors = entry.getValue();
+            if (pluginDescriptors.size() > 1) {
+                PluginDesc<?> pluginDescInUse = pluginDescInUse(pluginClassName);
+                log.error("For plugin '{}', detected multiple copies '{}', this copy '{}' will be used.", pluginClassName, pluginDescriptors, pluginDescInUse);
+                conflictPluginClasses.add(pluginClassName);
+            }
+        }
+        return conflictPluginClasses;

Review comment:
       This could be rewritten a bit more compactly and a bit more functionally:
   ```suggestion
           return allAddedPlugins.entrySet().stream().filter(e -> e.getValue().size() > 1).map(e -> {
               String pluginClassName = e.getKey();
               PluginDesc<?> pluginDescInUse = pluginDescInUse(pluginClassName);
               List<PluginDesc<?>> ignoredPlugins = new ArrayList<>(e.getValue());
               ignoredPlugins.remove(pluginDescInUse);
               log.error("Detected multiple plugins contain '{}'; using {} and ignoring {}", pluginClassName, pluginDescInUse, ignoredPlugins);
               return pluginClassName;
           }).collect(Collectors.toSet());
   ```
   My suggestion also rewords the error message so that the more meaningful information comes first. It also computes the plugins that will not be used and lists them at the end of the message, in case there is more than one.
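
To make the filter/map/collect shape easier to try in isolation, here is a rough standalone sketch. It rests on several assumptions: the class and method names are hypothetical, strings stand in for `PluginDesc<?>`, `System.err` stands in for `log.error`, and the copy "in use" is taken to be the greatest element in natural string order, which only loosely mirrors `inner.lastKey()` on the sorted map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for the conflict report in DelegatingClassLoader.
class PluginConflictSketch {

    // Returns the class names that were provided by more than one plugin copy.
    static Set<String> reportConflicts(Map<String, List<String>> allAddedPlugins) {
        return allAddedPlugins.entrySet().stream()
            .filter(e -> e.getValue().size() > 1)
            .map(e -> {
                String pluginClassName = e.getKey();
                // Assumption: the greatest copy in natural order is the one in use.
                String inUse = Collections.max(e.getValue());
                // Everything except the copy in use is reported as ignored.
                List<String> ignored = new ArrayList<>(e.getValue());
                ignored.remove(inUse);
                System.err.printf("Detected multiple plugins contain '%s'; using %s and ignoring %s%n",
                        pluginClassName, inUse, ignored);
                return pluginClassName;
            })
            .collect(Collectors.toSet());
    }
}
```

One trade-off to keep in mind: the logging is a side effect inside `map(...)`, which is fine for a sequential stream that is always collected but is less obvious to readers than the explicit loop.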

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -196,8 +214,10 @@ private static PluginClassLoader newPluginClassLoader(
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+                allAddedPlugins.put(pluginClassName, new ArrayList<>());

Review comment:
       If we use `computeIfAbsent(...)` below, we don't need this line:
   ```suggestion
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

