snehashisp commented on code in PR #16984: URL: https://github.com/apache/kafka/pull/16984#discussion_r1870935453
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ########## @@ -69,36 +70,109 @@ public DelegatingClassLoader() { /** * Retrieve the PluginClassLoader associated with a plugin class + * * @param name The fully qualified class name of the plugin * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated. */ // VisibleForTesting - PluginClassLoader pluginClassLoader(String name) { + PluginClassLoader pluginClassLoader(String name, VersionRange range) { if (!PluginUtils.shouldLoadInIsolation(name)) { return null; } + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name); if (inner == null) { return null; } - ClassLoader pluginLoader = inner.get(inner.lastKey()); + + + ClassLoader pluginLoader = findPluginLoader(inner, name, range); return pluginLoader instanceof PluginClassLoader - ? (PluginClassLoader) pluginLoader - : null; + ? (PluginClassLoader) pluginLoader + : null; + } + + PluginClassLoader pluginClassLoader(String name) { + return pluginClassLoader(name, null); + } + + ClassLoader connectorLoader(String connectorClassOrAlias, VersionRange range) throws ClassNotFoundException { + String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); + // if the plugin is not loaded via the plugin classloader, it might still be available in the parent delegating + // classloader, in order to check if the version satisfies the requirement we need to load the plugin class here + ClassLoader classLoader = loadVersionedPluginClass(fullName, range, false).getClassLoader(); + log.debug( + "Got plugin class loader: '{}' for connector: {}", + classLoader, + connectorClassOrAlias + ); + return classLoader; } ClassLoader connectorLoader(String connectorClassOrAlias) { String fullName = aliases.getOrDefault(connectorClassOrAlias, connectorClassOrAlias); ClassLoader classLoader = pluginClassLoader(fullName); if (classLoader == null) 
classLoader = this; log.debug( - "Getting plugin class loader: '{}' for connector: {}", - classLoader, - connectorClassOrAlias + "Getting plugin class loader: '{}' for connector: {}", + classLoader, + connectorClassOrAlias ); return classLoader; } + String resolveFullClassName(String classOrAlias) { + return aliases.getOrDefault(classOrAlias, classOrAlias); + } + + String latestVersion(String classOrAlias) { + if (classOrAlias == null) { + return null; + } + String fullName = aliases.getOrDefault(classOrAlias, classOrAlias); + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName); + if (inner == null) { + return null; + } + return inner.lastKey().version(); + } + + private ClassLoader findPluginLoader( + SortedMap<PluginDesc<?>, ClassLoader> loaders, + String pluginName, + VersionRange range + ) { + + if (range != null) { + + ArtifactVersion version = range.getRecommendedVersion(); + + if (range.hasRestrictions()) { + List<ArtifactVersion> versions = loaders.keySet().stream().map(PluginDesc::encodedVersion).collect(Collectors.toList()); + version = range.matchVersion(versions); Review Comment: Thanks, I have altered the logic as per your suggestion. This should also address the other [comment](https://github.com/apache/kafka/pull/16984#discussion_r1868472074) on picking the latest plugin. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org