This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e01108a0ef [Fix][paimon-e2e] Optimize Paimon E2E Cases (#9612)
e01108a0ef is described below
commit e01108a0ef129e7ebb49ed031d85966773713c9b
Author: zhangdonghao <[email protected]>
AuthorDate: Thu Jul 24 19:45:36 2025 +0800
[Fix][paimon-e2e] Optimize Paimon E2E Cases (#9612)
---
.../e2e/connector/paimon/AbstractPaimonIT.java | 85 +----------
.../seatunnel/e2e/connector/paimon/PaimonIT.java | 31 ++--
.../e2e/connector/paimon/PaimonSinkCDCIT.java | 50 ++----
.../paimon/PaimonSinkDynamicBucketIT.java | 126 +++------------
.../paimon/PaimonSinkWithSchemaEvolutionIT.java | 4 -
.../e2e/connector/paimon/PaimonStreamReadIT.java | 18 ++-
.../changelog_fake_cdc_sink_paimon_case1_ddl.conf | 3 +-
...log_fake_cdc_sink_paimon_case1_insert_data.conf | 3 +-
...log_fake_cdc_sink_paimon_case1_update_data.conf | 3 +-
.../changelog_fake_cdc_sink_paimon_case2.conf | 3 +-
.../test/resources/changelog_paimon_to_paimon.conf | 7 +-
.../test/resources/fake_cdc_sink_paimon_case1.conf | 2 +-
.../resources/fake_cdc_sink_paimon_case10.conf | 2 +-
...ke_cdc_sink_paimon_case1_with_error_schema.conf | 2 +-
.../test/resources/fake_cdc_sink_paimon_case2.conf | 128 ++++++++--------
.../test/resources/fake_cdc_sink_paimon_case3.conf | 4 +-
.../test/resources/fake_cdc_sink_paimon_case4.conf | 4 +-
.../test/resources/fake_cdc_sink_paimon_case5.conf | 4 +-
.../test/resources/fake_cdc_sink_paimon_case6.conf | 4 +-
.../test/resources/fake_cdc_sink_paimon_case7.conf | 82 +++++-----
.../test/resources/fake_cdc_sink_paimon_case8.conf | 38 ++---
.../test/resources/fake_cdc_sink_paimon_case9.conf | 2 +-
.../fake_cdc_sink_paimon_with_hdfs_ha.conf | 8 +-
...dc_sink_paimon_with_hdfs_with_hive_catalog.conf | 12 +-
.../fake_cdc_to_dynamic_bucket_paimon_case.conf | 88 +++++------
.../fake_sink_paimon_truncate_with_hdfs_case2.conf | 2 +-
.../fake_sink_paimon_truncate_with_hive_case1.conf | 4 +-
.../fake_sink_paimon_truncate_with_hive_case2.conf | 6 +-
...fake_sink_paimon_truncate_with_local_case1.conf | 2 +-
...fake_sink_paimon_truncate_with_local_case2.conf | 4 +-
.../fake_to_dynamic_bucket_paimon_case1.conf | 12 +-
.../fake_to_dynamic_bucket_paimon_case2.conf | 8 +-
.../fake_to_dynamic_bucket_paimon_case3.conf | 8 +-
.../fake_to_dynamic_bucket_paimon_case4.conf | 14 +-
.../fake_to_dynamic_bucket_paimon_case5.conf | 10 +-
.../fake_to_dynamic_bucket_paimon_case6.conf | 8 +-
.../fake_to_dynamic_bucket_paimon_case7.conf | 4 +-
.../src/test/resources/fake_to_paimon.conf | 9 +-
.../resources/fake_to_paimon_with_full_type.conf | 3 +-
.../fake_to_paimon_with_full_type_cdc_data.conf | 2 +-
.../src/test/resources/fake_to_paimon_with_s3.conf | 10 +-
.../fake_to_paimon_with_s3_with_checkpoint.conf | 10 +-
.../mysql_cdc_to_paimon_with_schema_change.conf | 6 +-
.../resources/paimon_projection_to_assert.conf | 10 +-
.../src/test/resources/paimon_to_assert.conf | 10 +-
.../resources/paimon_to_assert_with_filter1.conf | 2 +-
.../resources/paimon_to_assert_with_filter2.conf | 2 +-
.../resources/paimon_to_assert_with_filter3.conf | 34 ++---
.../resources/paimon_to_assert_with_filter4.conf | 34 ++---
.../resources/paimon_to_assert_with_filter5.conf | 42 ++---
.../resources/paimon_to_assert_with_filter6.conf | 42 ++---
.../resources/paimon_to_assert_with_filter7.conf | 42 ++---
.../resources/paimon_to_assert_with_filter8.conf | 2 +-
.../paimon_to_assert_with_hivecatalog.conf | 12 +-
.../paimon_to_assert_with_timestampN.conf | 2 +-
.../src/test/resources/paimon_to_paimon.conf | 4 +-
.../test/resources/paimon_with_s3_to_assert.conf | 100 ++++++------
.../read_from_paimon_with_hdfs_ha_to_assert.conf | 8 +-
.../src/test/resources/schema-0.json | 170 ++++++++++++---------
.../common/container/AbstractTestContainer.java | 15 ++
.../flink/AbstractTestFlinkContainer.java | 21 ++-
.../container/seatunnel/SeaTunnelContainer.java | 9 ++
.../spark/AbstractTestSparkContainer.java | 10 ++
63 files changed, 655 insertions(+), 751 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
index d5dd8608a5..ea2cf3bfc1 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
@@ -17,12 +17,9 @@
package org.apache.seatunnel.e2e.connector.paimon;
-import org.apache.seatunnel.common.utils.FileUtils;
-import org.apache.seatunnel.core.starter.utils.CompressionUtils;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
-import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
@@ -32,86 +29,18 @@ import org.apache.paimon.table.Table;
import lombok.extern.slf4j.Slf4j;
-import java.io.File;
-import java.io.IOException;
-
@Slf4j
public abstract class AbstractPaimonIT extends TestSuiteBase {
- protected static String CATALOG_ROOT_DIR = "/tmp/";
protected static final String NAMESPACE = "paimon";
- protected static final String NAMESPACE_TAR = "paimon.tar.gz";
- protected static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE +
"/";
protected static final String TARGET_TABLE = "st_test";
protected static final String FAKE_TABLE1 = "FakeTable1";
protected static final String FAKE_DATABASE1 = "FakeDatabase1";
protected static final String FAKE_TABLE2 = "FakeTable1";
protected static final String FAKE_DATABASE2 = "FakeDatabase2";
- private final String CATALOG_ROOT_DIR_WIN =
- "C:/Users/" + System.getProperty("user.name") + "/tmp/";
- private final String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE +
"/";
protected boolean isWindows;
protected boolean changeLogEnabled = false;
- protected final ContainerExtendedFactory containerExtendedFactory =
- container -> {
- if (isWindows) {
- FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR);
- FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + "paimon.tar");
- FileUtils.createNewDir(CATALOG_ROOT_DIR_WIN);
- } else {
- FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
- FileUtils.createNewDir(CATALOG_DIR);
- }
-
- container.execInContainer(
- "sh",
- "-c",
- "cd "
- + CATALOG_ROOT_DIR
- + " && tar -czvf "
- + NAMESPACE_TAR
- + " "
- + NAMESPACE);
- container.copyFileFromContainer(
- CATALOG_ROOT_DIR + NAMESPACE_TAR,
- (isWindows ? CATALOG_ROOT_DIR_WIN : CATALOG_ROOT_DIR)
+ NAMESPACE_TAR);
- if (isWindows) {
- extractFilesWin();
- } else {
- extractFiles();
- }
- };
-
- private void extractFiles() {
- ProcessBuilder processBuilder = new ProcessBuilder();
- processBuilder.command(
- "sh", "-c", "cd " + CATALOG_ROOT_DIR + " && tar -zxvf " +
NAMESPACE_TAR);
- try {
- Process process = processBuilder.start();
- // wait command completed
- int exitCode = process.waitFor();
- if (exitCode == 0) {
- log.info("Extract files successful.");
- } else {
- log.error("Extract files failed with exit code " + exitCode);
- }
- } catch (IOException | InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- private void extractFilesWin() {
- try {
- CompressionUtils.unGzip(
- new File(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR), new
File(CATALOG_ROOT_DIR_WIN));
- CompressionUtils.unTar(
- new File(CATALOG_ROOT_DIR_WIN + "paimon.tar"), new
File(CATALOG_ROOT_DIR_WIN));
- } catch (IOException | ArchiveException e) {
- throw new RuntimeException(e);
- }
- }
-
protected Table getTable(String dbName, String tbName) {
try {
return getCatalog().getTable(getIdentifier(dbName, tbName));
@@ -127,11 +56,13 @@ public abstract class AbstractPaimonIT extends
TestSuiteBase {
private Catalog getCatalog() {
Options options = new Options();
- if (isWindows) {
- options.set("warehouse", CATALOG_DIR_WIN);
- } else {
- options.set("warehouse", "file://" + CATALOG_DIR);
- }
+ String warehouse =
+ String.format(
+ "%s%s/%s",
+ isWindows ? "" : "file://",
+ AbstractTestContainer.HOST_VOLUME_MOUNT_PATH,
+ NAMESPACE);
+ options.set("warehouse", warehouse);
return CatalogFactory.createCatalog(CatalogContext.create(options));
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
index 952ff106a1..7ea21f3020 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.e2e.connector.paimon;
+import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
@@ -25,12 +26,7 @@ import
org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
-
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
@@ -42,7 +38,7 @@ import java.nio.file.Path;
@DisabledOnContainer(
value = TestContainerId.FLINK_1_13,
disabledReason = "Paimon does not support flink 1.13")
-public class PaimonIT extends TestSuiteBase {
+public class PaimonIT extends TestSuiteBase implements TestResource {
@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
@@ -50,8 +46,8 @@ public class PaimonIT extends TestSuiteBase {
Path schemaPath =
ContainerUtil.getResourcesFile("/schema-0.json").toPath();
container.copyFileToContainer(
MountableFile.forHostPath(schemaPath),
-
"/opt/seatunnel_mounts/paimon/default.db/st_test/schema/schema-0");
- container.execInContainer("chmod", "777", "-R",
"/opt/seatunnel_mounts/paimon");
+
"/tmp/seatunnel_mnt/paimon/default.db/st_test/schema/schema-0");
+ container.execInContainer("chmod", "777", "-R",
"/tmp/seatunnel_mnt/");
};
@TestTemplate
@@ -64,17 +60,12 @@ public class PaimonIT extends TestSuiteBase {
Container.ExecResult readProjectionResult =
container.executeJob("/paimon_projection_to_assert.conf");
Assertions.assertEquals(0, readProjectionResult.getExitCode());
- deleteTable();
}
- private void deleteTable() {
- Options options = new Options();
- options.set("warehouse", "file://" + "/opt/seatunnel_mounts/paimon");
- try {
- CatalogFactory.createCatalog(CatalogContext.create(options))
- .dropTable(Identifier.create("default", "st_test"), true);
- } catch (Catalog.TableNotExistException e) {
- throw new RuntimeException(e);
- }
- }
+ @Override
+ public void startUp() throws Exception {}
+
+ @Override
+ @AfterEach
+ public void tearDown() throws Exception {}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index d4f5bb5f9d..3def9a3497 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -38,9 +38,9 @@ import org.apache.paimon.types.DateType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.DateTimeUtils;
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
@@ -64,14 +64,14 @@ import static org.awaitility.Awaitility.given;
@Slf4j
public class PaimonSinkCDCIT extends AbstractPaimonIT implements TestResource {
- @BeforeAll
+ @BeforeEach
@Override
public void startUp() throws Exception {
this.isWindows =
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
}
- @AfterAll
+ @AfterEach
@Override
public void tearDown() throws Exception {}
@@ -91,8 +91,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
List<PaimonRecord> paimonRecords =
loadPaimonData("seatunnel_namespace9",
TARGET_TABLE);
Assertions.assertEquals(3, paimonRecords.size());
@@ -120,8 +118,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
List<PaimonRecord> paimonRecords =
loadPaimonData("seatunnel_namespace1",
TARGET_TABLE);
Assertions.assertEquals(2, paimonRecords.size());
@@ -162,8 +158,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
// Check FakeDatabase1.FakeTable1
List<PaimonRecord> fake1PaimonRecords =
loadPaimonData(FAKE_DATABASE1,
FAKE_TABLE1);
@@ -205,8 +199,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
Table table = getTable("seatunnel_namespace3",
TARGET_TABLE);
String bucket =
table.options().get(CoreOptions.BUCKET.key());
Assertions.assertTrue(StringUtils.isNoneBlank(bucket));
@@ -237,8 +229,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
Table table = getTable("seatunnel_namespace4",
TARGET_TABLE);
List<String> partitionKeys = table.partitionKeys();
List<String> primaryKeys = table.primaryKeys();
@@ -290,8 +280,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
Table table = getTable("seatunnel_namespace5",
TARGET_TABLE);
String fileFormat =
table.options().get(CoreOptions.FILE_FORMAT.key());
Assertions.assertTrue(StringUtils.isNoneBlank(fileFormat));
@@ -322,8 +310,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
Table table = getTable("seatunnel_namespace6",
TARGET_TABLE);
String fileFormat =
table.options().get(CoreOptions.FILE_FORMAT.key());
Assertions.assertTrue(StringUtils.isNoneBlank(fileFormat));
@@ -355,8 +341,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
FileStoreTable table =
(FileStoreTable)
getTable("seatunnel_namespace7", TARGET_TABLE);
List<DataField> fields = table.schema().fields();
@@ -397,15 +381,15 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
Assertions.assertEquals(2, result.size());
for (PaimonRecord paimonRecord : result) {
Assertions.assertEquals(
- paimonRecord.oneTime.toString(),
"2024-03-10T10:00:12");
+ "2024-03-10T10:00:12",
paimonRecord.oneTime.toString());
Assertions.assertEquals(
- paimonRecord.twoTime.toString(),
"2024-03-10T10:00:00.123");
+ "2024-03-10T10:00:00.123",
paimonRecord.twoTime.toString());
Assertions.assertEquals(
- paimonRecord.threeTime.toString(),
- "2024-03-10T10:00:00.123456");
+ "2024-03-10T10:00:00.123456",
+ paimonRecord.threeTime.toString());
Assertions.assertEquals(
- paimonRecord.fourTime.toString(),
- "2024-03-10T10:00:00.123456789");
+ "2024-03-10T10:00:00.123456789",
+ paimonRecord.fourTime.toString());
}
});
@@ -425,8 +409,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
FileStoreTable table =
(FileStoreTable)
getTable("seatunnel_namespace8", TARGET_TABLE);
List<DataField> fields = table.schema().fields();
@@ -511,8 +493,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
List<PaimonRecord> paimonRecords =
loadPaimonData("seatunnel_namespace10",
TARGET_TABLE);
Assertions.assertEquals(2, paimonRecords.size());
@@ -540,7 +520,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
Container.ExecResult writeResult =
container.executeJob("/changelog_fake_cdc_sink_paimon_case1_ddl.conf");
Assertions.assertEquals(0, writeResult.getExitCode());
- TimeUnit.SECONDS.sleep(120);
String[] jobIds =
new String[] {
String.valueOf(JobIdGenerator.newJobId()),
@@ -585,16 +564,13 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
throw new SeaTunnelException(e);
}
}));
- // stream job running 30 seconds
- TimeUnit.SECONDS.sleep(120);
+ // stream job running 60 seconds
+ TimeUnit.SECONDS.sleep(60);
// cancel stream job
container.cancelJob(jobIds[1]);
container.cancelJob(jobIds[2]);
container.cancelJob(jobIds[0]);
changeLogEnabled = true;
- TimeUnit.SECONDS.sleep(10);
- // copy paimon to local
- container.executeExtraCommands(containerExtendedFactory);
List<PaimonRecord> paimonRecords1 =
loadPaimonData("seatunnel_namespace", "st_test_sink");
List<String> actual1 =
paimonRecords1.stream()
@@ -646,8 +622,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
// cancel stream job
container.cancelJob(String.valueOf(jobId));
TimeUnit.SECONDS.sleep(5);
- // copy paimon to local
- container.executeExtraCommands(containerExtendedFactory);
List<PaimonRecord> paimonRecords =
loadPaimonData("seatunnel_namespace", "st_test_full");
List<String> actual =
paimonRecords.stream()
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
index b44da835ee..4b74575da8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
@@ -18,19 +18,15 @@
package org.apache.seatunnel.e2e.connector.paimon;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogLoader;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
-import org.apache.seatunnel.core.starter.utils.CompressionUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
-import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
@@ -50,16 +46,15 @@ import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.TimestampType;
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import lombok.extern.slf4j.Slf4j;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -70,6 +65,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static
org.apache.seatunnel.e2e.common.container.AbstractTestContainer.HOST_VOLUME_MOUNT_PATH;
import static org.awaitility.Awaitility.given;
@DisabledOnContainer(
@@ -80,23 +76,16 @@ import static org.awaitility.Awaitility.given;
@Slf4j
public class PaimonSinkDynamicBucketIT extends TestSuiteBase implements
TestResource {
- private static String CATALOG_ROOT_DIR = "/tmp/";
- private static final String NAMESPACE = "paimon";
- private static final String NAMESPACE_TAR = "paimon.tar.gz";
- private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE +
"/";
- private String CATALOG_ROOT_DIR_WIN = "C:/Users/";
- private String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
private boolean isWindows;
+ private static final String NAMESPACE = "paimon";
private Map<String, Object> PAIMON_SINK_PROPERTIES;
- @BeforeAll
+ @BeforeEach
@Override
public void startUp() throws Exception {
this.isWindows =
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
- CATALOG_ROOT_DIR_WIN = CATALOG_ROOT_DIR_WIN +
System.getProperty("user.name") + "/tmp/";
- CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
Map<String, Object> map = new HashMap<>();
map.put("warehouse", "hdfs:///tmp/paimon");
map.put("database", "default");
@@ -115,7 +104,7 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
this.PAIMON_SINK_PROPERTIES = map;
}
- @AfterAll
+ @AfterEach
@Override
public void tearDown() throws Exception {}
@@ -130,18 +119,6 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
Container.ExecResult readProjectionResult =
container.executeJob("/paimon_projection_to_assert.conf");
Assertions.assertEquals(0, readProjectionResult.getExitCode());
- deleteTable();
- }
-
- private void deleteTable() {
- Options options = new Options();
- options.set("warehouse", "file://" + "/opt/seatunnel_mounts/paimon");
- try {
- CatalogFactory.createCatalog(CatalogContext.create(options))
- .dropTable(Identifier.create("default", "st_test"), true);
- } catch (Catalog.TableNotExistException e) {
- throw new RuntimeException(e);
- }
}
@TestTemplate
@@ -155,8 +132,6 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
FileStoreTable table =
(FileStoreTable) getTable("default",
"st_test_2");
IndexBootstrap indexBootstrap = new
IndexBootstrap(table);
@@ -225,8 +200,6 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
FileStoreTable table =
(FileStoreTable) getTable("default",
"st_test_3");
IndexBootstrap indexBootstrap = new
IndexBootstrap(table);
@@ -262,8 +235,6 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
.atMost(120L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
FileStoreTable table =
(FileStoreTable) getTable("default",
"st_test_4");
IndexBootstrap indexBootstrap = new
IndexBootstrap(table);
@@ -303,10 +274,8 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
FileStoreTable table =
- (FileStoreTable) getTable("default",
"st_test_3");
+ (FileStoreTable) getTable("default",
"st_test_cdc_write");
List<DataField> fields = table.schema().fields();
for (DataField field : fields) {
if (field.name().equalsIgnoreCase("one_time"))
{
@@ -345,15 +314,15 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
Assertions.assertEquals(2, result.size());
for (PaimonRecord paimonRecord : result) {
Assertions.assertEquals(
- paimonRecord.oneTime.toString(),
"2024-03-10T10:00:12");
+ "2024-03-10T10:00:12",
paimonRecord.oneTime.toString());
Assertions.assertEquals(
- paimonRecord.twoTime.toString(),
"2024-03-10T10:00:00.123");
+ "2024-03-10T10:00:00.123",
paimonRecord.twoTime.toString());
Assertions.assertEquals(
- paimonRecord.threeTime.toString(),
- "2024-03-10T10:00:00.123456");
+ "2024-03-10T10:00:00.123456",
+ paimonRecord.threeTime.toString());
Assertions.assertEquals(
- paimonRecord.fourTime.toString(),
- "2024-03-10T10:00:00.123456789");
+ "2024-03-10T10:00:00.123456789",
+ paimonRecord.fourTime.toString());
}
});
}
@@ -371,7 +340,6 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
.atMost(60L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
-
container.executeExtraCommands(containerExtendedFactory);
FileStoreTable table =
(FileStoreTable) getTable("full_type",
"st_test");
List<String> primaryKeys =
table.schema().primaryKeys();
@@ -386,72 +354,12 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
Assertions.assertEquals(0, writeResult1.getExitCode());
}
- protected final ContainerExtendedFactory containerExtendedFactory =
- container -> {
- if (isWindows) {
- FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR);
- FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + "paimon.tar");
- FileUtils.createNewDir(CATALOG_ROOT_DIR_WIN);
- } else {
- FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
- FileUtils.createNewDir(CATALOG_DIR);
- }
-
- container.execInContainer(
- "sh",
- "-c",
- "cd "
- + CATALOG_ROOT_DIR
- + " && tar -czvf "
- + NAMESPACE_TAR
- + " "
- + NAMESPACE);
- container.copyFileFromContainer(
- CATALOG_ROOT_DIR + NAMESPACE_TAR,
- (isWindows ? CATALOG_ROOT_DIR_WIN : CATALOG_ROOT_DIR)
+ NAMESPACE_TAR);
- if (isWindows) {
- extractFilesWin();
- } else {
- extractFiles();
- }
- };
-
- private void extractFiles() {
- ProcessBuilder processBuilder = new ProcessBuilder();
- processBuilder.command(
- "sh", "-c", "cd " + CATALOG_ROOT_DIR + " && tar -zxvf " +
NAMESPACE_TAR);
- try {
- Process process = processBuilder.start();
- // wait command completed
- int exitCode = process.waitFor();
- if (exitCode == 0) {
- log.info("Extract files successful.");
- } else {
- log.error("Extract files failed with exit code " + exitCode);
- }
- } catch (IOException | InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- private void extractFilesWin() {
- try {
- CompressionUtils.unGzip(
- new File(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR), new
File(CATALOG_ROOT_DIR_WIN));
- CompressionUtils.unTar(
- new File(CATALOG_ROOT_DIR_WIN + "paimon.tar"), new
File(CATALOG_ROOT_DIR_WIN));
- } catch (IOException | ArchiveException e) {
- throw new RuntimeException(e);
- }
- }
-
protected Table getTable(String dbName, String tbName) {
Options options = new Options();
- if (isWindows) {
- options.set("warehouse", CATALOG_DIR_WIN);
- } else {
- options.set("warehouse", "file://" + CATALOG_DIR);
- }
+ String warehouse =
+ String.format(
+ "%s%s/%s", isWindows ? "" : "file://",
HOST_VOLUME_MOUNT_PATH, NAMESPACE);
+ options.set("warehouse", warehouse);
try {
Catalog catalog =
CatalogFactory.createCatalog(CatalogContext.create(options));
return catalog.getTable(Identifier.create(dbName, tbName));
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
index 5c85b29863..3a81eb5196 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
@@ -170,8 +170,6 @@ public class PaimonSinkWithSchemaEvolutionIT extends
AbstractPaimonIT implements
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
Assertions.assertIterableEquals(
queryMysql(String.format(QUERY,
MYSQL_DATABASE, SOURCE_TABLE)),
queryPaimon(null, 0, Integer.MAX_VALUE));
@@ -335,8 +333,6 @@ public class PaimonSinkWithSchemaEvolutionIT extends
AbstractPaimonIT implements
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(
() -> {
- // copy paimon to local
-
container.executeExtraCommands(containerExtendedFactory);
// 1. Vertify the schema
vertifySchema();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
index 1217ab4d4a..985029a792 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.e2e.connector.paimon;
import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
@@ -31,7 +32,9 @@ import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
@@ -52,7 +55,7 @@ import static org.awaitility.Awaitility.given;
disabledReason =
"Spark and Flink engine can not auto create paimon table on
worker node in local file(e.g flink tm) by savemode feature which can lead
error")
@Slf4j
-public class PaimonStreamReadIT extends PaimonSinkCDCIT {
+public class PaimonStreamReadIT extends AbstractPaimonIT implements
TestResource {
@TestTemplate
public void testStreamReadPaimon(TestContainer container) throws Exception
{
@@ -75,7 +78,6 @@ public class PaimonStreamReadIT extends PaimonSinkCDCIT {
.atMost(400L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
-
container.executeExtraCommands(containerExtendedFactory);
List<PaimonRecordWithFullType> paimonSourceRecords
=
loadPaimonDataWithFullType("full_type",
"st_test");
List<PaimonRecordWithFullType> paimonSinkRecords =
@@ -96,7 +98,6 @@ public class PaimonStreamReadIT extends PaimonSinkCDCIT {
.atMost(400L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
-
container.executeExtraCommands(containerExtendedFactory);
List<PaimonRecordWithFullType> paimonSourceRecords
=
loadPaimonDataWithFullType("full_type",
"st_test");
List<PaimonRecordWithFullType> paimonSinkRecords =
@@ -150,4 +151,15 @@ public class PaimonStreamReadIT extends PaimonSinkCDCIT {
}
return result;
}
+
+ @Override
+ @BeforeEach
+ public void startUp() throws Exception {
+ this.isWindows =
+
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
+ }
+
+ @Override
+ @AfterEach
+ public void tearDown() throws Exception {}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
index 6a32727505..4d4b30d7ac 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_ddl.conf
@@ -42,12 +42,13 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace"
table = "st_test_lookup"
paimon.table.write-props = {
changelog-producer = lookup
changelog-tmp-path = "/tmp/paimon/changelog"
+ file-format = parquet
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
index 9b7310177c..3ff070d2d5 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_insert_data.conf
@@ -56,12 +56,13 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace"
table = "st_test_lookup"
paimon.table.write-props = {
changelog-producer = lookup
changelog-tmp-path = "/tmp/paimon/changelog"
+ file-format = parquet
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
index 271ad20bff..209b2a7e41 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case1_update_data.conf
@@ -60,12 +60,13 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace"
table = "st_test_lookup"
paimon.table.write-props = {
changelog-producer = lookup
changelog-tmp-path = "/tmp/paimon/changelog"
+ file-format = parquet
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
index f7135e645f..e5660abc55 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_fake_cdc_sink_paimon_case2.conf
@@ -72,12 +72,13 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace"
table = "st_test_full"
paimon.table.write-props = {
changelog-producer = full-compaction
changelog-tmp-path = "/tmp/paimon/changelog"
+ file-format = parquet
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
index 7f0798039a..d63810fa19 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/changelog_paimon_to_paimon.conf
@@ -18,12 +18,12 @@
env {
parallelism = 1
job.mode = "Streaming"
- checkpoint.interval = 2000
+ checkpoint.interval = 5000
}
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace"
table = "st_test_lookup"
}
@@ -38,12 +38,13 @@ transform {
sink {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace"
table = "st_test_sink"
paimon.table.non-primary-key = true
paimon.table.write-props = {
write-only = true
+ file-format = parquet
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
index 50ce13aa68..d2a6929cac 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1.conf
@@ -79,7 +79,7 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace1"
table = "st_test"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case10.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case10.conf
index 3c2061c55b..8f1c539ad6 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case10.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case10.conf
@@ -51,7 +51,7 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace9"
table = "st_test"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf
index 70bcedff29..3dd8ea1adb 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case1_with_error_schema.conf
@@ -55,7 +55,7 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace1"
table = "st_test"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
index ddc9226871..fcb2044ba8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case2.conf
@@ -26,18 +26,18 @@ env {
source {
FakeSource {
tables_configs = [
- {
+ {
schema = {
- table = "FakeDatabase1.FakeTable1"
- fields {
- pk_id = bigint
- name = string
- score = int
- }
- primaryKey {
- name = "pk_id"
- columnNames = [pk_id]
- }
+ table = "FakeDatabase1.FakeTable1"
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
}
rows = [
{
@@ -77,65 +77,65 @@ source {
fields = [2, "B", 100]
}
]
- },
- {
- schema = {
- table = "FakeDatabase2.FakeTable1"
- fields {
- pk_id = bigint
- name = string
- }
- primaryKey {
- name = "pk_id"
- columnNames = [pk_id]
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [100, "A"]
- },
- {
- kind = INSERT
- fields = [200, "B"]
- },
- {
- kind = INSERT
- fields = [300, "C"]
- },
- {
- kind = INSERT
- fields = [300, "C"]
- },
- {
- kind = INSERT
- fields = [300, "C"]
- },
- {
- kind = INSERT
- fields = [300, "C"]
- }
- {
- kind = UPDATE_BEFORE
- fields = [100, "A"]
- },
- {
- kind = UPDATE_AFTER
- fields = [100, "A_100"]
- },
- {
- kind = DELETE
- fields = [200, "B"]
- }
- ]
- }
+ },
+ {
+ schema = {
+ table = "FakeDatabase2.FakeTable1"
+ fields {
+ pk_id = bigint
+ name = string
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [100, "A"]
+ },
+ {
+ kind = INSERT
+ fields = [200, "B"]
+ },
+ {
+ kind = INSERT
+ fields = [300, "C"]
+ },
+ {
+ kind = INSERT
+ fields = [300, "C"]
+ },
+ {
+ kind = INSERT
+ fields = [300, "C"]
+ },
+ {
+ kind = INSERT
+ fields = [300, "C"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [100, "A"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [100, "A_100"]
+ },
+ {
+ kind = DELETE
+ fields = [200, "B"]
+ }
+ ]
+ }
]
}
}
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "${database_name}"
table = "${table_name}"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
index f5db1c8253..2c7ac5c06e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case3.conf
@@ -83,11 +83,11 @@ transform {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace3"
table = "st_test"
paimon.table.write-props = {
- bucket = 2
+ bucket = 2
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
index 9a287a61b1..42feabe071 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case4.conf
@@ -79,11 +79,11 @@ transform {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace4"
table = "st_test"
paimon.table.write-props = {
- bucket = 2
+ bucket = 2
}
paimon.table.partition-keys = "dt"
paimon.table.primary-keys = "pk_id,dt"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
index 65df2115f4..ce9bac3130 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case5.conf
@@ -83,11 +83,11 @@ transform {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace5"
table = "st_test"
paimon.table.write-props = {
- file.format = "parquet"
+ file.format = "parquet"
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
index 102747ef0f..9b3ce7c16a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case6.conf
@@ -83,11 +83,11 @@ transform {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace6"
table = "st_test"
paimon.table.write-props = {
- file.format = "avro"
+ file.format = "avro"
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
index 6578c72358..037f6cb30e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case7.conf
@@ -27,46 +27,46 @@ source {
FakeSource {
schema = {
columns = [
- {
- name = pk_id
- type = bigint
- nullable = false
- comment = "primary key id"
- },
- {
- name = name
- type = "string"
- nullable = true
- comment = "name"
- },
- {
- name = one_time
- type = timestamp
- nullable = false
- comment = "one time"
- columnScale = 0
- },
- {
- name = two_time
- type = timestamp
- nullable = false
- comment = "two time"
- columnScale = 3
- },
- {
- name = three_time
- type = timestamp
- nullable = false
- comment = "three time"
- columnScale = 6
- },
- {
- name = four_time
- type = timestamp
- nullable = false
- comment = "four time"
- columnScale = 9
- }
+ {
+ name = pk_id
+ type = bigint
+ nullable = false
+ comment = "primary key id"
+ },
+ {
+ name = name
+ type = "string"
+ nullable = true
+ comment = "name"
+ },
+ {
+ name = one_time
+ type = timestamp
+ nullable = false
+ comment = "one time"
+ columnScale = 0
+ },
+ {
+ name = two_time
+ type = timestamp
+ nullable = false
+ comment = "two time"
+ columnScale = 3
+ },
+ {
+ name = three_time
+ type = timestamp
+ nullable = false
+ comment = "three time"
+ columnScale = 6
+ },
+ {
+ name = four_time
+ type = timestamp
+ nullable = false
+ comment = "four time"
+ columnScale = 9
+ }
]
primaryKey {
name = "pk_id"
@@ -120,7 +120,7 @@ transform {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace7"
table = "st_test"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case8.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case8.conf
index 2fc4910e98..414d048bf2 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case8.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case8.conf
@@ -27,24 +27,24 @@ source {
FakeSource {
schema = {
columns = [
- {
- name = pk_id
- type = bigint
- nullable = false
- comment = "primary key id"
- },
- {
- name = name
- type = "string"
- nullable = true
- comment = "name"
- },
- {
- name = one_date
- type = date
- nullable = false
- comment = "one date"
- }
+ {
+ name = pk_id
+ type = bigint
+ nullable = false
+ comment = "primary key id"
+ },
+ {
+ name = name
+ type = "string"
+ nullable = true
+ comment = "name"
+ },
+ {
+ name = one_date
+ type = date
+ nullable = false
+ comment = "one date"
+ }
]
primaryKey {
name = "pk_id"
@@ -86,7 +86,7 @@ transform {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace8"
table = "st_test"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case9.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case9.conf
index 674491f90d..090948f656 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case9.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_case9.conf
@@ -67,7 +67,7 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace9"
table = "st_test"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_ha.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_ha.conf
index 7e5fd6da94..ec8c4ce529 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_ha.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_ha.conf
@@ -80,10 +80,10 @@ source {
sink {
Paimon {
schema_save_mode = "RECREATE_SCHEMA"
- catalog_name="seatunnel_test"
- warehouse="hdfs:///tmp/paimon"
- database="seatunnel_namespace1"
- table="st_test"
+ catalog_name = "seatunnel_test"
+ warehouse = "hdfs:///tmp/paimon"
+ database = "seatunnel_namespace1"
+ table = "st_test"
paimon.hadoop.conf = {
fs.defaultFS = "hdfs://nameservice1"
dfs.nameservices = "nameservice1"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf
index 3afdc59701..eb517229a5 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf
@@ -80,12 +80,12 @@ source {
sink {
Paimon {
schema_save_mode = "RECREATE_SCHEMA"
- catalog_name="seatunnel_test"
- catalog_type="hive"
- catalog_uri="thrift://hadoop04:9083"
- warehouse="hdfs:///tmp/seatunnel"
- database="seatunnel_test"
- table="st_test3"
+ catalog_name = "seatunnel_test"
+ catalog_type = "hive"
+ catalog_uri = "thrift://hadoop04:9083"
+ warehouse = "hdfs:///tmp/seatunnel"
+ database = "seatunnel_test"
+ table = "st_test3"
paimon.hadoop.conf = {
fs.defaultFS = "hdfs://nameservice1"
dfs.nameservices = "nameservice1"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf
index f9993fe33f..ac40022241 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_to_dynamic_bucket_paimon_case.conf
@@ -27,46 +27,46 @@ source {
FakeSource {
schema = {
columns = [
- {
- name = pk_id
- type = bigint
- nullable = false
- comment = "primary key id"
- },
- {
- name = name
- type = "string"
- nullable = true
- comment = "name"
- },
- {
- name = one_time
- type = timestamp
- nullable = false
- comment = "one time"
- columnScale = 0
- },
- {
- name = two_time
- type = timestamp
- nullable = false
- comment = "two time"
- columnScale = 3
- },
- {
- name = three_time
- type = timestamp
- nullable = false
- comment = "three time"
- columnScale = 6
- },
- {
- name = four_time
- type = timestamp
- nullable = false
- comment = "four time"
- columnScale = 9
- }
+ {
+ name = pk_id
+ type = bigint
+ nullable = false
+ comment = "primary key id"
+ },
+ {
+ name = name
+ type = "string"
+ nullable = true
+ comment = "name"
+ },
+ {
+ name = one_time
+ type = timestamp
+ nullable = false
+ comment = "one time"
+ columnScale = 0
+ },
+ {
+ name = two_time
+ type = timestamp
+ nullable = false
+ comment = "two time"
+ columnScale = 3
+ },
+ {
+ name = three_time
+ type = timestamp
+ nullable = false
+ comment = "three time"
+ columnScale = 6
+ },
+ {
+ name = four_time
+ type = timestamp
+ nullable = false
+ comment = "four time"
+ columnScale = 9
+ }
]
primaryKey {
name = "pk_id"
@@ -120,12 +120,12 @@ transform {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "default"
- table = "st_test_3"
+ table = "st_test_cdc_write"
paimon.table.write-props = {
- bucket = -1
- dynamic-bucket.target-row-num = 50000
+ bucket = -1
+ dynamic-bucket.target-row-num = 50000
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
index 7f9c453d35..55fec624d9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hdfs_case2.conf
@@ -51,7 +51,7 @@ sink {
warehouse = "hdfs:///tmp/paimon"
database = "seatunnel_namespace11"
table = "st_test"
- data_save_mode=DROP_DATA
+ data_save_mode = DROP_DATA
paimon.hadoop.conf = {
hadoop_user_name = "hdfs"
fs.defaultFS = "hdfs://nameservice1"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
index 26e95870e3..d99d927c7e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case1.conf
@@ -65,8 +65,8 @@ source {
sink {
Paimon {
warehouse = "hdfs:///tmp/paimon"
- catalog_type="hive"
- catalog_uri="thrift://hadoop04:9083"
+ catalog_type = "hive"
+ catalog_uri = "thrift://hadoop04:9083"
database = "seatunnel_namespace12"
table = "st_test"
paimon.hadoop.conf = {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
index ef1e79b86e..37f8afc326 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_hive_case2.conf
@@ -49,11 +49,11 @@ source {
sink {
Paimon {
warehouse = "hdfs:///tmp/paimon"
- catalog_type="hive"
- catalog_uri="thrift://hadoop04:9083"
+ catalog_type = "hive"
+ catalog_uri = "thrift://hadoop04:9083"
database = "seatunnel_namespace12"
table = "st_test"
- data_save_mode=DROP_DATA
+ data_save_mode = DROP_DATA
paimon.hadoop.conf = {
fs.defaultFS = "hdfs://nameservice1"
dfs.nameservices = "nameservice1"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
index e22474a06d..2e8356879a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case1.conf
@@ -64,7 +64,7 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace10"
table = "st_test"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
index 64cb24bc8e..8b322f0a02 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_sink_paimon_truncate_with_local_case2.conf
@@ -48,9 +48,9 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace10"
table = "st_test"
- data_save_mode=DROP_DATA
+ data_save_mode = DROP_DATA
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
index de4e21e3ed..92120c4bc9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
@@ -23,6 +23,8 @@ env {
source {
FakeSource {
row.num = 100000
+ auto.increment.enabled = true
+ auto.increment.start = 1
schema = {
fields {
pk_id = int
@@ -42,6 +44,10 @@ source {
c_timestamp = timestamp
c_time = time
}
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
}
plugin_output = "fake"
}
@@ -49,12 +55,12 @@ source {
sink {
Paimon {
- warehouse = "file:///opt/seatunnel_mounts/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "default"
table = "st_test"
paimon.table.write-props = {
- bucket = -1
- dynamic-bucket.target-row-num = 50000
+ bucket = -1
+ dynamic-bucket.target-row-num = 50000
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
index 338e624d04..993543effd 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case2.conf
@@ -25,6 +25,8 @@ env {
source {
FakeSource {
+ auto.increment.enabled = true
+ auto.increment.start = 1
row.num = 100000
schema = {
fields {
@@ -42,12 +44,12 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "default"
table = "st_test_2"
paimon.table.write-props = {
- bucket = -1
- dynamic-bucket.target-row-num = 50000
+ bucket = -1
+ dynamic-bucket.target-row-num = 50000
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
index 21cbc0ef83..48ed12b65f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case3.conf
@@ -25,6 +25,8 @@ env {
source {
FakeSource {
+ auto.increment.enabled = true
+ auto.increment.start = 1
row.num = 100000
schema = {
fields {
@@ -42,12 +44,12 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "default"
table = "st_test_3"
paimon.table.write-props = {
- bucket = -1
- dynamic-bucket.target-row-num = 50000
+ bucket = -1
+ dynamic-bucket.target-row-num = 50000
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
index 9a71171330..0270b7de36 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case4.conf
@@ -71,12 +71,12 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
- database = "default"
- table = "st_test_4"
- paimon.table.write-props = {
- bucket = -1
- dynamic-bucket.target-row-num = 5
- }
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
+ database = "default"
+ table = "st_test_4"
+ paimon.table.write-props = {
+ bucket = -1
+ dynamic-bucket.target-row-num = 5
}
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
index b1e562c088..92df043c7d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case5.conf
@@ -25,6 +25,8 @@ env {
source {
FakeSource {
+ auto.increment.enabled = true
+ auto.increment.start = 1000000
row.num = 100000
schema = {
fields {
@@ -43,10 +45,10 @@ source {
sink {
Paimon {
schema_save_mode = "RECREATE_SCHEMA"
- catalog_name="seatunnel_test"
- warehouse="hdfs:///tmp/paimon"
- database="default"
- table="st_test_5"
+ catalog_name = "seatunnel_test"
+ warehouse = "hdfs:///tmp/paimon"
+ database = "default"
+ table = "st_test_5"
paimon.table.write-props = {
bucket = -1
dynamic-bucket.target-row-num = 50000
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
index 43fb21dbf8..8c8532b8d8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
@@ -42,7 +42,7 @@ source {
}
primaryKey {
name = "pk"
- columnNames =
[c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_decimal,c_bytes,c_date,c_timestamp]
+ columnNames = [c_string, c_boolean, c_tinyint, c_smallint, c_int,
c_bigint, c_float, c_double, c_decimal, c_bytes, c_date, c_timestamp]
}
}
rows = [
@@ -60,7 +60,7 @@ source {
}
{
kind = INSERT
- fields = [{"a": "f"}, [104], null, false, 118, 15988, 563873951,
7084913402530365004, 1.24, 1.234, "2924137191386439303744.39292214",
"bWlJWmo=", "2023-04-24", "2023-04-24T23:20:58", "23:20:58"]
+ fields = [{"a": "f"}, [104], "", false, 118, 15988, 563873951,
7084913402530365004, 1.24, 1.234, "2924137191386439303744.39292214",
"bWlJWmo=", "2023-04-24", "2023-04-24T23:20:58", "23:20:58"]
}
{
kind = INSERT
@@ -85,11 +85,11 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
paimon.table.write-props = {
- bucket = -1
+ bucket = -1
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
index f87cd54213..d5960a79df 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
@@ -42,7 +42,7 @@ source {
}
primaryKey {
name = "pk"
- columnNames =
[c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_decimal,c_bytes,c_date,c_timestamp,c_time]
+ columnNames = [c_string, c_boolean, c_tinyint, c_smallint, c_int,
c_bigint, c_float, c_double, c_decimal, c_bytes, c_date, c_timestamp, c_time]
}
}
rows = [
@@ -73,7 +73,7 @@ source {
sink {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
paimon.table.write-props = {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
index e93a7a8653..6f794a0c68 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
@@ -29,9 +29,12 @@ env {
source {
FakeSource {
+ auto.increment.enabled = true
+ auto.increment.start = 1
row.num = 100000
schema = {
fields {
+ pk_id = bigint
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
@@ -48,6 +51,10 @@ source {
c_timestamp = timestamp
c_time = time
}
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
}
plugin_output = "fake"
}
@@ -55,7 +62,7 @@ source {
sink {
Paimon {
- warehouse = "/opt/seatunnel_mounts/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "default"
table = "st_test"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf
index e076b521df..d16b54faec 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf
@@ -85,7 +85,8 @@ source {
sink {
Paimon {
- warehouse = "/tmp/paimon"
+ plugin_input = "fake"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
index 086becf2b8..8efdb8f2cc 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf
@@ -73,7 +73,7 @@ source {
sink {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
index a379a638eb..86137b9e78 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3.conf
@@ -85,11 +85,11 @@ sink {
database = "seatunnel_namespace11"
table = "st_test"
paimon.hadoop.conf = {
- fs.s3a.access-key=minio
- fs.s3a.secret-key=miniominio
- fs.s3a.endpoint="http://minio:9000"
- fs.s3a.path.style.access=true
-
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+ fs.s3a.access-key = minio
+ fs.s3a.secret-key = miniominio
+ fs.s3a.endpoint = "http://minio:9000"
+ fs.s3a.path.style.access = true
+ fs.s3a.aws.credentials.provider =
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
index dc2585abc9..55d94aa033 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_s3_with_checkpoint.conf
@@ -53,11 +53,11 @@ sink {
database = "seatunnel_namespace12"
table = "st_test"
paimon.hadoop.conf = {
- fs.s3a.access-key=minio
- fs.s3a.secret-key=miniominio
- fs.s3a.endpoint="http://minio:9000"
- fs.s3a.path.style.access=true
-
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+ fs.s3a.access-key = minio
+ fs.s3a.secret-key = miniominio
+ fs.s3a.endpoint = "http://minio:9000"
+ fs.s3a.path.style.access = true
+ fs.s3a.aws.credentials.provider =
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
}
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
index a214430dd0..62777ed3d6 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/mysql_cdc_to_paimon_with_schema_change.conf
@@ -23,8 +23,8 @@ env {
parallelism = 5
job.mode = "STREAMING"
checkpoint.interval = 5000
- read_limit.bytes_per_second=7000000
- read_limit.rows_per_second=400
+ read_limit.bytes_per_second = 7000000
+ read_limit.rows_per_second = 400
}
source {
@@ -41,7 +41,7 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///tmp/seatunnel_mnt/paimon"
database = "mysql_to_paimon"
table = "products"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
index a45792848e..bf2781b536 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
@@ -29,7 +29,7 @@ env {
source {
Paimon {
- warehouse = "/opt/seatunnel_mounts/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "default"
table = "st_test"
plugin_output = paimon_source
@@ -41,15 +41,15 @@ sink {
Assert {
plugin_input = paimon_source
rules {
- row_rules = [
+ row_rules = [
{
rule_type = MIN_ROW
rule_value = 100000
},
{
- rule_type = MAX_ROW
- rule_value = 100000
- }
+ rule_type = MAX_ROW
+ rule_value = 100000
+ }
],
field_rules = [
{
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
index d8d5c9f0d2..9224fef95d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
@@ -29,7 +29,7 @@ env {
source {
Paimon {
- warehouse = "/opt/seatunnel_mounts/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "default"
table = "st_test"
plugin_output = paimon_source
@@ -40,15 +40,15 @@ sink {
Assert {
plugin_input = paimon_source
rules {
- row_rules = [
+ row_rules = [
{
rule_type = MIN_ROW
rule_value = 100000
},
{
- rule_type = MAX_ROW
- rule_value = 100000
- }
+ rule_type = MAX_ROW
+ rule_value = 100000
+ }
],
field_rules = [
{
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf
index 6c54339442..96e145a89e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf
@@ -22,7 +22,7 @@ env {
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
query = "select * from st_test where c_string is not null"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf
index c5faa260aa..4dc87f9394 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf
@@ -22,7 +22,7 @@ env {
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
query = "select * from st_test where c_string='c_string2'"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf
index 27d8c1897e..21baafc1e5 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf
@@ -22,7 +22,7 @@ env {
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
query = "select * from st_test where c_boolean= 'true' and c_tinyint > 116
and c_smallint = 15987"
@@ -56,24 +56,24 @@ sink {
]
}
{
- field_name = c_tinyint
- field_type = tinyint
- field_value = [
- {
- rule_type = MIN
- rule_value = 116
- }
- ]
+ field_name = c_tinyint
+ field_type = tinyint
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 116
+ }
+ ]
}
{
- field_name = c_smallint
- field_type = smallint
- field_value = [
- {
- rule_type = NOT_NULL
- equals_to = 15987
- }
- ]
+ field_name = c_smallint
+ field_type = smallint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 15987
+ }
+ ]
}
]
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf
index 8bcec7150a..aa8c0f6e55 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf
@@ -22,7 +22,7 @@ env {
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
query = "select * from st_test where c_date > '2023-04-21' and
c_timestamp='2023-04-27 23:20:58'"
@@ -46,24 +46,24 @@ sink {
]
field_rules = [
{
- field_name = c_date
- field_type = date
- field_value = [
- {
- rule_type = NOT_NULL
- equals_to = "2023-04-27"
- }
- ]
+ field_name = c_date
+ field_type = date
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "2023-04-27"
+ }
+ ]
}
{
- field_name = c_timestamp
- field_type = timestamp
- field_value = [
- {
- rule_type = NOT_NULL
- equals_to = "2023-04-27T23:20:58"
- }
- ]
+ field_name = c_timestamp
+ field_type = timestamp
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "2023-04-27T23:20:58"
+ }
+ ]
}
]
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter5.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter5.conf
index d1d43ecfd2..01eb77fbb7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter5.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter5.conf
@@ -22,7 +22,7 @@ env {
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
query = "select * from st_test where c_boolean= 'true' and c_smallint =
15987 and c_tinyint between 116 and 120"
@@ -56,29 +56,29 @@ sink {
]
}
{
- field_name = c_tinyint
- field_type = tinyint
- field_value = [
- {
- rule_type = MIN
- rule_value = 116
- },
- {
- rule_type = MAX
- rule_value = 120
- }
+ field_name = c_tinyint
+ field_type = tinyint
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 116
+ },
+ {
+ rule_type = MAX
+ rule_value = 120
+ }
- ]
+ ]
}
{
- field_name = c_smallint
- field_type = smallint
- field_value = [
- {
- rule_type = NOT_NULL
- equals_to = 15987
- }
- ]
+ field_name = c_smallint
+ field_type = smallint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 15987
+ }
+ ]
}
]
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter6.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter6.conf
index 785f294cf6..5dd29e0f50 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter6.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter6.conf
@@ -22,7 +22,7 @@ env {
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
query = "select * from st_test where c_boolean= 'true' and c_smallint =
15987 and c_tinyint in (117, 118, 119)"
@@ -56,29 +56,29 @@ sink {
]
}
{
- field_name = c_tinyint
- field_type = tinyint
- field_value = [
- {
- rule_type = MIN
- rule_value = 117
- },
- {
- rule_type = MAX
- rule_value = 119
- }
+ field_name = c_tinyint
+ field_type = tinyint
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 117
+ },
+ {
+ rule_type = MAX
+ rule_value = 119
+ }
- ]
+ ]
}
{
- field_name = c_smallint
- field_type = smallint
- field_value = [
- {
- rule_type = NOT_NULL
- equals_to = 15987
- }
- ]
+ field_name = c_smallint
+ field_type = smallint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 15987
+ }
+ ]
}
]
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter7.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter7.conf
index daa5a1fb1e..698bf73cec 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter7.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter7.conf
@@ -22,7 +22,7 @@ env {
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
query = "select * from st_test where c_boolean= 'true' and c_smallint =
15987 and c_tinyint not in (116, 120)"
@@ -56,29 +56,29 @@ sink {
]
}
{
- field_name = c_tinyint
- field_type = tinyint
- field_value = [
- {
- rule_type = MIN
- rule_value = 117
- },
- {
- rule_type = MAX
- rule_value = 119
- }
+ field_name = c_tinyint
+ field_type = tinyint
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 117
+ },
+ {
+ rule_type = MAX
+ rule_value = 119
+ }
- ]
+ ]
}
{
- field_name = c_smallint
- field_type = smallint
- field_value = [
- {
- rule_type = NOT_NULL
- equals_to = 15987
- }
- ]
+ field_name = c_smallint
+ field_type = smallint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 15987
+ }
+ ]
}
]
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter8.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter8.conf
index 1e0947403c..f23f8d15df 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter8.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter8.conf
@@ -22,7 +22,7 @@ env {
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
query = "select * from st_test where c_string like 'c_string2%'"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf
index c9f9136bd3..1de9aa4efb 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf
@@ -22,12 +22,12 @@ env {
source {
Paimon {
- catalog_name="seatunnel_test"
- catalog_type="hive"
- catalog_uri="thrift://hadoop04:9083"
- warehouse="hdfs:///tmp/seatunnel"
- database="seatunnel_test"
- table="st_test3"
+ catalog_name = "seatunnel_test"
+ catalog_type = "hive"
+ catalog_uri = "thrift://hadoop04:9083"
+ warehouse = "hdfs:///tmp/seatunnel"
+ database = "seatunnel_test"
+ table = "st_test3"
paimon.hadoop.conf = {
fs.defaultFS = "hdfs://nameservice1"
dfs.nameservices = "nameservice1"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf
index 68101da6f3..5c3c340058 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf
@@ -22,7 +22,7 @@ env {
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "seatunnel_namespace7"
table = "st_test"
plugin_output = paimon_source
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
index 50728871af..5daa2b8a68 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf
@@ -22,7 +22,7 @@ env {
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test"
}
@@ -30,7 +30,7 @@ source {
sink {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/tmp/seatunnel_mnt/paimon"
database = "full_type"
table = "st_test_sink"
paimon.table.primary-keys = "c_tinyint"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
index 6684b5fa95..489ab781fd 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_with_s3_to_assert.conf
@@ -31,68 +31,68 @@ source {
database = "seatunnel_namespace11"
table = "st_test"
paimon.hadoop.conf = {
- fs.s3a.access-key=minio
- fs.s3a.secret-key=miniominio
- fs.s3a.endpoint="http://minio:9000"
- fs.s3a.path.style.access=true
-
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
+ fs.s3a.access-key = minio
+ fs.s3a.secret-key = miniominio
+ fs.s3a.endpoint = "http://minio:9000"
+ fs.s3a.path.style.access = true
+ fs.s3a.aws.credentials.provider =
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
}
}
}
sink {
- Assert {
+ Assert {
rules {
- row_rules = [
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 2
+ }
+ ],
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 2
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
{
- rule_type = MIN_ROW
- rule_value = 2
- }
- ],
- row_rules = [
+ rule_type = NOT_NULL
+ },
{
- rule_type = MAX_ROW
- rule_value = 2
+ rule_type = MIN
+ rule_value = 1
+ },
+ {
+ rule_type = MAX
+ rule_value = 3
}
- ],
- field_rules = [
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
{
- field_name = pk_id
- field_type = bigint
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 1
- },
- {
- rule_type = MAX
- rule_value = 3
- }
- ]
- },
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
{
- field_name = name
- field_type = string
- field_value = [
- {
- rule_type = NOT_NULL
- }
- ]
- },
- {
- field_name = score
- field_type = int
- field_value = [
- {
- rule_type = NOT_NULL
- equals_to = 100
- }
- ]
- }
+ rule_type = NOT_NULL
+ equals_to = 100
+ }
]
}
+ ]
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf
index eff421346f..ae896b18f0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf
@@ -22,10 +22,10 @@ env {
source {
Paimon {
- catalog_name="seatunnel_test"
- warehouse="hdfs:///tmp/paimon"
- database="seatunnel_namespace1"
- table="st_test"
+ catalog_name = "seatunnel_test"
+ warehouse = "hdfs:///tmp/paimon"
+ database = "seatunnel_namespace1"
+ table = "st_test"
query = "select * from st_test where pk_id is not null and pk_id < 3"
paimon.hadoop.conf = {
fs.defaultFS = "hdfs://nameservice1"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json
index 9b0dfa8028..1a9a41d79c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/schema-0.json
@@ -1,75 +1,99 @@
{
- "id" : 0,
- "fields" : [ {
- "id" : 0,
- "name" : "c_map",
- "type" : {
- "type" : "MAP",
- "key" : "STRING",
- "value" : "STRING"
+ "id": 0,
+ "fields": [
+ {
+ "id": 0,
+ "name": "pk_id",
+ "type": "BIGINT NOT NULL"
+ },
+ {
+ "id": 1,
+ "name": "c_map",
+ "type": {
+ "type": "MAP",
+ "key": "STRING",
+ "value": "STRING"
+ }
+ },
+ {
+ "id": 2,
+ "name": "c_array",
+ "type": {
+ "type": "ARRAY",
+ "element": "INT"
+ }
+ },
+ {
+ "id": 3,
+ "name": "c_string",
+ "type": "STRING"
+ },
+ {
+ "id": 4,
+ "name": "c_boolean",
+ "type": "BOOLEAN"
+ },
+ {
+ "id": 5,
+ "name": "c_tinyint",
+ "type": "TINYINT"
+ },
+ {
+ "id": 6,
+ "name": "c_smallint",
+ "type": "SMALLINT"
+ },
+ {
+ "id": 7,
+ "name": "c_int",
+ "type": "INT"
+ },
+ {
+ "id": 8,
+ "name": "c_bigint",
+ "type": "BIGINT"
+ },
+ {
+ "id": 9,
+ "name": "c_float",
+ "type": "FLOAT"
+ },
+ {
+ "id": 10,
+ "name": "c_double",
+ "type": "DOUBLE"
+ },
+ {
+ "id": 11,
+ "name": "c_decimal",
+ "type": "DECIMAL(30, 8)"
+ },
+ {
+ "id": 12,
+ "name": "c_bytes",
+ "type": "BYTES"
+ },
+ {
+ "id": 13,
+ "name": "c_date",
+ "type": "DATE"
+ },
+ {
+ "id": 14,
+ "name": "c_timestamp",
+ "type": "TIMESTAMP(6)"
+ },
+ {
+ "id": 15,
+ "name": "c_time",
+ "type": "TIME(0)"
}
- }, {
- "id" : 1,
- "name" : "c_array",
- "type" : {
- "type" : "ARRAY",
- "element" : "INT"
- }
- }, {
- "id" : 2,
- "name" : "c_string",
- "type" : "STRING"
- }, {
- "id" : 3,
- "name" : "c_boolean",
- "type" : "BOOLEAN"
- }, {
- "id" : 4,
- "name" : "c_tinyint",
- "type" : "TINYINT"
- }, {
- "id" : 5,
- "name" : "c_smallint",
- "type" : "SMALLINT"
- }, {
- "id" : 6,
- "name" : "c_int",
- "type" : "INT"
- }, {
- "id" : 7,
- "name" : "c_bigint",
- "type" : "BIGINT"
- }, {
- "id" : 8,
- "name" : "c_float",
- "type" : "FLOAT"
- }, {
- "id" : 9,
- "name" : "c_double",
- "type" : "DOUBLE"
- }, {
- "id" : 10,
- "name" : "c_decimal",
- "type" : "DECIMAL(30, 8)"
- }, {
- "id" : 11,
- "name" : "c_bytes",
- "type" : "BYTES"
- }, {
- "id" : 12,
- "name" : "c_date",
- "type" : "DATE"
- }, {
- "id" : 13,
- "name" : "c_timestamp",
- "type" : "TIMESTAMP(6)"
- }, {
- "id" : 14,
- "name" : "c_time",
- "type" : "TIME"
- }],
- "highestFieldId" : 14,
- "partitionKeys" : [ ],
- "primaryKeys" : [ ],
- "options" : { }
-}
+ ],
+ "highestFieldId": 15,
+ "partitionKeys": [],
+ "primaryKeys": [
+ "pk_id"
+ ],
+ "options": {},
+ "timeMillis": 1751613422623
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
index bcf2043d70..9e02db6a7b 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
@@ -43,6 +43,21 @@ public abstract class AbstractTestContainer implements
TestContainer {
protected static final String START_ROOT_MODULE_NAME = "seatunnel-core";
public static final String SEATUNNEL_HOME = "/tmp/seatunnel/";
+
+ protected static final boolean isWindows =
+
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
+
+ protected static String hostName = System.getProperty("user.name");
+ protected Integer hostUid = Integer.parseInt(System.getProperty("user.id",
"1000"));
+ protected Integer hostGid =
Integer.parseInt(System.getProperty("user.gid", "1000"));
+
+ protected static final String CONTAINER_VOLUME_MOUNT_PATH =
"/tmp/seatunnel_mnt";
+
+ public static final String HOST_VOLUME_MOUNT_PATH =
+ isWindows
+ ? String.format("C:/Users/%s/tmp/seatunnel_mnt", hostName)
+ : CONTAINER_VOLUME_MOUNT_PATH;
+
protected final String startModuleName;
protected final String startModuleFullPath;
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 1bfe8e8c3c..7d2dec4360 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.e2e.common.container.flink;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
@@ -61,8 +62,6 @@ public abstract class AbstractTestFlinkContainer extends
AbstractTestContainer {
protected static final String DEFAULT_DOCKER_IMAGE =
"flink:1.13.6-scala_2.11";
- protected static final String MOUNTS_PATH = "/opt/seatunnel_mounts";
-
protected GenericContainer<?> jobManager;
protected GenericContainer<?> taskManager;
@@ -73,6 +72,7 @@ public abstract class AbstractTestFlinkContainer extends
AbstractTestContainer {
@Override
public void startUp() throws Exception {
+ FileUtils.createNewDir(HOST_VOLUME_MOUNT_PATH);
final String dockerImage = getDockerImage();
final String properties = String.join("\n", getFlinkProperties());
jobManager =
@@ -89,10 +89,12 @@ public abstract class AbstractTestFlinkContainer extends
AbstractTestContainer {
new LogMessageWaitStrategy()
.withRegEx(".*Starting the resource
manager.*")
.withStartupTimeout(Duration.ofMinutes(2)))
- .withFileSystemBind(MOUNTS_PATH, MOUNTS_PATH,
BindMode.READ_WRITE);
+ .withFileSystemBind(
+ HOST_VOLUME_MOUNT_PATH,
+ CONTAINER_VOLUME_MOUNT_PATH,
+ BindMode.READ_WRITE);
copySeaTunnelStarterToContainer(jobManager);
copySeaTunnelStarterLoggingToContainer(jobManager);
-
jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s",
8081, 8081)));
taskManager =
@@ -111,11 +113,13 @@ public abstract class AbstractTestFlinkContainer extends
AbstractTestContainer {
.withRegEx(
".*Successful registration at
resource manager.*")
.withStartupTimeout(Duration.ofMinutes(2)))
- .withFileSystemBind(MOUNTS_PATH, MOUNTS_PATH,
BindMode.READ_WRITE);
+ .withFileSystemBind(
+ HOST_VOLUME_MOUNT_PATH,
+ CONTAINER_VOLUME_MOUNT_PATH,
+ BindMode.READ_WRITE);
Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();
- // execute extra commands
executeExtraCommands(jobManager);
}
@@ -126,11 +130,16 @@ public abstract class AbstractTestFlinkContainer extends
AbstractTestContainer {
@Override
public void tearDown() throws Exception {
if (taskManager != null) {
+ // delete the volume
+ taskManager.execInContainer("rm", "-rf",
CONTAINER_VOLUME_MOUNT_PATH);
taskManager.stop();
}
if (jobManager != null) {
+ // delete the volume
+ jobManager.execInContainer("rm", "-rf",
CONTAINER_VOLUME_MOUNT_PATH);
jobManager.stop();
}
+ FileUtils.deleteFile(HOST_VOLUME_MOUNT_PATH);
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 2e225b2745..6397773294 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -39,6 +39,7 @@ import org.apache.http.util.EntityUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
+import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
@@ -84,6 +85,7 @@ public class SeaTunnelContainer extends AbstractTestContainer
{
@Override
public void startUp() throws Exception {
+ FileUtils.createNewDir(HOST_VOLUME_MOUNT_PATH);
server = createSeaTunnelServer();
}
@@ -114,6 +116,10 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(
"seatunnel-engine:" +
JDK_DOCKER_IMAGE)))
+ .withFileSystemBind(
+ HOST_VOLUME_MOUNT_PATH,
+ CONTAINER_VOLUME_MOUNT_PATH,
+ BindMode.READ_WRITE)
.waitingFor(Wait.forLogMessage(".*received new worker
register:.*", 1));
copySeaTunnelStarterToContainer(server);
server.setPortBindings(Arrays.asList("5801:5801", "8080:8080"));
@@ -213,8 +219,11 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
@Override
public void tearDown() throws Exception {
if (server != null) {
+ // delete the volume
+ server.execInContainer("rm", "-rf", CONTAINER_VOLUME_MOUNT_PATH);
server.close();
}
+ FileUtils.deleteFile(HOST_VOLUME_MOUNT_PATH);
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index d6c08f1231..ac201f2ae0 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -17,10 +17,12 @@
package org.apache.seatunnel.e2e.common.container.spark;
+import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -52,6 +54,7 @@ public abstract class AbstractTestSparkContainer extends
AbstractTestContainer {
@Override
public void startUp() throws Exception {
+ FileUtils.createNewDir(HOST_VOLUME_MOUNT_PATH);
master =
new GenericContainer<>(getDockerImage())
.withNetwork(NETWORK)
@@ -62,6 +65,10 @@ public abstract class AbstractTestSparkContainer extends
AbstractTestContainer {
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(getDockerImage())))
.withCreateContainerCmdModifier(cmd ->
cmd.withUser("root"))
+ .withFileSystemBind(
+ HOST_VOLUME_MOUNT_PATH,
+ CONTAINER_VOLUME_MOUNT_PATH,
+ BindMode.READ_WRITE)
.waitingFor(
new LogMessageWaitStrategy()
.withRegEx(".*Master: Starting Spark
master at.*")
@@ -80,8 +87,11 @@ public abstract class AbstractTestSparkContainer extends
AbstractTestContainer {
@Override
public void tearDown() throws Exception {
if (master != null) {
+ // delete the volume
+ master.execInContainer("rm", "-rf", CONTAINER_VOLUME_MOUNT_PATH);
master.stop();
}
+ FileUtils.deleteFile(HOST_VOLUME_MOUNT_PATH);
}
@Override