C0urante commented on code in PR #14089: URL: https://github.com/apache/kafka/pull/14089#discussion_r1273737370
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java: ########## @@ -115,6 +115,10 @@ public int compareTo(PluginDesc<T> other) { int versionComp = encodedVersion.compareTo(other.encodedVersion); // isolated plugins appear after classpath plugins when they have identical versions. int isolatedComp = Boolean.compare(other.loader instanceof PluginClassLoader, loader instanceof PluginClassLoader); - return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : isolatedComp); + // choose an arbitrary order between different classloaders and types + int loaderComp = loader.hashCode() - other.loader.hashCode(); + int typeComp = type.compareTo(other.type); + return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : + (isolatedComp != 0 ? isolatedComp : (loaderComp != 0 ? loaderComp : typeComp))); Review Comment: Nit: parentheses are unnecessary, and this is a little bit more readable when broken across newlines ```suggestion return nameComp != 0 ? nameComp : versionComp != 0 ? versionComp : isolatedComp != 0 ? isolatedComp : loaderComp != 0 ? loaderComp : typeComp; ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java: ########## @@ -115,6 +115,10 @@ public int compareTo(PluginDesc<T> other) { int versionComp = encodedVersion.compareTo(other.encodedVersion); // isolated plugins appear after classpath plugins when they have identical versions. int isolatedComp = Boolean.compare(other.loader instanceof PluginClassLoader, loader instanceof PluginClassLoader); - return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : isolatedComp); + // choose an arbitrary order between different classloaders and types + int loaderComp = loader.hashCode() - other.loader.hashCode(); Review Comment: Perhaps if we want to reduce the likelihood of collisions, we can use `System::identityHashCode`? ```suggestion int loaderComp = System.identityHashCode(loader) - System.identityHashCode(other.loader) ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java: ########## @@ -228,29 +255,66 @@ public void testPluginDescComparison() { PluginDesc<Predicate> predicateDescPluginPath = new PluginDesc<>( Predicate.class, regularVersion, + PluginType.PREDICATE, pluginLoader ); PluginDesc<Predicate> predicateDescClasspath = new PluginDesc<>( Predicate.class, regularVersion, + PluginType.PREDICATE, systemLoader ); assertNewer(predicateDescPluginPath, predicateDescClasspath); + + PluginDesc<ConfigProvider> configProviderDescPluginPath = new PluginDesc<>( + FileConfigProvider.class, + regularVersion, + PluginType.CONFIGPROVIDER, + pluginLoader + ); + + PluginDesc<ConfigProvider> configProviderDescOtherPluginLoader = new PluginDesc<>( + FileConfigProvider.class, + regularVersion, + PluginType.CONFIGPROVIDER, + otherPluginLoader + ); + + assertTrue("Different plugin loaders should have an ordering", + configProviderDescPluginPath.compareTo(configProviderDescOtherPluginLoader) != 0); + + + PluginDesc<Converter> jsonConverterPlugin = new PluginDesc<>( + JsonConverter.class, + regularVersion, + PluginType.CONVERTER, + systemLoader + ); + + PluginDesc<HeaderConverter> jsonHeaderConverterPlugin = new PluginDesc<>( + JsonConverter.class, + regularVersion, + PluginType.HEADER_CONVERTER, + systemLoader + ); + + assertNewer(jsonConverterPlugin, (PluginDesc<Converter>) (PluginDesc<?>) jsonHeaderConverterPlugin); Review Comment: With the suggestion in the high-level review comment, we can remove the casting here and change `assertNewer` to take in two instances of `PluginDesc<?>` as its parameters. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ########## @@ -120,32 +120,32 @@ private void loadJdbcDrivers(final ClassLoader loader) { } @SuppressWarnings({"rawtypes", "unchecked"}) - protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) { - return new PluginDesc(plugin, version, source.loader()); + protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginType type, PluginSource source) { + return new PluginDesc(plugin, version, type, source.loader()); } @SuppressWarnings("unchecked") - protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) { + protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(PluginType type, PluginSource source) { SortedSet<PluginDesc<T>> result = new TreeSet<>(); - ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, source.loader()); + ServiceLoader<T> serviceLoader = ServiceLoader.load((Class<T>) type.superClass(), source.loader()); Iterator<T> iterator = serviceLoader.iterator(); - while (handleLinkageError(klass, source, iterator::hasNext)) { + while (handleLinkageError(type, source, iterator::hasNext)) { try (LoaderSwap loaderSwap = withClassLoader(source.loader())) { T pluginImpl; try { - pluginImpl = handleLinkageError(klass, source, iterator::next); + pluginImpl = handleLinkageError(type, source, iterator::next); } catch (ServiceConfigurationError t) { log.error("Failed to discover {} in {}{}", - klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); + type, source.location(), reflectiveErrorDescription(t.getCause()), t); Review Comment: Logging the `PluginType` directly is going to be slightly less readable ("source" instead of "SourceConnector", "configprovider" instead of "ConfigProvider", etc.). Do you think we can log `type.simpleName()` in places like this? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java: ########## @@ -168,30 +187,34 @@ public void testPluginDescEquality() { PluginDesc<Transformation> transformDescPluginPath = new PluginDesc<>( Transformation.class, null, + PluginType.TRANSFORMATION, pluginLoader ); PluginDesc<Transformation> transformDescClasspath = new PluginDesc<>( Transformation.class, noVersion, + PluginType.TRANSFORMATION, pluginLoader ); assertNotEquals(transformDescPluginPath, transformDescClasspath); } - @SuppressWarnings("rawtypes") + @SuppressWarnings({"rawtypes", "unchecked"}) @Test - public void testPluginDescComparison() { - PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>( - Connector.class, + public void testPluginDescComparison() throws MalformedURLException { Review Comment: Is this `throws` clause still necessary? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org