C0urante commented on code in PR #14089:
URL: https://github.com/apache/kafka/pull/14089#discussion_r1273737370


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java:
##########
@@ -115,6 +115,10 @@ public int compareTo(PluginDesc<T> other) {
         int versionComp = encodedVersion.compareTo(other.encodedVersion);
         // isolated plugins appear after classpath plugins when they have 
identical versions.
         int isolatedComp = Boolean.compare(other.loader instanceof 
PluginClassLoader, loader instanceof PluginClassLoader);
-        return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : 
isolatedComp);
+        // choose an arbitrary order between different classloaders and types
+        int loaderComp = loader.hashCode() - other.loader.hashCode();
+        int typeComp = type.compareTo(other.type);
+        return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp :
+                (isolatedComp != 0 ? isolatedComp : (loaderComp != 0 ? 
loaderComp : typeComp)));

Review Comment:
   Nit: parentheses are unnecessary, and this is a little bit more readable 
when broken across newlines
   ```suggestion
           return nameComp != 0 ? nameComp :
                   versionComp != 0 ? versionComp :
                   isolatedComp != 0 ? isolatedComp :
                   loaderComp != 0 ? loaderComp :
                   typeComp;
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java:
##########
@@ -115,6 +115,10 @@ public int compareTo(PluginDesc<T> other) {
         int versionComp = encodedVersion.compareTo(other.encodedVersion);
         // isolated plugins appear after classpath plugins when they have 
identical versions.
         int isolatedComp = Boolean.compare(other.loader instanceof 
PluginClassLoader, loader instanceof PluginClassLoader);
-        return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : 
isolatedComp);
+        // choose an arbitrary order between different classloaders and types
+        int loaderComp = loader.hashCode() - other.loader.hashCode();

Review Comment:
   Perhaps if we want to reduce the likelihood of collisions, we can use 
`System::identityHashCode`?
   ```suggestion
           int loaderComp = System.identityHashCode(loader) - System.identityHashCode(other.loader);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java:
##########
@@ -228,29 +255,66 @@ public void testPluginDescComparison() {
         PluginDesc<Predicate> predicateDescPluginPath = new PluginDesc<>(
                 Predicate.class,
                 regularVersion,
+                PluginType.PREDICATE,
                 pluginLoader
         );
 
         PluginDesc<Predicate> predicateDescClasspath = new PluginDesc<>(
                 Predicate.class,
                 regularVersion,
+                PluginType.PREDICATE,
                 systemLoader
         );
 
         assertNewer(predicateDescPluginPath, predicateDescClasspath);
+
+        PluginDesc<ConfigProvider> configProviderDescPluginPath = new 
PluginDesc<>(
+                FileConfigProvider.class,
+                regularVersion,
+                PluginType.CONFIGPROVIDER,
+                pluginLoader
+        );
+
+        PluginDesc<ConfigProvider> configProviderDescOtherPluginLoader = new 
PluginDesc<>(
+                FileConfigProvider.class,
+                regularVersion,
+                PluginType.CONFIGPROVIDER,
+                otherPluginLoader
+        );
+
+        assertTrue("Different plugin loaders should have an ordering",
+                
configProviderDescPluginPath.compareTo(configProviderDescOtherPluginLoader) != 
0);
+
+
+        PluginDesc<Converter> jsonConverterPlugin = new PluginDesc<>(
+                JsonConverter.class,
+                regularVersion,
+                PluginType.CONVERTER,
+                systemLoader
+        );
+
+        PluginDesc<HeaderConverter> jsonHeaderConverterPlugin = new 
PluginDesc<>(
+                JsonConverter.class,
+                regularVersion,
+                PluginType.HEADER_CONVERTER,
+                systemLoader
+        );
+
+        assertNewer(jsonConverterPlugin, (PluginDesc<Converter>) 
(PluginDesc<?>) jsonHeaderConverterPlugin);

Review Comment:
   With the suggestion in the high-level review comment, we can remove the 
cast here and change `assertNewer` to take two `PluginDesc<?>` instances 
as its parameters.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##########
@@ -120,32 +120,32 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, PluginSource source) {
-        return new PluginDesc(plugin, version, source.loader());
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, PluginType type, PluginSource source) {
+        return new PluginDesc(plugin, version, type, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, PluginSource source) {
+    protected <T> SortedSet<PluginDesc<T>> 
getServiceLoaderPluginDesc(PluginType type, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, 
source.loader());
+        ServiceLoader<T> serviceLoader = ServiceLoader.load((Class<T>) 
type.superClass(), source.loader());
         Iterator<T> iterator = serviceLoader.iterator();
-        while (handleLinkageError(klass, source, iterator::hasNext)) {
+        while (handleLinkageError(type, source, iterator::hasNext)) {
             try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = handleLinkageError(klass, source, 
iterator::next);
+                    pluginImpl = handleLinkageError(type, source, 
iterator::next);
                 } catch (ServiceConfigurationError t) {
                     log.error("Failed to discover {} in {}{}",
-                            klass.getSimpleName(), source.location(), 
reflectiveErrorDescription(t.getCause()), t);
+                            type, source.location(), 
reflectiveErrorDescription(t.getCause()), t);

Review Comment:
   Logging the `PluginType` directly is going to be slightly less readable 
("source" instead of "SourceConnector", "configprovider" instead of 
"ConfigProvider", etc.). Do you think we can log `type.simpleName()` in places 
like this?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java:
##########
@@ -168,30 +187,34 @@ public void testPluginDescEquality() {
         PluginDesc<Transformation> transformDescPluginPath = new PluginDesc<>(
                 Transformation.class,
                 null,
+                PluginType.TRANSFORMATION,
                 pluginLoader
         );
 
         PluginDesc<Transformation> transformDescClasspath = new PluginDesc<>(
                 Transformation.class,
                 noVersion,
+                PluginType.TRANSFORMATION,
                 pluginLoader
         );
 
         assertNotEquals(transformDescPluginPath, transformDescClasspath);
     }
 
-    @SuppressWarnings("rawtypes")
+    @SuppressWarnings({"rawtypes", "unchecked"})
     @Test
-    public void testPluginDescComparison() {
-        PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
-                Connector.class,
+    public void testPluginDescComparison() throws MalformedURLException {

Review Comment:
   Is this `throws` clause still necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to