This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1dd19c6823 [Feature][Connectors-v2] Optimize the size of CDC JAR Files
(#9546)
1dd19c6823 is described below
commit 1dd19c6823ac86b04df8771f570f88a2e8ec5510
Author: zhangdonghao <[email protected]>
AuthorDate: Mon Jul 14 13:18:52 2025 +0800
[Feature][Connectors-v2] Optimize the size of CDC JAR Files (#9546)
---
.../connector-cdc/connector-cdc-base/pom.xml | 11 +--
.../connector-cdc/connector-cdc-mongodb/pom.xml | 12 ++-
.../connector-cdc/connector-cdc-mysql/pom.xml | 11 +++
.../connector-cdc/connector-cdc-opengauss/pom.xml | 13 +++
.../connector-cdc/connector-cdc-oracle/pom.xml | 11 +++
.../connector-cdc/connector-cdc-postgres/pom.xml | 9 ++
.../connector-cdc/connector-cdc-sqlserver/pom.xml | 9 ++
.../connector-cdc/connector-cdc-tidb/pom.xml | 2 +
seatunnel-connectors-v2/connector-jdbc/pom.xml | 1 +
.../FlinkAbstractPluginExecuteProcessor.java | 30 ++++--
.../starter/flink/execution/FlinkExecution.java | 6 +-
seatunnel-dist/pom.xml | 13 +++
.../src/main/assembly/assembly-bin-ci.xml | 3 +-
seatunnel-dist/src/main/assembly/assembly-bin.xml | 1 +
.../connector-cdc-mongodb-e2e/pom.xml | 6 ++
.../connector-cdc-mysql-e2e/pom.xml | 7 ++
.../connector-cdc-opengauss-e2e/pom.xml | 6 ++
.../connector-cdc-oracle-e2e/pom.xml | 6 ++
.../connector-cdc-postgres-e2e/pom.xml | 6 ++
.../connector-cdc-sqlserver-e2e/pom.xml | 6 ++
.../connector-cdc-tidb-e2e/pom.xml | 6 ++
.../seatunnel/e2e/common/util/ContainerUtil.java | 60 ++++++++++--
.../plugin/discovery/AbstractPluginDiscovery.java | 101 ++++++++++++---------
.../seatunnel/SeaTunnelFactoryDiscovery.java | 3 +-
.../seatunnel/SeaTunnelSinkPluginDiscovery.java | 2 +-
.../seatunnel/SeaTunnelSourcePluginDiscovery.java | 2 +-
.../discovery/AbstractPluginDiscoveryTest.java | 24 ++++-
.../SeaTunnelSourcePluginDiscoveryTest.java | 43 +++++++--
28 files changed, 325 insertions(+), 85 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
index 61aaa48211..48c5f4b8fe 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
@@ -123,14 +123,7 @@
</dependencies>
<build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- </plugins>
+ <finalName>${project.artifactId}-${project.version}</finalName>
</build>
+
</project>
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/pom.xml
index 7a8b651cf9..7734432799 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/pom.xml
@@ -39,13 +39,23 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-base</artifactId>
<version>${project.version}</version>
- <scope>compile</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>${debezium.version}</version>
<scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.mongodb.kafka</groupId>
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
index b029984621..701d90f53c 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
@@ -49,6 +49,16 @@
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
<scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- test dependencies on TestContainers -->
@@ -66,6 +76,7 @@
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-base</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/pom.xml
index 098c60370d..78b1462937 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-opengauss/pom.xml
@@ -34,6 +34,19 @@
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-base</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-jdbc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.opengauss</groupId>
<artifactId>opengauss-jdbc</artifactId>
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/pom.xml
index 84ee6f500f..e3a286325d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/pom.xml
@@ -74,11 +74,22 @@
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-base</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/pom.xml
index 02de74a6fe..61c52fc5bb 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/pom.xml
@@ -59,6 +59,7 @@
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-base</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -69,6 +70,14 @@
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-api</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/pom.xml
index 298e8ea4fc..9865b8c07d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/pom.xml
@@ -59,6 +59,7 @@
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-base</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -69,6 +70,14 @@
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-api</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/pom.xml
index 1f779cd1a7..9557c8570e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/pom.xml
@@ -53,6 +53,7 @@
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-base</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -64,6 +65,7 @@
<dependency>
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
+ <scope>provided</scope>
</dependency>
</dependencies>
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 87705fe01a..e8aa6b3a46 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -212,6 +212,7 @@
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
<version>${tikv.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.opengauss</groupId>
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index cd0df11871..739179cfa5 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
+import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
@@ -36,17 +37,34 @@ import static
org.apache.seatunnel.api.options.ConnectorCommonOptions.PLUGIN_INP
public abstract class FlinkAbstractPluginExecuteProcessor<T>
implements PluginExecuteProcessor<DataStreamTableInfo,
FlinkRuntimeEnvironment> {
- protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER
=
- (classLoader, url) -> {
+ protected static final BiConsumer<ClassLoader, List<URL>>
ADD_URL_TO_CLASSLOADER =
+ (classLoader, urls) -> {
if
(classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
URLClassLoader c =
(URLClassLoader)
ReflectionUtils.getField(classLoader, "inner").get();
- ReflectionUtils.invoke(c, "addURL", url);
+ urls.forEach(url -> ReflectionUtils.invoke(c, "addURL",
url));
} else if (classLoader instanceof URLClassLoader) {
- ReflectionUtils.invoke(classLoader, "addURL", url);
+ urls.forEach(url -> ReflectionUtils.invoke(classLoader,
"addURL", url));
} else {
- throw new RuntimeException(
- "Unsupported classloader: " +
classLoader.getClass().getName());
+ try {
+ // In Java 8, AppClassLoader is a subclass of
URLClassLoader, so classLoader
+ // instanceof URLClassLoader will return true.
However, in Java 11, due to
+ // the introduction of the modular system,
AppClassLoader is no longer a
+ // subclass of URLClassLoader, and this check will
return false. To be
+ // compatible with both Java 8 and Java 11, we can use
reflection to
+ // dynamically call the addURL method of
URLClassLoader.
+ Optional<Method> method =
+ ReflectionUtils.getDeclaredMethod(
+ URLClassLoader.class, "addURL",
URL.class);
+ if (method.isPresent()) {
+ for (URL url : urls) {
+ method.get().invoke(classLoader, url);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Unsupported classloader: " +
classLoader.getClass().getName(), e);
+ }
}
};
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 6f456ac69b..0d74c59c79 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -168,10 +168,8 @@ public class FlinkExecution implements TaskExecution {
}
})
.collect(Collectors.toList());
- jarDependencies.forEach(
- url ->
-
FlinkAbstractPluginExecuteProcessor.ADD_URL_TO_CLASSLOADER.accept(
-
Thread.currentThread().getContextClassLoader(), url));
+ FlinkAbstractPluginExecuteProcessor.ADD_URL_TO_CLASSLOADER.accept(
+ Thread.currentThread().getContextClassLoader(),
jarDependencies);
jarPaths.addAll(jarDependencies);
}
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index edc1d262b2..4940ee2786 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -496,6 +496,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-base</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-mysql</artifactId>
@@ -975,6 +981,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-base</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-sls</artifactId>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index 10e1f6ba85..a7c01c59d2 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -154,7 +154,6 @@
<!-- Don't exclude connector-http-base, because it contains
SPI files -->
<exclude>org.apache.seatunnel:connector-file-base</exclude>
<exclude>org.apache.seatunnel:connector-file-base-hadoop</exclude>
- <exclude>org.apache.seatunnel:connector-cdc-base</exclude>
</excludes>
<outputDirectory>/connectors</outputDirectory>
<scope>provided</scope>
@@ -180,6 +179,8 @@
<include>com.amazon.redshift:redshift-jdbc42:jar</include>
<include>net.snowflake.snowflake-jdbc:jar</include>
<include>com.xugudb:xugu-jdbc:jar</include>
+ <include>org.tikv:tikv-client-java:jar</include>
+ <include>org.opengauss:opengauss-jdbc:jar</include>
<include>com.amazonaws:aws-java-sdk-bundle:jar</include>
<include>org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional</include>
<include>org.apache.seatunnel:seatunnel-hadoop-aws:jar:*:optional</include>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml
b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index 74e38503e0..dc558677ab 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -209,6 +209,7 @@
<includes>
<include>org.apache.seatunnel:connector-fake:jar</include>
<include>org.apache.seatunnel:connector-console:jar</include>
+ <include>org.apache.seatunnel:connector-cdc-base:jar</include>
</includes>
<outputDirectory>/connectors</outputDirectory>
<scope>provided</scope>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml
index 0c61a1692d..11d00ea4c6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml
@@ -31,6 +31,12 @@
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-base</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-mongodb</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
index 8cb38aaffa..e13217cf4a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
@@ -39,6 +39,13 @@
<dependencies>
<!-- SeaTunnel connectors -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-base</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-mysql</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
index b855c0d6d5..7fc698c336 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml
@@ -43,6 +43,12 @@
<dependencies>
<!-- SeaTunnel connectors -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-base</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-opengauss</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml
index 0757d1128f..4630e3182e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml
@@ -44,6 +44,12 @@
<dependencies>
<!-- SeaTunnel connectors -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-base</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-oracle</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
index b64cb088d3..43beee56e3 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
@@ -39,6 +39,12 @@
<dependencies>
<!-- SeaTunnel connectors -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-base</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-postgres</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml
index 59a673fe86..a546865337 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml
@@ -39,6 +39,12 @@
<dependencies>
<!-- SeaTunnel connectors -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-base</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-sqlserver</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/pom.xml
index 6936b36ee7..c4a77b58fb 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/pom.xml
@@ -31,6 +31,12 @@
</properties>
<dependencies>
<!-- SeaTunnel connectors -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-base</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-console</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
index cc126c7a55..73446af675 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
@@ -33,10 +33,16 @@ import org.junit.jupiter.api.Assertions;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.MountableFile;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
import groovy.lang.Tuple2;
import lombok.extern.slf4j.Slf4j;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
@@ -203,9 +209,11 @@ public final class ContainerUtil {
String transformJar = "seatunnel-transforms-v2.jar";
Path transformJarPath =
Paths.get(PROJECT_ROOT_PATH, "seatunnel-transforms-v2",
"target", transformJar);
- container.withCopyFileToContainer(
- MountableFile.forHostPath(transformJarPath),
- Paths.get(seatunnelHomeInContainer, "lib",
transformJar).toString());
+ if (transformJarPath.toFile().exists()) {
+ container.withCopyFileToContainer(
+ MountableFile.forHostPath(transformJarPath),
+ Paths.get(seatunnelHomeInContainer, "lib",
transformJar).toString());
+ }
// copy transform-udf
String transformUdfJar = "seatunnel-transforms-v2-udf.jar";
@@ -217,9 +225,11 @@ public final class ContainerUtil {
"seatunnel-transforms-v2-udf",
"target",
transformUdfJar);
- container.withCopyFileToContainer(
- MountableFile.forHostPath(transformUdfJarPath),
- Paths.get(seatunnelHomeInContainer, "lib",
transformUdfJar).toString());
+ if (transformUdfJarPath.toFile().exists()) {
+ container.withCopyFileToContainer(
+ MountableFile.forHostPath(transformUdfJarPath),
+ Paths.get(seatunnelHomeInContainer, "lib",
transformUdfJar).toString());
+ }
// copy bin
final String startBinPath = startModulePath + File.separator +
"src/main/bin/";
@@ -234,6 +244,30 @@ public final class ContainerUtil {
Paths.get(seatunnelHomeInContainer, "connectors",
PLUGIN_MAPPING_FILE).toString());
}
+ private static String getProjectVersion() {
+ try {
+ DocumentBuilderFactory factory =
DocumentBuilderFactory.newInstance();
+ DocumentBuilder builder = factory.newDocumentBuilder();
+ Document doc = builder.parse(getProjectRootPath() + "/pom.xml");
+ doc.getDocumentElement().normalize();
+ NodeList propertiesList = doc.getElementsByTagName("properties");
+ for (int i = 0; i < propertiesList.getLength(); i++) {
+ Node propertiesNode = propertiesList.item(i);
+ NodeList childNodes = propertiesNode.getChildNodes();
+ for (int j = 0; j < childNodes.getLength(); j++) {
+ Node node = childNodes.item(j);
+ if (node.getNodeType() == Node.ELEMENT_NODE
+ && "revision".equals(node.getNodeName())) {
+ return node.getTextContent();
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return "";
+ }
+
public static String adaptPathForWin(String path) {
// Running IT use cases under Windows requires replacing \ with /
return path == null ? "" : path.replaceAll("\\\\", "/");
@@ -245,6 +279,20 @@ public final class ContainerUtil {
for (File file : Objects.requireNonNull(currentModule.listFiles())) {
getConnectorFiles(file, connectorNames, connectorPrefix,
connectorFiles);
}
+ if (connectorNames.stream().anyMatch(connectorName ->
connectorName.contains("cdc"))) {
+ // copy connector-cdc-base
+ String cdcBaseJar =
+ String.format("%s-%s.jar", "connector-cdc-base",
getProjectVersion());
+ Path cdcBaseJarPath =
+ Paths.get(
+ PROJECT_ROOT_PATH,
+ "seatunnel-connectors-v2",
+ "connector-cdc",
+ "connector-cdc-base",
+ "target",
+ cdcBaseJar);
+ connectorFiles.add(new
File(cdcBaseJarPath.toFile().getAbsolutePath()));
+ }
return connectorFiles;
}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 0bc777be0e..0f8ed3755a 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -63,6 +63,7 @@ import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
@Slf4j
@SuppressWarnings("unchecked")
@@ -74,10 +75,10 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
* Add jar url to classloader. The different engine should have different
logic to add url into
* their own classloader
*/
- private static final BiConsumer<ClassLoader, URL>
DEFAULT_URL_TO_CLASSLOADER =
- (classLoader, url) -> {
+ private static final BiConsumer<ClassLoader, List<URL>>
DEFAULT_URL_TO_CLASSLOADER =
+ (classLoader, urls) -> {
if (classLoader instanceof URLClassLoader) {
- ReflectionUtils.invoke(classLoader, "addURL", url);
+ urls.forEach(url -> ReflectionUtils.invoke(classLoader,
"addURL", url));
} else {
throw new UnsupportedOperationException("can't support
custom load jar");
}
@@ -85,14 +86,14 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
private final Path pluginDir;
private final Config pluginMappingConfig;
- private final BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer;
- protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>>
pluginJarPath =
+ private final BiConsumer<ClassLoader, List<URL>>
addURLToClassLoaderConsumer;
+ protected final ConcurrentHashMap<PluginIdentifier, Optional<List<URL>>>
pluginJarPath =
new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
protected final Map<PluginIdentifier, String> sourcePluginInstance;
protected final Map<PluginIdentifier, String> sinkPluginInstance;
protected final Map<PluginIdentifier, String> transformPluginInstance;
- public AbstractPluginDiscovery(BiConsumer<ClassLoader, URL>
addURLToClassloader) {
+ public AbstractPluginDiscovery(BiConsumer<ClassLoader, List<URL>>
addURLToClassloader) {
this(Common.connectorDir(), loadConnectorPluginConfig(),
addURLToClassloader);
}
@@ -111,7 +112,7 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
public AbstractPluginDiscovery(
Path pluginDir,
Config pluginMappingConfig,
- BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer) {
+ BiConsumer<ClassLoader, List<URL>> addURLToClassLoaderConsumer) {
this.pluginDir = pluginDir;
this.pluginMappingConfig = pluginMappingConfig;
this.addURLToClassLoaderConsumer = addURLToClassLoaderConsumer;
@@ -135,6 +136,7 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
.map(this::getPluginJarPath)
.filter(Optional::isPresent)
.map(Optional::get)
+ .flatMap(Collection::stream)
.distinct()
.collect(Collectors.toList());
}
@@ -196,16 +198,14 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
log.info("Load plugin: {} from classpath", pluginIdentifier);
return Optional.of(pluginInstance);
}
- Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
+ Optional<List<URL>> pluginJarPaths =
getPluginJarPath(pluginIdentifier);
// if the plugin jar not exist in classpath, will load from plugin dir.
- if (pluginJarPath.isPresent()) {
+ if (pluginJarPaths.isPresent()) {
try {
// use current thread classloader to avoid different
classloader load same class
// error.
- this.addURLToClassLoaderConsumer.accept(classLoader,
pluginJarPath.get());
- for (URL jar : pluginJars) {
- addURLToClassLoaderConsumer.accept(classLoader, jar);
- }
+ addURLToClassLoaderConsumer.accept(classLoader,
pluginJarPaths.get());
+ addURLToClassLoaderConsumer.accept(classLoader, (List<URL>)
pluginJars);
} catch (Exception e) {
log.warn(
"can't load jar use current thread classloader, use
URLClassLoader instead now."
@@ -216,7 +216,10 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
for (URL pluginJar : pluginJars) {
urls[i++] = pluginJar;
}
- urls[i] = pluginJarPath.get();
+ urls =
+ Stream.concat(Arrays.stream(urls),
pluginJarPaths.get().stream())
+ .distinct()
+ .toArray(URL[]::new);
classLoader =
new URLClassLoader(urls,
Thread.currentThread().getContextClassLoader());
}
@@ -225,7 +228,7 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
log.info(
"Load plugin: {} from path: {} use classloader: {}",
pluginIdentifier,
- pluginJarPath.get(),
+ pluginJarPaths.get(),
classLoader.getClass().getName());
return Optional.of(pluginInstance);
}
@@ -386,7 +389,7 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
* @param pluginIdentifier plugin identifier.
* @return plugin instance.
*/
- protected Optional<URL> getPluginJarPath(PluginIdentifier
pluginIdentifier) {
+ protected Optional<List<URL>> getPluginJarPath(PluginIdentifier
pluginIdentifier) {
return pluginJarPath.computeIfAbsent(pluginIdentifier,
this::findPluginJarPath);
}
@@ -403,7 +406,7 @@ public abstract class AbstractPluginDiscovery<T> implements
PluginDiscovery<T> {
* @param pluginIdentifier plugin identifier.
* @return plugin jar path.
*/
- private Optional<URL> findPluginJarPath(PluginIdentifier pluginIdentifier)
{
+ private Optional<List<URL>> findPluginJarPath(PluginIdentifier
pluginIdentifier) {
final String engineType =
pluginIdentifier.getEngineType().toLowerCase();
final String pluginType =
pluginIdentifier.getPluginType().toLowerCase();
final String pluginName =
pluginIdentifier.getPluginName().toLowerCase();
@@ -427,51 +430,61 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
pluginDir
.toFile()
.listFiles(
- pathname ->
- pathname.getName().endsWith(".jar")
- &&
StringUtils.startsWithIgnoreCase(
- pathname.getName(),
pluginJarPrefix));
+ pathname -> filterPluginJar(pathname,
pluginJarPrefix, pluginName));
if (ArrayUtils.isEmpty(targetPluginFiles)) {
return Optional.empty();
}
+ PluginType type = PluginType.valueOf(pluginType.toUpperCase());
+ List<URL> pluginJarPaths;
try {
- URL pluginJarPath;
if (targetPluginFiles.length == 1) {
- pluginJarPath = targetPluginFiles[0].toURI().toURL();
+ pluginJarPaths =
Collections.singletonList(targetPluginFiles[0].toURI().toURL());
} else {
- PluginType type = PluginType.valueOf(pluginType.toUpperCase());
- pluginJarPath =
+ pluginJarPaths =
selectPluginJar(targetPluginFiles, pluginJarPrefix,
pluginName, type).get();
}
- log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier,
pluginJarPath);
- return Optional.of(pluginJarPath);
} catch (MalformedURLException e) {
- log.warn(
- "Cannot get plugin URL: {} for pluginIdentifier: {}" +
targetPluginFiles[0],
- pluginIdentifier,
- e);
- return Optional.empty();
+ throw new RuntimeException(e);
}
+ log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier,
pluginJarPaths);
+ return Optional.of(pluginJarPaths);
}
- private Optional<URL> selectPluginJar(
+ private boolean filterPluginJar(File pathname, String pluginJarPrefix,
String pluginName) {
+ if (pluginName.contains("cdc")) {
+ return pathname.getName().endsWith(".jar")
+ && (StringUtils.startsWithIgnoreCase(pathname.getName(),
pluginJarPrefix)
+ || StringUtils.startsWithIgnoreCase(
+ pathname.getName(), "connector-cdc-base"));
+ }
+ return pathname.getName().endsWith(".jar")
+ && StringUtils.startsWithIgnoreCase(pathname.getName(),
pluginJarPrefix);
+ }
+
+ private Optional<List<URL>> selectPluginJar(
File[] targetPluginFiles, String pluginJarPrefix, String
pluginName, PluginType type) {
List<URL> resMatchedUrls = new ArrayList<>();
for (File file : targetPluginFiles) {
- Optional<URL> matchedUrl = findMatchingUrl(file, type);
+ Optional<URL> matchedUrl = findMatchingUrl(file, type, pluginName);
matchedUrl.ifPresent(resMatchedUrls::add);
}
- if (resMatchedUrls.size() != 1) {
+ if (pluginName.contains("cdc")) {
+ if (resMatchedUrls.size() != 2) {
+ throw new SeaTunnelException(
+ String.format(
+ "Cannot find plugin jar for pluginIdentifier:
%s -> %s. Possible impact jar: %s",
+ pluginName, pluginJarPrefix,
Arrays.asList(targetPluginFiles)));
+ }
+ } else if (resMatchedUrls.size() != 1) {
throw new SeaTunnelException(
String.format(
"Cannot find unique plugin jar for
pluginIdentifier: %s -> %s. Possible impact jar: %s",
pluginName, pluginJarPrefix,
Arrays.asList(targetPluginFiles)));
- } else {
- return Optional.of(resMatchedUrls.get(0));
}
+ return Optional.of(resMatchedUrls);
}
- private Optional<URL> findMatchingUrl(File file, PluginType type) {
+ private Optional<URL> findMatchingUrl(File file, PluginType type, String
pluginName) {
Map<PluginIdentifier, String> pluginInstanceMap = null;
switch (type) {
case SINK:
@@ -494,13 +507,17 @@ public abstract class AbstractPluginDiscovery<T>
implements PluginDiscovery<T> {
}
}
- if (matchedIdentifier.size() == 1) {
- try {
+ try {
+ if (matchedIdentifier.size() == 1) {
return Optional.of(file.toURI().toURL());
- } catch (MalformedURLException e) {
- log.warn("Cannot get plugin URL for pluginIdentifier: {}",
file, e);
}
+ if (pluginName.contains("cdc") &&
file.getName().startsWith("connector-cdc-base")) {
+ return Optional.of(file.toURI().toURL());
+ }
+ } catch (MalformedURLException e) {
+ log.warn("Cannot get plugin URL for pluginIdentifier: {}", file,
e);
}
+
if (log.isDebugEnabled()) {
log.debug(
"File found: {}, matches more than one PluginIdentifier:
{}",
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
index a536f3a446..e86587f0d4 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
import org.apache.commons.lang3.StringUtils;
import java.net.URL;
+import java.util.List;
import java.util.ServiceLoader;
import java.util.function.BiConsumer;
@@ -38,7 +39,7 @@ public class SeaTunnelFactoryDiscovery extends
AbstractPluginDiscovery<Factory>
public SeaTunnelFactoryDiscovery(
Class<? extends Factory> factoryClass,
- BiConsumer<ClassLoader, URL> addURLToClassLoader) {
+ BiConsumer<ClassLoader, List<URL>> addURLToClassLoader) {
super(addURLToClassLoader);
this.factoryClass = factoryClass;
}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
index 145b0a04b4..2ae3bd0d89 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java
@@ -70,7 +70,7 @@ public class SeaTunnelSinkPluginDiscovery extends
AbstractPluginDiscovery<SeaTun
return plugins;
}
- public SeaTunnelSinkPluginDiscovery(BiConsumer<ClassLoader, URL>
addURLToClassLoader) {
+ public SeaTunnelSinkPluginDiscovery(BiConsumer<ClassLoader, List<URL>>
addURLToClassLoader) {
super(addURLToClassLoader);
}
diff --git
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
index a224066f2d..d892ce1604 100644
---
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
+++
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java
@@ -63,7 +63,7 @@ public class SeaTunnelSourcePluginDiscovery extends
AbstractPluginDiscovery<SeaT
return plugins;
}
- public SeaTunnelSourcePluginDiscovery(BiConsumer<ClassLoader, URL>
addURLToClassLoader) {
+ public SeaTunnelSourcePluginDiscovery(BiConsumer<ClassLoader, List<URL>>
addURLToClassLoader) {
super(addURLToClassLoader);
}
diff --git
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
index 4f2d7a1642..42bc12c1be 100644
---
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
+++
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java
@@ -26,18 +26,32 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledOnOs;
-import org.junit.jupiter.api.condition.OS;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Map;
-@DisabledOnOs(OS.WINDOWS)
public class AbstractPluginDiscoveryTest {
private String originSeatunnelHome = null;
private DeployMode originMode = null;
- private static final String seatunnelHome =
- AbstractPluginDiscoveryTest.class.getResource("/home").getPath();
+ private static final String seatunnelHome;
+
+ static {
+ String rootModuleDir = "seatunnel-plugin-discovery";
+ Path path = Paths.get(System.getProperty("user.dir"));
+ while (!path.endsWith(Paths.get(rootModuleDir))) {
+ path = path.getParent();
+ }
+ seatunnelHome =
+ Paths.get(
+ path.getParent().toString(),
+ rootModuleDir,
+ "target",
+ "test-classes",
+ "home")
+ .toString();
+ }
@BeforeEach
public void before() {
diff --git
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
index 023a508e02..a2cc1f95d1 100644
---
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
+++
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
@@ -29,11 +29,9 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledOnOs;
-import org.junit.jupiter.api.condition.OS;
+import java.io.File;
import java.io.IOException;
-import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -41,13 +39,28 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-@DisabledOnOs(OS.WINDOWS)
class SeaTunnelSourcePluginDiscoveryTest {
+ private static final String seatunnelHome;
+
+ static {
+ String rootModuleDir = "seatunnel-plugin-discovery";
+ Path path = Paths.get(System.getProperty("user.dir"));
+ while (!path.endsWith(Paths.get(rootModuleDir))) {
+ path = path.getParent();
+ }
+ seatunnelHome =
+ Paths.get(
+ path.getParent().toString(),
+ rootModuleDir,
+ "target",
+ "test-classes",
+ "duplicate")
+ .toString();
+ }
+
private String originSeatunnelHome = null;
private DeployMode originMode = null;
- private static final String seatunnelHome =
-
SeaTunnelSourcePluginDiscoveryTest.class.getResource("/duplicate").getPath();
private static final List<Path> pluginJars =
Lists.newArrayList(
Paths.get(seatunnelHome, "connectors",
"connector-http-jira.jar"),
@@ -103,7 +116,14 @@ class SeaTunnelSourcePluginDiscoveryTest {
.toString())
.collect(Collectors.toList()),
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers).stream()
- .map(URL::getPath)
+ .map(
+ url -> {
+ try {
+ return new File(url.toURI()).getPath();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ })
.collect(Collectors.toList()));
}
@@ -149,7 +169,14 @@ class SeaTunnelSourcePluginDiscoveryTest {
.toString())
.collect(Collectors.toList()),
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers).stream()
- .map(URL::getPath)
+ .map(
+ url -> {
+ try {
+ return new File(url.toURI()).getPath();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ })
.collect(Collectors.toList()));
}