This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 512d982ee3 Throw IllegalArgumentException when find multiple connector 
jar for one pluginIdentifier (#5551)
512d982ee3 is described below

commit 512d982ee31ad6fa4fa36f8fd1017913f4bacbc2
Author: Wenjun Ruan <wen...@apache.org>
AuthorDate: Thu Sep 28 14:02:45 2023 +0800

    Throw IllegalArgumentException when find multiple connector jar for one 
pluginIdentifier (#5551)
---
 .../org/apache/seatunnel/common/config/Common.java |  9 ++-
 .../plugin/discovery/AbstractPluginDiscovery.java  | 37 ++++++----
 .../discovery/AbstractPluginDiscoveryTest.java     | 27 +++++--
 .../SeaTunnelSourcePluginDiscoveryTest.java        | 86 ++++++++++++++++++++++
 .../duplicate/connectors/plugin-mapping.properties | 21 ++++++
 5 files changed, 158 insertions(+), 22 deletions(-)

diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index 88a13fe781..e6ca3cbe49 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.common.config;
 
 import org.apache.commons.lang3.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -67,7 +69,7 @@ public class Common {
         return MODE;
     }
 
-    private static String getSeaTunnelHome() {
+    public static String getSeaTunnelHome() {
 
         if (StringUtils.isNotEmpty(SEATUNNEL_HOME)) {
             return SEATUNNEL_HOME;
@@ -83,6 +85,11 @@ public class Common {
         return SEATUNNEL_HOME;
     }
 
+    @VisibleForTesting
+    public static void setSeaTunnelHome(String seatunnelHome) {
+        SEATUNNEL_HOME = seatunnelHome;
+    }
+
     /**
      * Root dir varies between different spark master and deploy mode, it also 
varies between
      * relative and absolute path. When running seatunnel in --master local, 
you can put plugins
diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index ee27429fce..a2006f7dc9 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -47,6 +47,7 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -78,7 +79,7 @@ public abstract class AbstractPluginDiscovery<T> implements 
PluginDiscovery<T> {
             };
 
     private final Path pluginDir;
-    private final Config pluginConfig;
+    private final Config pluginMappingConfig;
     private final BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer;
     protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> 
pluginJarPath =
             new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
@@ -95,16 +96,16 @@ public abstract class AbstractPluginDiscovery<T> implements 
PluginDiscovery<T> {
         this(pluginDir, loadConnectorPluginConfig());
     }
 
-    public AbstractPluginDiscovery(Path pluginDir, Config pluginConfig) {
-        this(pluginDir, pluginConfig, DEFAULT_URL_TO_CLASSLOADER);
+    public AbstractPluginDiscovery(Path pluginDir, Config pluginMappingConfig) 
{
+        this(pluginDir, pluginMappingConfig, DEFAULT_URL_TO_CLASSLOADER);
     }
 
     public AbstractPluginDiscovery(
             Path pluginDir,
-            Config pluginConfig,
+            Config pluginMappingConfig,
             BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer) {
         this.pluginDir = pluginDir;
-        this.pluginConfig = pluginConfig;
+        this.pluginMappingConfig = pluginMappingConfig;
         this.addURLToClassLoaderConsumer = addURLToClassLoaderConsumer;
         log.info("Load {} Plugin from {}", 
getPluginBaseClass().getSimpleName(), pluginDir);
     }
@@ -328,16 +329,13 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
      * @return plugin jar path.
      */
     private Optional<URL> findPluginJarPath(PluginIdentifier pluginIdentifier) 
{
-        if (pluginConfig.isEmpty()) {
-            return Optional.empty();
-        }
         final String engineType = 
pluginIdentifier.getEngineType().toLowerCase();
         final String pluginType = 
pluginIdentifier.getPluginType().toLowerCase();
         final String pluginName = 
pluginIdentifier.getPluginName().toLowerCase();
-        if (!pluginConfig.hasPath(engineType)) {
+        if (!pluginMappingConfig.hasPath(engineType)) {
             return Optional.empty();
         }
-        Config engineConfig = pluginConfig.getConfig(engineType);
+        Config engineConfig = pluginMappingConfig.getConfig(engineType);
         if (!engineConfig.hasPath(pluginType)) {
             return Optional.empty();
         }
@@ -365,15 +363,24 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
         if (ArrayUtils.isEmpty(targetPluginFiles)) {
             return Optional.empty();
         }
+        if (targetPluginFiles.length > 1) {
+            throw new IllegalArgumentException(
+                    "Found multiple plugin jar: "
+                            + Arrays.stream(targetPluginFiles)
+                                    .map(File::getPath)
+                                    .collect(Collectors.joining(","))
+                            + " for pluginIdentifier: "
+                            + pluginIdentifier);
+        }
         try {
             URL pluginJarPath = targetPluginFiles[0].toURI().toURL();
-            log.info(
-                    "Discovery plugin jar: {} at: {}",
-                    pluginIdentifier.getPluginName(),
-                    pluginJarPath);
+            log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier, 
pluginJarPath);
             return Optional.of(pluginJarPath);
         } catch (MalformedURLException e) {
-            log.warn("Cannot get plugin URL: " + targetPluginFiles[0], e);
+            log.warn(
+                    "Cannot get plugin URL: {} for pluginIdentifier: {}" + 
targetPluginFiles[0],
+                    pluginIdentifier,
+                    e);
             return Optional.empty();
         }
     }
diff --git 
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
 
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
index 5141d7b71d..379fcd2ace 100644
--- 
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
+++ 
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
@@ -21,24 +21,33 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.common.constants.PluginType;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
 import java.util.Map;
-import java.util.Objects;
 
 @DisabledOnOs(OS.WINDOWS)
 public class AbstractPluginDiscoveryTest {
 
+    private String originSeatunnelHome = null;
+    private DeployMode originMode = null;
+    private static final String seatunnelHome =
+            AbstractPluginDiscoveryTest.class.getResource("/home").getPath();
+
+    @BeforeEach
+    public void before() {
+        originMode = Common.getDeployMode();
+        Common.setDeployMode(DeployMode.CLIENT);
+        originSeatunnelHome = Common.getSeaTunnelHome();
+        Common.setSeaTunnelHome(seatunnelHome);
+    }
+
     @Test
     public void testGetAllPlugins() {
-        Common.setDeployMode(DeployMode.CLIENT);
-        System.setProperty(
-                "SEATUNNEL_HOME",
-                
Objects.requireNonNull(AbstractPluginDiscoveryTest.class.getResource("/home"))
-                        .getPath());
         Map<PluginIdentifier, String> sourcePlugins =
                 
AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE);
         Assertions.assertEquals(27, sourcePlugins.size());
@@ -47,4 +56,10 @@ public class AbstractPluginDiscoveryTest {
                 
AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SINK);
         Assertions.assertEquals(30, sinkPlugins.size());
     }
+
+    @AfterEach
+    public void after() {
+        Common.setSeaTunnelHome(originSeatunnelHome);
+        Common.setDeployMode(originMode);
+    }
 }
diff --git 
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
 
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
new file mode 100644
index 0000000000..81333d4b4d
--- /dev/null
+++ 
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin.discovery.seatunnel;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+@DisabledOnOs(OS.WINDOWS)
+class SeaTunnelSourcePluginDiscoveryTest {
+
+    private String originSeatunnelHome = null;
+    private DeployMode originMode = null;
+    private static final String seatunnelHome =
+            
SeaTunnelSourcePluginDiscoveryTest.class.getResource("/duplicate").getPath();
+    private static final List<Path> pluginJars =
+            Lists.newArrayList(
+                    Paths.get(seatunnelHome, "connectors", 
"connector-http-jira.jar"),
+                    Paths.get(seatunnelHome, "connectors", 
"connector-http.jar"));
+
+    @BeforeEach
+    public void before() throws IOException {
+        originMode = Common.getDeployMode();
+        Common.setDeployMode(DeployMode.CLIENT);
+        originSeatunnelHome = Common.getSeaTunnelHome();
+        Common.setSeaTunnelHome(seatunnelHome);
+
+        // The file is created under target directory.
+        for (Path pluginJar : pluginJars) {
+            Files.createFile(pluginJar);
+        }
+    }
+
+    @Test
+    void getPluginBaseClass() {
+        List<PluginIdentifier> pluginIdentifiers =
+                Lists.newArrayList(
+                        PluginIdentifier.of("seatunnel", 
PluginType.SOURCE.getType(), "HttpJira"),
+                        PluginIdentifier.of("seatunnel", 
PluginType.SOURCE.getType(), "HttpBase"));
+        SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
+                new SeaTunnelSourcePluginDiscovery();
+        Assertions.assertThrows(
+                IllegalArgumentException.class,
+                () -> 
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers));
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        for (Path pluginJar : pluginJars) {
+            Files.deleteIfExists(pluginJar);
+        }
+        Common.setSeaTunnelHome(originSeatunnelHome);
+        Common.setDeployMode(originMode);
+    }
+}
diff --git 
a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
 
b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
new file mode 100644
index 0000000000..be38939a7f
--- /dev/null
+++ 
b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+seatunnel.source.HttpBase = connector-http
+seatunnel.sink.HttpBase = connector-http
+seatunnel.source.HttpJira = connector-http-jira
+seatunnel.sink.HttpJira = connector-http-jira
\ No newline at end of file

Reply via email to