This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d8d289b682 [Test][e2e] Jdbc test checking data consistency.  (#5734)
d8d289b682 is described below

commit d8d289b6822b88e8fe4f49931710114cfed6c982
Author: MoSence <[email protected]>
AuthorDate: Wed Nov 1 19:09:38 2023 +0800

    [Test][e2e] Jdbc test checking data consistency.  (#5734)
---
 .../connectors/seatunnel/jdbc/AbstractJdbcIT.java  | 100 +++++++++-
 .../connectors/seatunnel/jdbc/JdbcITErrorCode.java |   1 +
 .../connectors/seatunnel/jdbc/JdbcDb2IT.java       |   8 +-
 .../connectors/seatunnel/jdbc/JdbcMysqlIT.java     | 216 +++++++++++++++------
 .../connectors/seatunnel/jdbc/JdbcOracleIT.java    |   2 +-
 ...mysql_source_and_sink_parallel_upper_lower.conf |   4 +-
 .../seatunnel/jdbc/JdbcOceanBaseITBase.java        |   2 +-
 .../connectors/seatunnel/jdbc/JdbcPhoenixIT.java   |   2 +-
 .../seatunnel/jdbc/JdbcStarRocksdbIT.java          |   2 +-
 .../connectors/seatunnel/jdbc/JdbcKingbaseIT.java  |   4 +-
 .../connectors/seatunnel/jdbc/JdbcSqlServerIT.java |   2 +-
 .../connectors/seatunnel/jdbc/JdbcVerticaIT.java   |   2 +-
 .../connectors/seatunnel/jdbc/JdbcDmIT.java        |   2 +-
 .../connectors/seatunnel/jdbc/JdbcDmUpsetIT.java   |   2 +-
 .../connectors/seatunnel/jdbc/JdbcGBase8aIT.java   |   2 +-
 .../connectors/seatunnel/jdbc/JdbcGreenplumIT.java |   2 +-
 .../seatunnel/jdbc/JdbcOracleLowercaseTableIT.java |   2 +-
 .../seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java |   2 +-
 18 files changed, 274 insertions(+), 83 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index 3db2e36829..f61381d24b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
+import org.apache.seatunnel.shade.com.google.common.io.ByteStreams;
+import org.apache.seatunnel.shade.com.google.common.io.CharStreams;
+
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -37,23 +40,31 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.images.PullPolicy;
 import org.testcontainers.lifecycle.Startables;
 
 import com.github.dockerjava.api.model.Image;
-import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
 import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
@@ -63,9 +74,10 @@ import java.util.stream.Stream;
 
 import static org.awaitility.Awaitility.given;
 
-@Slf4j
 public abstract class AbstractJdbcIT extends TestSuiteBase implements 
TestResource {
 
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
     protected static final String HOST = "HOST";
 
     @TestContainerExtension
@@ -88,7 +100,7 @@ public abstract class AbstractJdbcIT extends TestSuiteBase 
implements TestResour
 
     abstract JdbcCase getJdbcCase();
 
-    abstract void compareResult() throws SQLException, IOException;
+    abstract void compareResult(String executeKey) throws SQLException, 
IOException;
 
     abstract String driverUrl();
 
@@ -319,12 +331,14 @@ public abstract class AbstractJdbcIT extends 
TestSuiteBase implements TestResour
             throws IOException, InterruptedException, SQLException {
         List<String> configFiles = jdbcCase.getConfigFile();
         for (String configFile : configFiles) {
-            Container.ExecResult execResult = container.executeJob(configFile);
-            Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+            try {
+                Container.ExecResult execResult = 
container.executeJob(configFile);
+                Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+                compareResult(String.format("%s in [%s]", configFile, 
container.identifier()));
+            } finally {
+                clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), 
jdbcCase.getSinkTable());
+            }
         }
-
-        compareResult();
-        clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), 
jdbcCase.getSinkTable());
     }
 
     protected void initCatalog() {}
@@ -363,4 +377,74 @@ public abstract class AbstractJdbcIT extends TestSuiteBase 
implements TestResour
             
Assertions.assertFalse(catalog.databaseExists(targetTablePath.getDatabaseName()));
         }
     }
