This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bb2c912404 [Fix][Doris] Fix the abnormality of deleting data in CDC scenario. (#7315)
bb2c912404 is described below

commit bb2c912404fac13e829044c60259f2ab27bff3a1
Author: Guangdong Liu <804167...@qq.com>
AuthorDate: Tue Aug 6 21:46:12 2024 +0800

    [Fix][Doris] Fix the abnormality of deleting data in CDC scenario. (#7315)
---
 .../doris/serialize/SeaTunnelRowSerializer.java    |  77 +++++-----
 .../doris/sink/writer/DorisStreamLoad.java         |  32 ++--
 .../connector-doris-e2e/pom.xml                    |  14 ++
 .../e2e/connector/doris/DorisCDCSinkIT.java        | 171 ++++++++++++++++++---
 .../src/test/resources/ddl/mysql_cdc.sql           |  38 +++++
 .../src/test/resources/docker/server-gtids/my.cnf  |  65 ++++++++
 .../src/test/resources/docker/setup.sql            |  28 ++++
 .../resources/write-cdc-changelog-to-doris.conf    |  18 +--
 8 files changed, 356 insertions(+), 87 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
index 0c5b9c0c42..0e67257a32 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.doris.serialize;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -29,6 +30,7 @@ import org.apache.seatunnel.format.json.JsonSerializationSchema;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -42,6 +44,7 @@ public class SeaTunnelRowSerializer implements DorisSerializer {
     private final SeaTunnelRowType seaTunnelRowType;
     private final String fieldDelimiter;
     private final boolean enableDelete;
+    private final SerializationSchema serialize;
 
     public SeaTunnelRowSerializer(
             String type,
@@ -49,32 +52,46 @@ public class SeaTunnelRowSerializer implements DorisSerializer {
             String fieldDelimiter,
             boolean enableDelete) {
         this.type = type;
-        this.seaTunnelRowType = seaTunnelRowType;
         this.fieldDelimiter = fieldDelimiter;
         this.enableDelete = enableDelete;
-    }
+        List<Object> fieldNames = new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldNames()));
+        List<SeaTunnelDataType<?>> fieldTypes =
+                new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldTypes()));
+
+        if (enableDelete) {
+            fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
+            fieldTypes.add(STRING_TYPE);
+        }
 
-    public byte[] buildJsonString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType)
-            throws IOException {
+        this.seaTunnelRowType =
+                new SeaTunnelRowType(
+                        fieldNames.toArray(new String[0]),
+                        fieldTypes.toArray(new SeaTunnelDataType<?>[0]));
 
-        JsonSerializationSchema jsonSerializationSchema =
-                new JsonSerializationSchema(seaTunnelRowType, NULL_VALUE);
-        ObjectMapper mapper = jsonSerializationSchema.getMapper();
-        mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true);
-        return jsonSerializationSchema.serialize(row);
+        if (JSON.equals(type)) {
+            JsonSerializationSchema jsonSerializationSchema =
+                    new JsonSerializationSchema(this.seaTunnelRowType, NULL_VALUE);
+            ObjectMapper mapper = jsonSerializationSchema.getMapper();
+            mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true);
+            this.serialize = jsonSerializationSchema;
+        } else {
+            this.serialize =
+                    TextSerializationSchema.builder()
+                            .seaTunnelRowType(this.seaTunnelRowType)
+                            .delimiter(fieldDelimiter)
+                            .nullValue(NULL_VALUE)
+                            .build();
+        }
     }
 
