This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e2a4772933 [Feature][Connector-v2] Support S3 filesystem of paimon connector (#8036)
e2a4772933 is described below

commit e2a477293303161e064d6197fe82b77de375f5ef
Author: dailai <837833...@qq.com>
AuthorDate: Thu Nov 14 20:27:43 2024 +0800

    [Feature][Connector-v2] Support S3 filesystem of paimon connector (#8036)
---
 docs/en/connector-v2/sink/Paimon.md                |  54 +++++++++
 docs/en/connector-v2/source/Paimon.md              |  32 +++++
 docs/zh/connector-v2/sink/Paimon.md                |  54 ++++++++-
 seatunnel-connectors-v2/connector-paimon/pom.xml   |  34 ++++++
 .../paimon/catalog/PaimonCatalogLoader.java        |   9 +-
 .../seatunnel/paimon/filesystem/S3Loader.java      |  47 ++++++++
 .../services/org.apache.paimon.fs.FileIOLoader     |  16 +++
 .../connector-paimon-e2e/pom.xml                   |  35 +++++-
 .../e2e/connector/paimon/PaimonWithS3IT.java       | 130 +++++++++++++++++++++
 .../src/test/resources/fake_to_paimon_with_s3.conf |  95 +++++++++++++++
 .../test/resources/paimon_with_s3_to_assert.conf   |  98 ++++++++++++++++
 .../container/seatunnel/SeaTunnelContainer.java    |   2 +-
 12 files changed, 596 insertions(+), 10 deletions(-)

diff --git a/docs/en/connector-v2/sink/Paimon.md 
b/docs/en/connector-v2/sink/Paimon.md
index c9e4b3a9b6..68c0755cfd 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -61,6 +61,13 @@ All `changelog-producer` modes are currently supported. The default is `none`.
 > note: 
 > When you use a streaming mode to read paimon table, different mode will produce [different results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog).
 
+## Filesystems
+The Paimon connector supports writing data to multiple filesystems. Currently, the supported filesystems are HDFS and S3.
+If you use the S3 filesystem, you can configure the `fs.s3a.access-key`, `fs.s3a.secret-key`, `fs.s3a.endpoint`, `fs.s3a.path.style.access`, and `fs.s3a.aws.credentials.provider` properties in the `paimon.hadoop.conf` option.
+In addition, the warehouse path should start with `s3a://`.
+
+
+
 ## Examples
 
 ### Single table
@@ -94,6 +101,53 @@ sink {
 }
 ```
 
+### Single table with s3 filesystem
+
+```hocon
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "s3a://test/"
+    database = "seatunnel_namespace11"
+    table = "st_test"
+    paimon.hadoop.conf = {
+        fs.s3a.access-key=G52pnxg67819khOZ9ezX
+        fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
+        fs.s3a.endpoint="http://minio4:9000"
+        fs.s3a.path.style.access=true
+        fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+    }
+  }
+}
+```
+
 ### Single table(Specify hadoop HA config and kerberos config)
 
 ```hocon
diff --git a/docs/en/connector-v2/source/Paimon.md 
b/docs/en/connector-v2/source/Paimon.md
index e586a4fd9d..cbe3b592f8 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -82,6 +82,11 @@ Properties in hadoop conf
 
 The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files
 
+## Filesystems
+The Paimon connector supports reading data from multiple filesystems. Currently, the supported filesystems are HDFS and S3.
+If you use the S3 filesystem, you can configure the `fs.s3a.access-key`, `fs.s3a.secret-key`, `fs.s3a.endpoint`, `fs.s3a.path.style.access`, and `fs.s3a.aws.credentials.provider` properties in the `paimon.hadoop.conf` option.
+In addition, the warehouse path should start with `s3a://`.
+
 ## Examples
 
 ### Simple example
@@ -109,6 +114,33 @@ source {
 }
 ```
 
+### S3 example
+```hocon
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "s3a://test/"
+    database = "seatunnel_namespace11"
+    table = "st_test"
+    paimon.hadoop.conf = {
+        fs.s3a.access-key=G52pnxg67819khOZ9ezX
+        fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
+        fs.s3a.endpoint="http://minio4:9000"
+        fs.s3a.path.style.access=true
+        fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+    }
+  }
+}
+
+sink {
+  Console{}
+}
+```
+
 ### Hadoop conf example
 
 ```hocon
diff --git a/docs/zh/connector-v2/sink/Paimon.md 
b/docs/zh/connector-v2/sink/Paimon.md
index 375c8c90ca..09f4e63fbf 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -58,7 +58,12 @@ A Paimon table's changelog-producer mode has [four](https://paimon.apache.org/docs/mast
 * [`lookup`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#lookup)
 * [`full-compaction`](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/#full-compaction)
 > Note:
- > When you use streaming mode to read data from a paimon table, different modes will produce [different results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog).
+> When you use streaming mode to read data from a paimon table, different modes will produce [different results](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Paimon.md#changelog).
+
+## Filesystems
+The Paimon connector supports writing data to multiple filesystems. Currently, the supported filesystems are HDFS and S3.
+If you use the S3 filesystem, you can configure the `fs.s3a.access-key`, `fs.s3a.secret-key`, `fs.s3a.endpoint`, `fs.s3a.path.style.access`, and `fs.s3a.aws.credentials.provider` properties in the `paimon.hadoop.conf` option.
+In addition, the warehouse path should start with `s3a://`.
 
 ## Examples
 
@@ -93,6 +98,53 @@ sink {
 }
 ```
 
+### Single table (with S3 filesystem)
+
+```hocon
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "s3a://test/"
+    database = "seatunnel_namespace11"
+    table = "st_test"
+    paimon.hadoop.conf = {
+        fs.s3a.access-key=G52pnxg67819khOZ9ezX
+        fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
+        fs.s3a.endpoint="http://minio4:9000"
+        fs.s3a.path.style.access=true
+        fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+    }
+  }
+}
+```
+
 ### Single table (specify hadoop HA config and kerberos config)
 
 ```hocon
diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml 
b/seatunnel-connectors-v2/connector-paimon/pom.xml
index 80934e68a2..0cd3f535d0 100644
--- a/seatunnel-connectors-v2/connector-paimon/pom.xml
+++ b/seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -32,6 +32,7 @@
     <properties>
         <paimon.version>0.7.0-incubating</paimon.version>
         <hive.version>2.3.9</hive.version>
+        <connector.name>connector.paimon</connector.name>
     </properties>
 
     <dependencies>
@@ -47,6 +48,12 @@
             <version>${paimon.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-s3-impl</artifactId>
+            <version>${paimon.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-guava</artifactId>
@@ -98,4 +105,31 @@
 
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>org.apache.paimon:paimon-s3-impl</artifact>
+                                    <excludes>
+                                        <exclude>org/apache/hadoop/**</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
index 774576c408..ae1f6d675a 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
@@ -44,6 +44,7 @@ public class PaimonCatalogLoader implements Serializable {
     private static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
 
     private static final String HDFS_PREFIX = "hdfs://";
+    private static final String S3A_PREFIX = "s3a://";
     /** ********* Hdfs constants ************* */
     private static final String HDFS_IMPL = 
"org.apache.hadoop.hdfs.DistributedFileSystem";
 
@@ -63,7 +64,7 @@ public class PaimonCatalogLoader implements Serializable {
     }
 
     public Catalog loadCatalog() {
-        // When using the seatunel engine, set the current class loader to prevent loading failures
+        // When using the seatunnel engine, set the current class loader to prevent loading failures
         Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader());
         final Map<String, String> optionsMap = new HashMap<>(1);
         optionsMap.put(CatalogOptions.WAREHOUSE.key(), warehouse);
@@ -71,12 +72,12 @@ public class PaimonCatalogLoader implements Serializable {
         if (warehouse.startsWith(HDFS_PREFIX)) {
             checkConfiguration(paimonHadoopConfiguration, HDFS_DEF_FS_NAME);
             paimonHadoopConfiguration.set(HDFS_IMPL_KEY, HDFS_IMPL);
+        } else if (warehouse.startsWith(S3A_PREFIX)) {
+            optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY));
         }
         if (PaimonCatalogEnum.HIVE.getType().equals(catalogType.getType())) {
             optionsMap.put(CatalogOptions.URI.key(), catalogUri);
-            paimonHadoopConfiguration
-                    .getPropsWithPrefix(StringUtils.EMPTY)
-                    .forEach((k, v) -> optionsMap.put(k, v));
+            optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY));
         }
         final Options options = Options.fromMap(optionsMap);
         PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
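The hunk above branches on the warehouse URI scheme: `hdfs://` keeps the existing defaultFS check and pins the HDFS implementation class, while `s3a://` forwards every `paimon.hadoop.conf` property into the catalog options so Paimon's S3 FileIO can pick them up. A minimal standalone sketch of that dispatch (simplified; `WarehouseDispatch` and `buildOptions` are hypothetical names, not the commit's actual class):

```java
import java.util.HashMap;
import java.util.Map;

public class WarehouseDispatch {
    static final String HDFS_PREFIX = "hdfs://";
    static final String S3A_PREFIX = "s3a://";
    static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
    static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";

    // Build the catalog option map from the warehouse path and the user-supplied
    // hadoop-style properties, mirroring the scheme-based branching added to
    // PaimonCatalogLoader#loadCatalog.
    static Map<String, String> buildOptions(String warehouse, Map<String, String> hadoopProps) {
        Map<String, String> options = new HashMap<>();
        options.put("warehouse", warehouse);
        if (warehouse.startsWith(HDFS_PREFIX)) {
            // hdfs: pin the FileSystem implementation class
            options.put(HDFS_IMPL_KEY, HDFS_IMPL);
        } else if (warehouse.startsWith(S3A_PREFIX)) {
            // s3a: forward every property (access key, endpoint, credentials provider, ...)
            options.putAll(hadoopProps);
        }
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("fs.s3a.endpoint", "http://minio:9000");
        // warehouse + forwarded endpoint = 2 entries
        System.out.println(buildOptions("s3a://test/", props).size()); // prints "2"
    }
}
```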
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/filesystem/S3Loader.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/filesystem/S3Loader.java
new file mode 100644
index 0000000000..915070c8ea
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/filesystem/S3Loader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.filesystem;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOLoader;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.s3.S3FileIO;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class S3Loader implements FileIOLoader {
+    @Override
+    public String getScheme() {
+        return "s3a";
+    }
+
+    @Override
+    public List<String[]> requiredOptions() {
+        List<String[]> options = new ArrayList<>();
+        options.add(new String[] {"fs.s3a.access-key", "fs.s3a.access.key"});
+        options.add(new String[] {"fs.s3a.secret-key", "fs.s3a.secret.key"});
+        options.add(new String[] {"fs.s3a.endpoint", "fs.s3a.endpoint"});
+        return options;
+    }
+
+    @Override
+    public FileIO load(Path path) {
+        return new S3FileIO();
+    }
+}
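The `requiredOptions()` pairs above associate each SeaTunnel-style key (e.g. `fs.s3a.access-key`) with the hadoop-style s3a key (e.g. `fs.s3a.access.key`) that the underlying S3 FileIO expects. A self-contained sketch of that key translation (`S3KeyMapping` and `translate` are hypothetical illustration names, not part of the commit):

```java
import java.util.HashMap;
import java.util.Map;

public class S3KeyMapping {
    // Each pair maps a SeaTunnel-style key to the hadoop-style s3a key,
    // mirroring the pairs returned by S3Loader#requiredOptions.
    private static final String[][] KEY_PAIRS = {
        {"fs.s3a.access-key", "fs.s3a.access.key"},
        {"fs.s3a.secret-key", "fs.s3a.secret.key"},
        {"fs.s3a.endpoint", "fs.s3a.endpoint"},
    };

    // Copy the user options and add the hadoop-style alias for every
    // SeaTunnel-style key that is present.
    public static Map<String, String> translate(Map<String, String> userOptions) {
        Map<String, String> out = new HashMap<>(userOptions);
        for (String[] pair : KEY_PAIRS) {
            String value = userOptions.get(pair[0]);
            if (value != null) {
                out.put(pair[1], value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put("fs.s3a.access-key", "minio");
        System.out.println(translate(user).get("fs.s3a.access.key")); // prints "minio"
    }
}
```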
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
 
b/seatunnel-connectors-v2/connector-paimon/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
new file mode 100644
index 0000000000..0057f40425
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.seatunnel.connectors.seatunnel.paimon.filesystem.S3Loader
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
index 69ea9a9f74..71784966f8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/pom.xml
@@ -25,17 +25,32 @@
     <artifactId>connector-paimon-e2e</artifactId>
     <name>SeaTunnel : E2E : Connector V2 : Paimon</name>
 
+    <properties>
+        <testcontainer.version>1.19.1</testcontainer.version>
+        <minio.version>8.5.6</minio.version>
+    </properties>
+
     <dependencies>
+        <!-- minio containers -->
         <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-fake</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>minio</artifactId>
+            <version>${testcontainer.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.minio</groupId>
+            <artifactId>minio</artifactId>
+            <version>${minio.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-paimon</artifactId>
+            <artifactId>connector-seatunnel-e2e-base</artifactId>
             <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -44,6 +59,18 @@
             <classifier>optional</classifier>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-paimon</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-assert</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
new file mode 100644
index 0000000000..2df1a5e49b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.paimon;
+
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MinIOContainer;
+
+import io.minio.BucketExistsArgs;
+import io.minio.MakeBucketArgs;
+import io.minio.MinioClient;
+
+import java.nio.file.Paths;
+import java.util.Map;
+
+public class PaimonWithS3IT extends SeaTunnelContainer {
+
+    private static final String MINIO_DOCKER_IMAGE = "minio/minio:RELEASE.2024-06-13T22-53-53Z";
+    private static final String HOST = "minio";
+    private static final int MINIO_PORT = 9000;
+    private static final String MINIO_USER_NAME = "minio";
+    private static final String MINIO_USER_PASSWORD = "miniominio";
+
+    private static final String BUCKET = "test";
+
+    private MinIOContainer container;
+    private MinioClient minioClient;
+
+    private Map<String, Object> PAIMON_SINK_PROPERTIES;
+
+    protected static final String AWS_SDK_DOWNLOAD =
+            "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar";
+    protected static final String HADOOP_AWS_DOWNLOAD =
+            "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar";
+
+    @Override
+    @BeforeAll
+    public void startUp() throws Exception {
+        container =
+                new MinIOContainer(MINIO_DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HOST)
+                        .withUserName(MINIO_USER_NAME)
+                        .withPassword(MINIO_USER_PASSWORD)
+                        .withExposedPorts(MINIO_PORT);
+        container.start();
+
+        String s3URL = container.getS3URL();
+
+        // configure the MinIO client
+        minioClient =
+                MinioClient.builder()
+                        .endpoint(s3URL)
+                        .credentials(container.getUserName(), container.getPassword())
+                        .build();
+
+        // create bucket
+        minioClient.makeBucket(MakeBucketArgs.builder().bucket(BUCKET).build());
+
+        BucketExistsArgs existsArgs = BucketExistsArgs.builder().bucket(BUCKET).build();
+        Assertions.assertTrue(minioClient.bucketExists(existsArgs));
+        super.startUp();
+    }
+
+    @Override
+    @AfterAll
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (container != null) {
+            container.close();
+        }
+    }
+
+    @Override
+    protected String[] buildStartCommand() {
+        return new String[] {
+            "bash",
+            "-c",
+            "wget -P "
+                    + SEATUNNEL_HOME
+                    + "lib "
+                    + AWS_SDK_DOWNLOAD
+                    + " &&"
+                    + "wget -P "
+                    + SEATUNNEL_HOME
+                    + "lib "
+                    + HADOOP_AWS_DOWNLOAD
+                    + " &&"
+                    + ContainerUtil.adaptPathForWin(
+                            Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL).toString())
+        };
+    }
+
+    @Override
+    protected boolean isIssueWeAlreadyKnow(String threadName) {
+        return super.isIssueWeAlreadyKnow(threadName)
+                // Paimon with s3
+                || threadName.startsWith("s3a-transfer");
+    }
+
+    @Test
+    public void testFakeCDCSinkPaimonWithS3Filesystem() throws Exception {
+        Container.ExecResult execResult = executeSeaTunnelJob("/fake_to_paimon_with_s3.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        Container.ExecResult readResult = executeSeaTunnelJob("/paimon_with_s3_to_assert.conf");
+        Assertions.assertEquals(0, readResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
new file mode 100644
index 0000000000..a379a638eb
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "A_1", 100]
+      },
+      {
+        kind = DELETE
+        fields = [2, "B", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "s3a://test/"
+    database = "seatunnel_namespace11"
+    table = "st_test"
+    paimon.hadoop.conf = {
+        fs.s3a.access-key=minio
+        fs.s3a.secret-key=miniominio
+        fs.s3a.endpoint="http://minio:9000"
+        fs.s3a.path.style.access=true
+        fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
new file mode 100644
index 0000000000..6684b5fa95
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "s3a://test/"
+    database = "seatunnel_namespace11"
+    table = "st_test"
+    paimon.hadoop.conf = {
+        fs.s3a.access-key=minio
+        fs.s3a.secret-key=miniominio
+        fs.s3a.endpoint="http://minio:9000"
+        fs.s3a.path.style.access=true
+        fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+    }
+  }
+}
+
+sink {
+ Assert {
+    rules {
+        row_rules = [
+            {
+              rule_type = MIN_ROW
+              rule_value = 2
+            },
+            {
+              rule_type = MAX_ROW
+              rule_value = 2
+            }
+          ],
+          field_rules = [
+            {
+              field_name = pk_id
+              field_type = bigint
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                },
+                {
+                  rule_type = MIN
+                  rule_value = 1
+                },
+                {
+                  rule_type = MAX
+                  rule_value = 3
+                }
+              ]
+            },
+            {
+              field_name = name
+              field_type = string
+              field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+              ]
+            },
+             {
+               field_name = score
+               field_type = int
+               field_value = [
+                 {
+                   rule_type = NOT_NULL
+                   equals_to = 100
+                 }
+               ]
+             }
+          ]
+        }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 5fa5abb7ed..b9ff54b6c3 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -439,7 +439,7 @@ public class SeaTunnelContainer extends AbstractTestContainer {
     }
 
     /** The thread should be recycled but not, we should fix it in the future. */
-    private boolean isIssueWeAlreadyKnow(String threadName) {
+    protected boolean isIssueWeAlreadyKnow(String threadName) {
         // ClickHouse com.clickhouse.client.ClickHouseClientBuilder
         return threadName.startsWith("ClickHouseClientWorker")
                 // InfluxDB okio.AsyncTimeout$Watchdog