+
+    protected Object[] toArrayResult(ResultSet resultSet, String[] fieldNames)
+            throws SQLException, IOException {
+        List<Object> result = new ArrayList<>(0);
+        while (resultSet.next()) {
+            Object[] rowArray = new Object[fieldNames.length];
+            for (int colIndex = 0; colIndex < fieldNames.length; colIndex++) {
+                rowArray[colIndex] = 
checkData(resultSet.getObject(fieldNames[colIndex]));
+            }
+            result.add(rowArray);
+        }
+        return result.toArray();
+    }
+
+    private Object checkData(Object data) throws SQLException, IOException {
+        if (data == null) {
+            return null;
+        } else if (data instanceof byte[]) {
+            return data;
+        } else if (data instanceof Clob) {
+            try (Reader reader = ((Clob) data).getCharacterStream()) {
+                return CharStreams.toString(reader);
+            }
+        } else if (data instanceof Blob) {
+            try (InputStream inputStream = ((Blob) data).getBinaryStream()) {
+                return ByteStreams.toByteArray(inputStream);
+            }
+        } else if (data instanceof Array) {
+            Object[] jdbcArray = (Object[]) ((Array) data).getArray();
+            Object[] javaArray = new Object[jdbcArray.length];
+            for (int index = 0; index < jdbcArray.length; index++) {
+                javaArray[index] = checkData(jdbcArray[index]);
+            }
+            return javaArray;
+        } else {
+            return data;
+        }
+    }
+
+    protected void defaultCompare(String executeKey, String[] fieldNames, 
String sortKey) {
+        try (Statement statement = connection.createStatement()) {
+            ResultSet source =
+                    statement.executeQuery(
+                            String.format(
+                                    "SELECT * FROM %s ORDER BY %s",
+                                    buildTableInfoWithSchema(
+                                            this.jdbcCase.getSchema(),
+                                            this.jdbcCase.getSourceTable()),
+                                    quoteIdentifier(sortKey)));
+            Object[] sourceResult = toArrayResult(source, fieldNames);
+            ResultSet sink =
+                    statement.executeQuery(
+                            String.format(
+                                    "SELECT * FROM %s ORDER BY %s",
+                                    buildTableInfoWithSchema(
+                                            this.jdbcCase.getSchema(),
+                                            this.jdbcCase.getSinkTable()),
+                                    quoteIdentifier(sortKey)));
+            Object[] sinkResult = toArrayResult(sink, fieldNames);
+            log.warn(
+                    "{}: source data count {}, sink data count {}.",
+                    executeKey,
+                    sourceResult.length,
+                    sinkResult.length);
+            Assertions.assertArrayEquals(
+                    sourceResult, sinkResult, String.format("[%s] data 
compare", executeKey));
+        } catch (SQLException | IOException e) {
+            throw new 
SeaTunnelRuntimeException(JdbcITErrorCode.DATA_COMPARISON_FAILED, e);
+        }
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcITErrorCode.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcITErrorCode.java
index 965fb7ba85..ab101677b0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcITErrorCode.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcITErrorCode.java
@@ -25,6 +25,7 @@ public enum JdbcITErrorCode implements SeaTunnelErrorCode {
     CREATE_TABLE_FAILED("JDBC-IT-02", "Fail to create table."),
     INSERT_DATA_FAILED("JDBC-IT-03", "Fail to inert data."),
     DRIVER_NOT_FOUND("JDBC-IT-04", "Can not get the driver."),
+    DATA_COMPARISON_FAILED("JDBC-IT-05", "Source data is inconsistent with 
target data."),
     ;
 
     private final String code;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
index f258faa75f..a482b1790b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDb2IT.java
@@ -29,7 +29,6 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerLoggerFactory;
 
 import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
 
 import java.math.BigDecimal;
 import java.sql.Date;
@@ -41,7 +40,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-@Slf4j
 public class JdbcDb2IT extends AbstractJdbcIT {
 
     private static final String DB2_CONTAINER_HOST = "db2-e2e";
@@ -120,7 +118,7 @@ public class JdbcDb2IT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() {}
+    void compareResult(String executeKey) {}
 
     @Override
     String driverUrl() {
@@ -205,7 +203,9 @@ public class JdbcDb2IT extends AbstractJdbcIT {
     @Override
     public void clearTable(String schema, String table) {
         try (Statement statement = connection.createStatement()) {
-            String truncate = String.format("delete from \"%s\".%s where 
1=1;", schema, table);
+            String truncate =
+                    String.format(
+                            "delete from %s where 1=1;", 
buildTableInfoWithSchema(schema, table));
             statement.execute(truncate);
             connection.commit();
         } catch (SQLException e) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
index 75c2b9324f..e2d2dcb56e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
@@ -18,6 +18,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
@@ -53,7 +55,6 @@ import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.utility.DockerImageName;
 import org.testcontainers.utility.DockerLoggerFactory;
 
-import com.google.common.collect.Lists;
 import com.mysql.cj.jdbc.ConnectionImpl;
 
 import java.io.IOException;
@@ -179,7 +180,57 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() {}
+    protected void compareResult(String executeKey) {
+        String[] fieldNames =
+                new String[] {
+                    "c_bit_1",
+                    "c_bit_8",
+                    "c_bit_16",
+                    "c_bit_32",
+                    "c_bit_64",
+                    "c_boolean",
+                    "c_tinyint",
+                    "c_tinyint_unsigned",
+                    "c_smallint",
+                    "c_smallint_unsigned",
+                    "c_mediumint",
+                    "c_mediumint_unsigned",
+                    "c_int",
+                    "c_integer",
+                    "c_year",
+                    "c_int_unsigned",
+                    "c_integer_unsigned",
+                    "c_bigint",
+                    "c_bigint_unsigned",
+                    "c_decimal",
+                    "c_decimal_unsigned",
+                    "c_float",
+                    "c_float_unsigned",
+                    "c_double",
+                    "c_double_unsigned",
+                    "c_char",
+                    "c_tinytext",
+                    "c_mediumtext",
+                    "c_text",
+                    "c_varchar",
+                    "c_json",
+                    "c_longtext",
+                    "c_date",
+                    "c_datetime",
+                    "c_time",
+                    "c_timestamp",
+                    "c_tinyblob",
+                    "c_mediumblob",
+                    "c_blob",
+                    "c_longblob",
+                    "c_varbinary",
+                    "c_binary",
+                    "c_bigint_30",
+                    "c_decimal_unsigned_30",
+                    "c_decimal_30",
+                };
+        defaultCompare(executeKey, fieldNames, "c_bigint_30");
+    }
 
     @Override
     String driverUrl() {
@@ -242,58 +293,115 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
         BigDecimal decimalValue = new 
BigDecimal("999999999999999999999999999899");
         for (int i = 0; i < 100; i++) {
             byte byteArr = Integer.valueOf(i).byteValue();
-            SeaTunnelRow row =
-                    new SeaTunnelRow(
-                            new Object[] {
-                                i % 2 == 0 ? (byte) 1 : (byte) 0,
-                                new byte[] {byteArr},
-                                new byte[] {byteArr, byteArr},
-                                new byte[] {byteArr, byteArr, byteArr, 
byteArr},
-                                new byte[] {
-                                    byteArr, byteArr, byteArr, byteArr, 
byteArr, byteArr, byteArr,
-                                    byteArr
-                                },
-                                i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE,
-                                i,
-                                i,
-                                i,
-                                i,
-                                i,
-                                i,
-                                i,
-                                i,
-                                i,
-                                Long.parseLong("1"),
-                                Long.parseLong("1"),
-                                Long.parseLong("1"),
-                                BigDecimal.valueOf(i, 0),
-                                BigDecimal.valueOf(i, 18),
-                                BigDecimal.valueOf(i, 18),
-                                Float.parseFloat("1.1"),
-                                Float.parseFloat("1.1"),
-                                Double.parseDouble("1.1"),
-                                Double.parseDouble("1.1"),
-                                "f",
-                                String.format("f1_%s", i),
-                                String.format("f1_%s", i),
-                                String.format("f1_%s", i),
-                                String.format("f1_%s", i),
-                                String.format("{\"aa\":\"bb_%s\"}", i),
-                                String.format("f1_%s", i),
-                                Date.valueOf(LocalDate.now()),
-                                Timestamp.valueOf(LocalDateTime.now()),
-                                Time.valueOf(LocalTime.now()),
-                                new Timestamp(System.currentTimeMillis()),
-                                "test".getBytes(),
-                                "test".getBytes(),
-                                "test".getBytes(),
-                                "test".getBytes(),
-                                "test".getBytes(),
-                                "f".getBytes(),
-                                bigintValue.add(BigDecimal.valueOf(i)),
-                                decimalValue.add(BigDecimal.valueOf(i)),
-                                decimalValue.add(BigDecimal.valueOf(i)),
-                            });
+            SeaTunnelRow row;
+            if (i == 99) {
+                row =
+                        new SeaTunnelRow(
+                                new Object[] {
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    // 
https://github.com/apache/seatunnel/issues/5559 this value
+                                    // cannot set null, this null
+                                    // value column's row will be lost in
+                                    // 
jdbc_mysql_source_and_sink_parallel.conf,jdbc_mysql_source_and_sink_parallel_upper_lower.conf.
+                                    bigintValue.add(BigDecimal.valueOf(i)),
+                                    decimalValue.add(BigDecimal.valueOf(i)),
+                                    null,
+                                });
+            } else {
+                row =
+                        new SeaTunnelRow(
+                                new Object[] {
+                                    i % 2 == 0 ? (byte) 1 : (byte) 0,
+                                    new byte[] {byteArr},
+                                    new byte[] {byteArr, byteArr},
+                                    new byte[] {byteArr, byteArr, byteArr, 
byteArr},
+                                    new byte[] {
+                                        byteArr, byteArr, byteArr, byteArr, 
byteArr, byteArr,
+                                        byteArr, byteArr
+                                    },
+                                    i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE,
+                                    i,
+                                    i,
+                                    i,
+                                    i,
+                                    i,
+                                    i,
+                                    i,
+                                    i,
+                                    i,
+                                    Long.parseLong("1"),
+                                    Long.parseLong("1"),
+                                    Long.parseLong("1"),
+                                    BigDecimal.valueOf(i, 0),
+                                    BigDecimal.valueOf(i, 18),
+                                    BigDecimal.valueOf(i, 18),
+                                    Float.parseFloat("1.1"),
+                                    Float.parseFloat("1.1"),
+                                    Double.parseDouble("1.1"),
+                                    Double.parseDouble("1.1"),
+                                    "f",
+                                    String.format("f1_%s", i),
+                                    String.format("f1_%s", i),
+                                    String.format("f1_%s", i),
+                                    String.format("f1_%s", i),
+                                    String.format("{\"aa\":\"bb_%s\"}", i),
+                                    String.format("f1_%s", i),
+                                    Date.valueOf(LocalDate.now()),
+                                    Timestamp.valueOf(LocalDateTime.now()),
+                                    Time.valueOf(LocalTime.now()),
+                                    new Timestamp(System.currentTimeMillis()),
+                                    "test".getBytes(),
+                                    "test".getBytes(),
+                                    "test".getBytes(),
+                                    "test".getBytes(),
+                                    "test".getBytes(),
+                                    "f".getBytes(),
+                                    bigintValue.add(BigDecimal.valueOf(i)),
+                                    decimalValue.add(BigDecimal.valueOf(i)),
+                                    decimalValue.add(BigDecimal.valueOf(i)),
+                                });
+            }
             rows.add(row);
         }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index 75bdffbd6c..e98da6a1ab 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -116,7 +116,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() {}
+    void compareResult(String executeKey) {}
 
     @Override
     String driverUrl() {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
index 1b092f1e91..213ba00899 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -29,8 +29,8 @@ source {
     query = "select * from source"
     partition_column = "c_bigint_30"
     result_table_name = "jdbc"
-    partition_lower_bound = 2844674407371055160
-    partition_upper_bound = 2844674407371055259
+    partition_lower_bound = 2844674407371055000
+    partition_upper_bound = 2844674407371055099
     partition_num = 5
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
index 50177ef1a8..6cdc38780a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
@@ -46,7 +46,7 @@ public abstract class JdbcOceanBaseITBase extends 
AbstractJdbcIT {
     abstract String getFullTableName(String tableName);
 
     @Override
-    void compareResult() {
+    void compareResult(String executeKey) {
         String sourceSql =
                 String.format("select * from %s order by 1", 
getFullTableName(OCEANBASE_SOURCE));
         String sinkSql =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPhoenixIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPhoenixIT.java
index 34283f20d4..73ffef9947 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPhoenixIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPhoenixIT.java
@@ -122,7 +122,7 @@ public class JdbcPhoenixIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() {}
+    void compareResult(String executeKey) {}
 
     @Override
     String driverUrl() {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
index 93d9a289ab..e7fc94e642 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
@@ -108,7 +108,7 @@ public class JdbcStarRocksdbIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() {}
+    void compareResult(String executeKey) {}
 
     @Override
     String driverUrl() {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java
index 17d53bb87d..9d662e619d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java
@@ -28,7 +28,6 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerLoggerFactory;
 
 import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -51,7 +50,6 @@ import java.util.stream.Collectors;
  * engine does not support the TIME type.Two environment variables need to be 
added to the spark
  * container: "LANG"="C.UTF-8", "JAVA_TOOL_OPTIONS"="-Dfile.encoding=UTF8"
  */
-@Slf4j
 @Disabled("Due to copyright reasons, you need to download the trial version km 
license yourself")
 public class JdbcKingbaseIT extends AbstractJdbcIT {
     private static final String KINGBASE_IMAGE = "huzhihui/kingbase:v8r6";
@@ -124,7 +122,7 @@ public class JdbcKingbaseIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() throws SQLException, IOException {}
+    void compareResult(String executeKey) throws SQLException, IOException {}
 
     @Override
     String driverUrl() {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
index 0a170ff4be..e56fa37573 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
@@ -105,7 +105,7 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() throws SQLException, IOException {}
+    void compareResult(String executeKey) throws SQLException, IOException {}
 
     @Override
     String driverUrl() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
index df87df5163..1ff6032737 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcVerticaIT.java
@@ -91,7 +91,7 @@ public class JdbcVerticaIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() {}
+    void compareResult(String executeKey) {}
 
     @Override
     String driverUrl() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
index 8b3ebe135e..f2b9097ffa 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
@@ -131,7 +131,7 @@ public class JdbcDmIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() {}
+    void compareResult(String executeKey) {}
 
     @Override
     String driverUrl() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmUpsetIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmUpsetIT.java
index 7b9d52c1d5..d501f6a965 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmUpsetIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmUpsetIT.java
@@ -156,7 +156,7 @@ public class JdbcDmUpsetIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() {}
+    void compareResult(String executeKey) {}
 
     @Override
     protected void createNeededTables() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
index b5a5e83e46..388cf67ae9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGBase8aIT.java
@@ -110,7 +110,7 @@ public class JdbcGBase8aIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() {}
+    void compareResult(String executeKey) {}
 
     @Override
     String driverUrl() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java
index c1846c999c..9c98c29a7a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java
@@ -88,7 +88,7 @@ public class JdbcGreenplumIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() {}
+    void compareResult(String executeKey) {}
 
     @Override
     String driverUrl() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
index b90387ec6d..378b76367c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleLowercaseTableIT.java
@@ -116,7 +116,7 @@ public class JdbcOracleLowercaseTableIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() {}
+    void compareResult(String executeKey) {}
 
     @Override
     String driverUrl() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
index f9a0343d0d..053b8c9332 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSaveModeHandlerIT.java
@@ -154,7 +154,7 @@ public class JdbcMysqlSaveModeHandlerIT extends AbstractJdbcIT {
     }
 
     @Override
-    void compareResult() {
+    void compareResult(String executeKey) {
         final TablePath tablePathSource = TablePath.of("seatunnel", "source");
         final CatalogTable tableSource = catalog.getTable(tablePathSource);
         final List<Column> columnsSource = tableSource.getTableSchema().getColumns();


Reply via email to