This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 00a7014c96 [Improve][Connector-V2][Iceberg] Add hadoop s3 catalog e2e testcase (#5745)
00a7014c96 is described below

commit 00a7014c961d01eb5dc6a5f21487e4f64f5e518c
Author: halo.kim <4567493+4chi...@users.noreply.github.com>
AuthorDate: Sat Jun 15 10:59:13 2024 +0900

    [Improve][Connector-V2][Iceberg] Add hadoop s3 catalog e2e testcase (#5745)
---
 docs/en/connector-v2/source/Iceberg.md             |  26 +-
 .../connector-iceberg-s3-e2e/pom.xml               |  86 ++++++
 .../e2e/connector/iceberg/s3/IcebergSourceIT.java  | 331 +++++++++++++++++++++
 .../src/test/resources/iceberg/iceberg_source.conf | 104 +++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 .../container/seatunnel/SeaTunnelContainer.java    |   6 +-
 6 files changed, 552 insertions(+), 2 deletions(-)

diff --git a/docs/en/connector-v2/source/Iceberg.md b/docs/en/connector-v2/source/Iceberg.md
index 8fb296467a..d5281016cf 100644
--- a/docs/en/connector-v2/source/Iceberg.md
+++ b/docs/en/connector-v2/source/Iceberg.md
@@ -41,7 +41,7 @@ Source connector for Apache Iceberg. It can support batch and stream mode.
 
 ## Database Dependency
 
-> In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file are provided, so if you use the Flink engine, first you may need to add the following Jar packages to <FLINK_HOME>/lib directory, if you are using the Spark engine and integrated with Hadoop, then you do not need to add the following Jar packages.
+> In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file are provided, so if you use the Flink engine, first you may need to add the following Jar packages to <FLINK_HOME>/lib directory, if you are using the Spark engine and integrated with Hadoop, then you do not need to add the following Jar packages. If you are using the hadoop s3 catalog, you need to add the hadoop-aws and aws-java-sdk jars for your Flink and Spark engine version.
 
 ```
 hive-exec-xxx.jar
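For a Flink deployment, the jar placement described in the doc change above can be scripted with two downloads. A minimal sketch, assuming the same Maven Central artifacts that the e2e test added in this commit fetches (hadoop-aws 3.1.4 and aws-java-sdk-bundle 1.11.271) and that `FLINK_HOME` points at your Flink installation; pick versions that match your own Hadoop and engine:

```bash
# Sketch only: download the S3A dependencies into Flink's lib directory.
# The artifact versions below are taken from the e2e test in this commit,
# not from the documentation; align them with your Hadoop/engine versions.
cd "$FLINK_HOME/lib"
curl -O https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar
curl -O https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar
```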
@@ -141,6 +141,30 @@ sink {
 }
 ```
 
+### Hadoop S3 Catalog:
+
+```hocon
+source {
+  iceberg {
+    catalog_name = "seatunnel"
+    iceberg.catalog.config={
+      "type"="hadoop"
+      "warehouse"="s3a://your_bucket/spark/warehouse/"
+    }
+    hadoop.config={
+      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+      "fs.s3a.endpoint" = "s3.cn-north-1.amazonaws.com.cn"
+      "fs.s3a.access.key" = "xxxxxxxxxxxxxxxxx"
+      "fs.s3a.secret.key" = "xxxxxxxxxxxxxxxxx"
+      "fs.defaultFS" = "s3a://your_bucket"
+    }
+    namespace = "your_iceberg_database"
+    table = "your_iceberg_table"
+    result_table_name = "iceberg_test"
+  }
+}
+```
+
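The block above configures only the source; for a quick end-to-end check it can be dropped into a complete batch job. A minimal sketch, assuming the batch job mode shown in the e2e config later in this commit and a plain Console sink (bucket, endpoint, keys, and table names are placeholders):

```hocon
env {
  job.mode = "BATCH"
}

source {
  iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config={
      "type"="hadoop"
      "warehouse"="s3a://your_bucket/spark/warehouse/"
    }
    hadoop.config={
      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
      "fs.s3a.endpoint" = "s3.cn-north-1.amazonaws.com.cn"
      "fs.s3a.access.key" = "xxxxxxxxxxxxxxxxx"
      "fs.s3a.secret.key" = "xxxxxxxxxxxxxxxxx"
      "fs.defaultFS" = "s3a://your_bucket"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    result_table_name = "iceberg_test"
  }
}

sink {
  # Console simply prints the rows read from Iceberg, enough to verify the catalog wiring.
  Console {
    source_table_name = "iceberg_test"
  }
}
```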
 ### Hive Catalog:
 
 ```hocon
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/pom.xml
new file mode 100644
index 0000000000..a44c8d630f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-iceberg-s3-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : Iceberg : S3</name>
+
+    <properties>
+        <testcontainer.version>1.19.1</testcontainer.version>
+        <minio.version>8.5.6</minio.version>
+        <hadoop3.version>3.1.4</hadoop3.version>
+    </properties>
+
+    <dependencies>
+        <!-- minio containers -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>minio</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.minio</groupId>
+            <artifactId>minio</artifactId>
+            <version>${minio.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- connector -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-iceberg</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop3.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aws</artifactId>
+            <version>${hadoop3.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/s3/IcebergSourceIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/s3/IcebergSourceIT.java
new file mode 100644
index 0000000000..a6155bd68b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/s3/IcebergSourceIT.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.iceberg.s3;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.types.Types;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MinIOContainer;
+
+import io.minio.BucketExistsArgs;
+import io.minio.MakeBucketArgs;
+import io.minio.MinioClient;
+import io.minio.UploadObjectArgs;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
+
+@DisabledOnContainer(
+        value = {TestContainerId.SPARK_2_4},
+        type = {EngineType.FLINK, EngineType.SEATUNNEL},
+        disabledReason =
+                "Needs hadoop-aws,aws-java-sdk jar for flink, spark2.4. For the seatunnel engine, it crashes on seatunnel-hadoop3-3.1.4-uber.jar.")
+@Slf4j
+public class IcebergSourceIT extends TestSuiteBase implements TestResource {
+
+    public static final String HADOOP_AWS_DOWNLOAD =
+            "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar";
+    public static final String AWS_SDK_DOWNLOAD =
+            "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar";
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Iceberg/lib && cd /tmp/seatunnel/plugins/Iceberg/lib && curl -O "
+                                        + HADOOP_AWS_DOWNLOAD);
+                Assertions.assertEquals(0, extraCommands.getExitCode());
+
+                extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "cd /tmp/seatunnel/plugins/Iceberg/lib && curl -O "
+                                        + AWS_SDK_DOWNLOAD);
+                Assertions.assertEquals(0, extraCommands.getExitCode());
+            };
+
+    private static final String MINIO_DOCKER_IMAGE = "minio/minio";
+    private static final String HOST = "minio";
+    private static final int MINIO_PORT = 9000;
+
+    private static final TableIdentifier TABLE =
+            TableIdentifier.of(Namespace.of("database1"), "source");
+    private static final Schema SCHEMA =
+            new Schema(
+                    Types.NestedField.optional(1, "f1", Types.LongType.get()),
+                    Types.NestedField.optional(2, "f2", Types.BooleanType.get()),
+                    Types.NestedField.optional(3, "f3", Types.IntegerType.get()),
+                    Types.NestedField.optional(4, "f4", Types.LongType.get()),
+                    Types.NestedField.optional(5, "f5", Types.FloatType.get()),
+                    Types.NestedField.optional(6, "f6", Types.DoubleType.get()),
+                    Types.NestedField.optional(7, "f7", Types.DateType.get()),
+                    Types.NestedField.optional(8, "f8", Types.TimeType.get()),
+                    Types.NestedField.optional(9, "f9", Types.TimestampType.withZone()),
+                    Types.NestedField.optional(10, "f10", Types.TimestampType.withoutZone()),
+                    Types.NestedField.optional(11, "f11", Types.StringType.get()),
+                    Types.NestedField.optional(12, "f12", Types.FixedType.ofLength(10)),
+                    Types.NestedField.optional(13, "f13", Types.BinaryType.get()),
+                    Types.NestedField.optional(14, "f14", Types.DecimalType.of(19, 9)),
+                    Types.NestedField.optional(
+                            15, "f15", Types.ListType.ofOptional(100, Types.IntegerType.get())),
+                    Types.NestedField.optional(
+                            16,
+                            "f16",
+                            Types.MapType.ofOptional(
+                                    200, 300, Types.StringType.get(), Types.IntegerType.get())),
+                    Types.NestedField.optional(
+                            17,
+                            "f17",
+                            Types.StructType.of(
+                                    Types.NestedField.required(
+                                            400, "f17_a", Types.StringType.get()))));
+
+    private static final String CATALOG_NAME = "seatunnel";
+    private static final IcebergCatalogType CATALOG_TYPE = HADOOP;
+
+    private static String BUCKET = "test-bucket";
+    private static String REGION = "us-east-1";
+
+    private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/s3/";
+    private static final String WAREHOUSE = "s3a://" + BUCKET + CATALOG_DIR;
+    private static Catalog CATALOG;
+
+    private MinIOContainer container;
+    private MinioClient minioClient;
+    private Configuration configuration;
+
+    @BeforeEach
+    @Override
+    public void startUp() throws Exception {
+        container =
+                new MinIOContainer(MINIO_DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HOST)
+                        .withExposedPorts(MINIO_PORT);
+
+        container.start();
+
+        String s3URL = container.getS3URL();
+
+        // configure the MinIO client
+        minioClient =
+                MinioClient.builder()
+                        .endpoint(s3URL)
+                        .credentials(container.getUserName(), container.getPassword())
+                        .region(REGION)
+                        .build();
+
+        // create bucket
+        minioClient.makeBucket(MakeBucketArgs.builder().bucket(BUCKET).region(REGION).build());
+
+        BucketExistsArgs existsArgs = BucketExistsArgs.builder().bucket(BUCKET).build();
+        Assertions.assertTrue(minioClient.bucketExists(existsArgs));
+
+        configuration = initializeConfiguration();
+
+        initializeIcebergTable();
+        batchInsertData();
+    }
+
+    private Configuration initializeConfiguration() {
+        Configuration conf = new Configuration();
+        Map<String, String> hadoopProps = getHadoopProps();
+        hadoopProps.forEach((key, value) -> conf.set(key, value));
+        return conf;
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (container != null) {
+            container.stop();
+        }
+    }
+
+    @TestTemplate
+    public void testIcebergSource(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/iceberg/iceberg_source.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    private void initializeIcebergTable() {
+        Map<String, Object> configs = new HashMap<>();
+
+        // add catalog properties
+        Map<String, Object> catalogProps = new HashMap<>();
+        catalogProps.put("type", CATALOG_TYPE.getType());
+        catalogProps.put("warehouse", WAREHOUSE);
+
+        configs.put(CommonConfig.KEY_CATALOG_NAME.key(), CATALOG_NAME);
+
+        configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps);
+
+        configs.put(CommonConfig.HADOOP_PROPS.key(), getHadoopProps());
+
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configs);
+        CATALOG = new IcebergCatalogLoader(new SourceConfig(readonlyConfig)).loadCatalog();
+        if (!CATALOG.tableExists(TABLE)) {
+            CATALOG.createTable(TABLE, SCHEMA);
+        }
+    }
+
+    private Map<String, String> getHadoopProps() {
+        Map<String, String> hadoopProps = new HashMap<>();
+        hadoopProps.put("fs.s3a.path.style.access", "true");
+        hadoopProps.put("fs.s3a.connection.ssl.enabled", "false");
+        hadoopProps.put("fs.s3a.connection.timeout", "3000");
+        hadoopProps.put("fs.s3a.impl.disable.cache", "true");
+        hadoopProps.put("fs.s3a.attempts.maximum", "1");
+        hadoopProps.put(
+                "fs.s3a.aws.credentials.provider",
+                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+        hadoopProps.put("fs.s3a.endpoint", container.getS3URL());
+        hadoopProps.put("fs.s3a.access.key", container.getUserName());
+        hadoopProps.put("fs.s3a.secret.key", container.getPassword());
+        hadoopProps.put("fs.defaultFS", "s3a://" + BUCKET);
+        return hadoopProps;
+    }
+
+    private void batchInsertData() {
+        GenericRecord record = GenericRecord.create(SCHEMA);
+        record.setField("f1", Long.valueOf(0));
+        record.setField("f2", true);
+        record.setField("f3", Integer.MAX_VALUE);
+        record.setField("f4", Long.MAX_VALUE);
+        record.setField("f5", Float.MAX_VALUE);
+        record.setField("f6", Double.MAX_VALUE);
+        record.setField("f7", LocalDate.now());
+        record.setField("f8", LocalTime.now());
+        record.setField("f9", OffsetDateTime.now());
+        record.setField("f10", LocalDateTime.now());
+        record.setField("f11", "test");
+        record.setField("f12", "abcdefghij".getBytes());
+        record.setField("f13", ByteBuffer.wrap("test".getBytes()));
+        record.setField("f14", new BigDecimal("1000000000.000000001"));
+        record.setField("f15", Arrays.asList(Integer.MAX_VALUE));
+        record.setField("f16", Collections.singletonMap("key", Integer.MAX_VALUE));
+        Record structRecord = GenericRecord.create(SCHEMA.findField("f17").type().asStructType());
+        structRecord.setField("f17_a", "test");
+        record.setField("f17", structRecord);
+
+        Table table = CATALOG.loadTable(TABLE);
+        FileAppenderFactory appenderFactory = new GenericAppenderFactory(SCHEMA);
+        List<Record> records = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            records.add(record.copy("f1", Long.valueOf(i)));
+            if (i % 10 == 0) {
+                String externalFilePath =
+                        String.format(CATALOG_DIR + "external_file/datafile_%s.avro", i);
+                FileAppender<Record> fileAppender =
+                        appenderFactory.newAppender(
+                                Files.localOutput(externalFilePath),
+                                FileFormat.fromFileName(externalFilePath));
+                try (FileAppender<Record> fileAppenderCloseable = fileAppender) {
+                    fileAppenderCloseable.addAll(records);
+                    records.clear();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+
+                uploadObject(externalFilePath);
+
+                HadoopInputFile inputFile =
+                        HadoopInputFile.fromLocation(getS3Output(externalFilePath), configuration);
+                Assertions.assertTrue(inputFile.exists());
+
+                DataFile datafile =
+                        DataFiles.builder(PartitionSpec.unpartitioned())
+                                .withInputFile(inputFile)
+                                .withMetrics(fileAppender.metrics())
+                                .build();
+                table.newAppend().appendFile(datafile).commit();
+            }
+        }
+    }
+
+    private String getS3Output(String externalFilePath) {
+        return "s3a://" + BUCKET + externalFilePath;
+    }
+
+    private void uploadObject(String externalFilePath) {
+        try {
+            minioClient.uploadObject(
+                    UploadObjectArgs.builder()
+                            .bucket(BUCKET)
+                            .object(externalFilePath)
+                            .filename(externalFilePath)
+                            .build());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/resources/iceberg/iceberg_source.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/resources/iceberg/iceberg_source.conf
new file mode 100644
index 0000000000..b2ab81a56c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-s3-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  Iceberg {
+    schema {
+      fields {
+        f2 = "boolean"
+        f1 = "bigint"
+        f3 = "int"
+        f4 = "bigint"
+        f5 = "float"
+        f6 = "double"
+        f7 = "date"
+        f9 = "timestamp"
+        f10 = "timestamp"
+        f11 = "string"
+        f12 = "bytes"
+        f13 = "bytes"
+        f14 = "decimal(19,9)"
+        f15 = "array<int>"
+        f16 = "map<string, int>"
+      }
+    }
+    catalog_name = "seatunnel"
+    iceberg.catalog.config={
+      "type"="hadoop"
+      "warehouse"="s3a://test-bucket/tmp/seatunnel/iceberg/s3/"
+    }
+    hadoop.config={
+      "fs.s3a.path.style.access" = "true"
+      "fs.s3a.connection.ssl.enabled" = "false"
+      "fs.s3a.signing.algorithm" = "S3SignerType"
+      "fs.s3a.encryption.algorithm" = "AES256"
+      "fs.s3a.connection.timeout" = "3000"
+      "fs.s3a.impl.disable.cache" = "true"
+      "fs.s3a.attempts.maximum" = "1"
+      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+      "fs.s3a.endpoint" = "http://minio:9000"
+      "fs.s3a.access.key" = "minioadmin"
+      "fs.s3a.secret.key" = "minioadmin"
+      "fs.defaultFS" = "s3a://test-bucket"
+    }
+    namespace = "database1"
+    table = "source"
+    result_table_name = "iceberg"
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    source_table_name = "iceberg"
+    rules = {
+      field_rules = [
+        {
+          field_name = f1
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN
+              rule_value = 0
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 455f9e7659..9e236b7439 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -54,6 +54,7 @@
         <module>connector-cdc-mongodb-e2e</module>
         <module>connector-iceberg-e2e</module>
         <module>connector-iceberg-hadoop3-e2e</module>
+        <module>connector-iceberg-s3-e2e</module>
         <module>connector-tdengine-e2e</module>
         <module>connector-datahub-e2e</module>
         <module>connector-mongodb-e2e</module>
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index f9a139b040..9057518063 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -426,7 +426,11 @@ public class SeaTunnelContainer extends AbstractTestContainer {
                 || threadName.startsWith("MaintenanceTimer")
                 || threadName.startsWith("cluster-")
                 // Iceberg
-                || threadName.startsWith("iceberg");
+                || threadName.startsWith("iceberg")
+                // Iceberg S3 Hadoop catalog
+                || threadName.contains("java-sdk-http-connection-reaper")
+                || threadName.contains("Timer for 's3a-file-system' metrics system")
+                || threadName.startsWith("MutableQuantiles-");
     }
 
     @Override
