rhauch commented on a change in pull request #10549: URL: https://github.com/apache/kafka/pull/10549#discussion_r616119723
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java ########## @@ -196,8 +214,10 @@ private static PluginClassLoader newPluginClassLoader( pluginLoaders.put(pluginClassName, inner); // TODO: once versioning is enabled this line should be moved outside this if branch log.info("Added plugin '{}'", pluginClassName); + allAddedPlugins.put(pluginClassName, new ArrayList<>()); } inner.put(plugin, loader); + allAddedPlugins.get(pluginClassName).add(plugin); Review comment: We can use `computeIfAbsent(...)` to eliminate the prior newly-added line: ```suggestion allAddedPlugins.computeIfAbsent(pluginClassName, n -> new ArrayList<>()).add(plugin); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java ########## @@ -153,13 +157,26 @@ public PluginClassLoader pluginClassLoader(String name) { if (inner == null) { return null; } - ClassLoader pluginLoader = inner.get(inner.lastKey()); + ClassLoader pluginLoader = inner.get(pluginDescInUse(inner)); return pluginLoader instanceof PluginClassLoader ? (PluginClassLoader) pluginLoader : null; } - public ClassLoader connectorLoader(Connector connector) { + //visible for testing + PluginDesc<?> pluginDescInUse(String name) { Review comment: I realize that `pluginDesc` in this name is just the equivalent of `PluginDesc`, but generally we try to avoid abbreviations, which is probably more true in this case because the name seems even more mangled: ```suggestion PluginDesc<?> usedPluginDesc(String name) { ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java ########## @@ -153,13 +157,26 @@ public PluginClassLoader pluginClassLoader(String name) { if (inner == null) { return null; } - ClassLoader pluginLoader = inner.get(inner.lastKey()); + ClassLoader pluginLoader = inner.get(pluginDescInUse(inner)); return pluginLoader instanceof PluginClassLoader ? (PluginClassLoader) pluginLoader : null; } - public ClassLoader connectorLoader(Connector connector) { + //visible for testing + PluginDesc<?> pluginDescInUse(String name) { + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name); + if (inner == null) { + return null; + } + return pluginDescInUse(inner); + } + + private PluginDesc<?> pluginDescInUse(SortedMap<PluginDesc<?>, ClassLoader> inner) { + return inner.lastKey(); + } + + public ClassLoader connectorLoader(Connector connector) { Review comment: ```suggestion public ClassLoader connectorLoader(Connector connector) { ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java ########## @@ -208,6 +228,22 @@ protected void initLoaders() { // Finally add parent/system loader. initPluginLoader(CLASSPATH_NAME); addAllAliases(); + reportPluginConflicts(); + } + + //visible for testing + Set<String> reportPluginConflicts() { + Set<String> conflictPluginClasses = new HashSet<>(); + for (Map.Entry<String, List<PluginDesc<?>>> entry : allAddedPlugins.entrySet()) { + String pluginClassName = entry.getKey(); + List<PluginDesc<?>> pluginDescriptors = entry.getValue(); + if (pluginDescriptors.size() > 1) { + PluginDesc<?> pluginDescInUse = pluginDescInUse(pluginClassName); + log.error("For plugin '{}', detected multiple copies '{}', this copy '{}' will be used.", pluginClassName, pluginDescriptors, pluginDescInUse); + conflictPluginClasses.add(pluginClassName); + } + } + return conflictPluginClasses; Review comment: This could be rewritten a bit more compactly and a bit more functionally: ```suggestion return allAddedPlugins.entrySet().stream().filter(e -> e.getValue().size() > 1).map(e -> { String pluginClassName = e.getKey(); PluginDesc<?> pluginDescInUse = pluginDescInUse(pluginClassName); List<PluginDesc<?>> ignoredPlugins = new ArrayList<>(e.getValue()); ignoredPlugins.remove(pluginDescInUse); log.error("Detected multiple plugins contain '{}'; using {} and ignoring {}", pluginClassName, pluginDescInUse, ignoredPlugins); return pluginClassName; }).collect(Collectors.toSet()); ``` My suggestion also includes a reworded error message to put the more meaningful information near the front, and computes the unused plugins and includes them at the end of the message, just in case there are multiple. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java ########## @@ -196,8 +214,10 @@ private static PluginClassLoader newPluginClassLoader( pluginLoaders.put(pluginClassName, inner); // TODO: once versioning is enabled this line should be moved outside this if branch log.info("Added plugin '{}'", pluginClassName); + allAddedPlugins.put(pluginClassName, new ArrayList<>()); Review comment: If we use `computeIfAbsent(...)` below, we don't need this line: ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org