-    public byte[] buildCSVString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType)
-            throws IOException {
+    public byte[] buildJsonString(SeaTunnelRow row) {
+
+        return serialize.serialize(row);
+    }
 
-        TextSerializationSchema build =
-                TextSerializationSchema.builder()
-                        .seaTunnelRowType(seaTunnelRowType)
-                        .delimiter(fieldDelimiter)
-                        .nullValue(NULL_VALUE)
-                        .build();
+    public byte[] buildCSVString(SeaTunnelRow row) {
 
-        return build.serialize(row);
+        return serialize.serialize(row);
     }
 
     public String parseDeleteSign(RowKind rowKind) {
@@ -93,29 +110,17 @@ public class SeaTunnelRowSerializer implements DorisSerializer {
     @Override
     public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {
 
-        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
-        List<SeaTunnelDataType<?>> fieldTypes = Arrays.asList(seaTunnelRowType.getFieldTypes());
-
         if (enableDelete) {
-            SeaTunnelRow seaTunnelRowEnableDelete = seaTunnelRow.copy();
-            seaTunnelRowEnableDelete.setField(
-                    seaTunnelRow.getFields().length, parseDeleteSign(seaTunnelRow.getRowKind()));
-            fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
-            fieldTypes.add(STRING_TYPE);
+
+            List<Object> newFields = new ArrayList<>(Arrays.asList(seaTunnelRow.getFields()));
+            newFields.add(parseDeleteSign(seaTunnelRow.getRowKind()));
+            seaTunnelRow = new SeaTunnelRow(newFields.toArray());
         }
 
         if (JSON.equals(type)) {
-            return buildJsonString(
-                    seaTunnelRow,
-                    new SeaTunnelRowType(
-                            fieldNames.toArray(new String[0]),
-                            fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
+            return buildJsonString(seaTunnelRow);
         } else if (CSV.equals(type)) {
-            return buildCSVString(
-                    seaTunnelRow,
-                    new SeaTunnelRowType(
-                            fieldNames.toArray(new String[0]),
-                            fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
+            return buildCSVString(seaTunnelRow);
         } else {
             throw new IllegalArgumentException("The type " + type + " is not supported!");
         }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index eadcf94cd5..40b75aedc6 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -17,7 +17,10 @@
 
 package org.apache.seatunnel.connectors.doris.sink.writer;
 
+import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.doris.config.DorisConfig;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
@@ -31,9 +34,9 @@ import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.util.EntityUtils;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
@@ -64,23 +67,23 @@ public class DorisStreamLoad implements Serializable {
     private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
     private static final String JOB_EXIST_FINISHED = "FINISHED";
     private final String loadUrlStr;
-    private final String hostPort;
+    @Getter private final String hostPort;
     private final String abortUrlStr;
     private final String user;
     private final String passwd;
-    private final String db;
+    @Getter private final String db;
     private final String table;
     private final boolean enable2PC;
     private final boolean enableDelete;
     private final Properties streamLoadProp;
     private final RecordStream recordStream;
-    private Future<CloseableHttpResponse> pendingLoadFuture;
+    @Getter private Future<CloseableHttpResponse> pendingLoadFuture;
     private final CloseableHttpClient httpClient;
     private final ExecutorService executorService;
     private volatile boolean loadBatchFirstRecord;
     private volatile boolean loading = false;
     private String label;
-    private long recordCount = 0;
+    @Getter private long recordCount = 0;
 
     public DorisStreamLoad(
             String hostPort,
@@ -115,18 +118,6 @@ public class DorisStreamLoad implements Serializable {
         loadBatchFirstRecord = true;
     }
 
-    public String getDb() {
-        return db;
-    }
-
-    public String getHostPort() {
-        return hostPort;
-    }
-
-    public Future<CloseableHttpResponse> getPendingLoadFuture() {
-        return pendingLoadFuture;
-    }
-
     public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
         long startChkID = chkID;
         log.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
@@ -196,10 +187,6 @@ public class DorisStreamLoad implements Serializable {
         recordCount++;
     }
 
-    public long getRecordCount() {
-        return recordCount;
-    }
-
     public String getLoadFailedMsg() {
         if (!loading) {
             return null;
@@ -300,10 +287,9 @@ public class DorisStreamLoad implements Serializable {
                     "Fail to abort transaction " + txnID + " with url " + abortUrlStr);
         }
 
-        ObjectMapper mapper = new ObjectMapper();
         String loadResult = EntityUtils.toString(response.getEntity());
         Map<String, String> res =
-                mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {});
+                JsonUtils.parseObject(loadResult, new TypeReference<HashMap<String, String>>() {});
         if (!LoadStatus.SUCCESS.equals(res.get("status"))) {
             if (ResponseUtil.isCommitted(res.get("msg"))) {
                 throw new DorisConnectorException(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml
index af85d92ace..7a3008adb3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml
@@ -49,5 +49,19 @@
             <version>${mysql.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-cdc-mysql</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <!-- test dependencies on TestContainers -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
index 9afa91d4e8..33108b8b8e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
@@ -17,16 +17,27 @@
 
 package org.apache.seatunnel.e2e.connector.doris;
 
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -34,11 +45,18 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.awaitility.Awaitility.await;
+
 @Slf4j
-@Disabled("we need resolve the issue of network between containers")
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK},
+        disabledReason = "Currently SPARK do not support cdc")
 public class DorisCDCSinkIT extends AbstractDorisIT {
 
     private static final String DATABASE = "test";
@@ -60,34 +78,121 @@ public class DorisCDCSinkIT extends AbstractDorisIT {
                     + "\"replication_allocation\" = \"tag.location.default: 1\""
                     + ")";
 
+    // mysql
+    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_USER_NAME = "mysqluser";
+    private static final String MYSQL_USER_PASSWORD = "mysqlpw";
+    private static final String MYSQL_DATABASE = "mysql_cdc";
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+    private static final String SOURCE_TABLE = "mysql_cdc_e2e_source_table";
+
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Doris-CDC/lib && cd /tmp/seatunnel/plugins/Doris-CDC/lib && wget "
+                                        + driverUrl());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+            };
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(
+                    MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", MYSQL_DATABASE);
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        return new MySqlContainer(version)
+                .withConfigurationOverride("docker/server-gtids/my.cnf")
+                .withSetupSQL("docker/setup.sql")
+                .withNetwork(NETWORK)
+                .withNetworkAliases(MYSQL_HOST)
+                .withDatabaseName(MYSQL_DATABASE)
+                .withUsername(MYSQL_USER_NAME)
+                .withPassword(MYSQL_USER_PASSWORD)
+                .withLogConsumer(
+                        new Slf4jLogConsumer(DockerLoggerFactory.getLogger("mysql-docker-image")));
+    }
+
+    private String driverUrl() {
+        return "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+    }
+
     @BeforeAll
     public void init() {
+        log.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        log.info("Mysql Containers are started");
+        inventoryDatabase.createAndInitialize();
+        log.info("Mysql ddl execution is complete");
         initializeJdbcTable();
     }
 
     @TestTemplate
     public void testDorisCDCSink(TestContainer container) throws Exception {
-        Container.ExecResult execResult =
-                container.executeJob("/write-cdc-changelog-to-doris.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
+
+        clearTable(DATABASE, SINK_TABLE);
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        container.executeJob("/write-cdc-changelog-to-doris.conf");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
 
         String sinkSql = String.format("select * from %s.%s", DATABASE, SINK_TABLE);
-        Set<List<Object>> actual = new HashSet<>();
-        try (Statement sinkStatement = jdbcConnection.createStatement()) {
-            ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
-            while (sinkResultSet.next()) {
-                List<Object> row =
-                        Arrays.asList(
-                                sinkResultSet.getLong("uuid"),
-                                sinkResultSet.getString("name"),
-                                sinkResultSet.getInt("score"));
-                actual.add(row);
-            }
-        }
+
         Set<List<Object>> expected =
-                Stream.<List<Object>>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", 100))
+                Stream.<List<Object>>of(
+                                Arrays.asList(1L, "Alice", 95), Arrays.asList(2L, "Bob", 88))
                         .collect(Collectors.toSet());
-        Assertions.assertIterableEquals(expected, actual);
+
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Set<List<Object>> actual = new HashSet<>();
+                            try (Statement sinkStatement = jdbcConnection.createStatement()) {
+                                ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+                                while (sinkResultSet.next()) {
+                                    List<Object> row =
+                                            Arrays.asList(
+                                                    sinkResultSet.getLong("uuid"),
+                                                    sinkResultSet.getString("name"),
+                                                    sinkResultSet.getInt("score"));
+                                    actual.add(row);
+                                }
+                            }
+                            Assertions.assertIterableEquals(expected, actual);
+                        });
+
+        executeSql("DELETE FROM " + MYSQL_DATABASE + "." + SOURCE_TABLE + " WHERE uuid = 1");
+
+        Set<List<Object>> expectedAfterDelete =
+                Stream.<List<Object>>of(Arrays.asList(2L, "Bob", 88)).collect(Collectors.toSet());
+
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Set<List<Object>> actual = new HashSet<>();
+                            try (Statement sinkStatement = jdbcConnection.createStatement()) {
+                                ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+                                while (sinkResultSet.next()) {
+                                    List<Object> row =
+                                            Arrays.asList(
+                                                    sinkResultSet.getLong("uuid"),
+                                                    sinkResultSet.getString("name"),
+                                                    sinkResultSet.getInt("score"));
+                                    actual.add(row);
+                                }
+                            }
+                            Assertions.assertIterableEquals(expectedAfterDelete, actual);
+                        });
+        executeSql(
+                "INSERT INTO " + MYSQL_DATABASE + "." + SOURCE_TABLE + " VALUES (1, 'Alice', 95)");
     }
 
     private void initializeJdbcTable() {
@@ -100,4 +205,32 @@ public class DorisCDCSinkIT extends AbstractDorisIT {
             throw new RuntimeException("Initializing table failed!", e);
         }
     }
+
+    private void executeDorisSql(String sql) {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Connection getJdbcConnection() throws SQLException {
+        return DriverManager.getConnection(
+                MYSQL_CONTAINER.getJdbcUrl(),
+                MYSQL_CONTAINER.getUsername(),
+                MYSQL_CONTAINER.getPassword());
+    }
+
+    // Execute SQL
+    private void executeSql(String sql) {
+        try (Connection connection = getJdbcConnection()) {
+            connection.createStatement().execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void clearTable(String database, String tableName) {
+        executeDorisSql("truncate table " + database + "." + tableName);
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/mysql_cdc.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/mysql_cdc.sql
new file mode 100644
index 0000000000..638da2981b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/ddl/mysql_cdc.sql
@@ -0,0 +1,38 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  inventory
+-- ----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE IF NOT EXISTS `mysql_cdc`;
+
+use mysql_cdc;
+-- Create a mysql data source table
+CREATE TABLE IF NOT EXISTS `mysql_cdc`.`mysql_cdc_e2e_source_table` (
+  `uuid` BIGINT,
+  `name` VARCHAR(128),
+  `score` INT,
+  PRIMARY KEY (`uuid`)
+) ENGINE=InnoDB;
+
+
+
+truncate table `mysql_cdc`.`mysql_cdc_e2e_source_table`;
+
+INSERT INTO `mysql_cdc`.`mysql_cdc_e2e_source_table` (uuid, name, score) VALUES
+(1, 'Alice', 95),
+(2, 'Bob', 88);
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/server-gtids/my.cnf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/server-gtids/my.cnf
new file mode 100644
index 0000000000..a390897885
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/server-gtids/my.cnf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id         = 223344
+log_bin           = mysql-bin
+expire_logs_days  = 1
+binlog_format     = row
+
+# enable gtid mode
+gtid_mode = on
+enforce_gtid_consistency = on
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/setup.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/setup.sql
new file mode 100644
index 0000000000..429061558b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/docker/setup.sql
@@ -0,0 +1,28 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- In production you would almost certainly limit the replication user must be on the follower (slave) machine,
+-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'mysqluser' - all privileges
+-- 2) 'st_user_source' - all privileges required by the snapshot reader AND binlog reader (used for testing)
+--
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+
+CREATE USER 'st_user_source' IDENTIFIED BY 'mysqlpw';
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, DROP, LOCK TABLES  ON *.* TO 'st_user_source'@'%';
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
index d4d4e69f9d..7e811c709b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
@@ -17,23 +17,24 @@
 
 env {
   parallelism = 1
-  job.mode = "BATCH"
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
 }
 
 source {
   MySQL-CDC {
       parallelism = 1
-      server-id = 5656
-      username = "root"
-      password = "Bigdata2023@"
-      table-names = ["test.e2e_table_sink"]
-      base-url = "jdbc:mysql://119.3.230.145:56725/test"
+      server-id = 5652
+      username = "st_user_source"
+      password = "mysqlpw"
+      table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
+      base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
     }
 }
 
 sink {
   Doris {
-    fenodes = "10.16.10.14:8234"
+    fenodes = "doris_e2e:8030"
     username = root
     password = ""
     database = "test"
@@ -43,8 +44,7 @@ sink {
     sink.enable-delete = "true"
     doris.config {
       format = "csv"
-      "column_separator" = "\\x01"
-      "line_delimiter" = "\\x01"
+      "column_separator" = ","
     }
   }
 }
\ No newline at end of file
