This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1312a1dd27 [Feature][CDC] Support custom table primary key (#6106)
1312a1dd27 is described below

commit 1312a1dd274a18c98f184efaef060d36b1cc3fce
Author: hailin0 <wanghai...@apache.org>
AuthorDate: Wed Jan 3 19:52:39 2024 +0800

    [Feature][CDC] Support custom table primary key (#6106)
    
    * [Feature][CDC] Support custom table primary key
    
    * Update seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
    
    Co-authored-by: TaoZex <45089228+tao...@users.noreply.github.com>
    
    * Update seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/CatalogTableUtils.java
    
    Co-authored-by: TaoZex <45089228+tao...@users.noreply.github.com>
    
    ---------
    
    Co-authored-by: TaoZex <45089228+tao...@users.noreply.github.com>
---
 docs/en/connector-v2/source/MySQL-CDC.md           |  35 ++-
 docs/en/connector-v2/source/SqlServer-CDC.md       |  32 +++
 .../cdc/base/config/JdbcSourceTableConfig.java     |  29 +++
 .../cdc/base/option/JdbcSourceOptions.java         |  14 ++
 .../cdc/base/utils/CatalogTableUtils.java          | 145 ++++++++++++
 .../seatunnel/cdc/mysql/source/MySqlDialect.java   |  23 +-
 .../cdc/mysql/source/MySqlIncrementalSource.java   |   2 +-
 .../source/MySqlIncrementalSourceFactory.java      |  16 +-
 .../seatunnel/cdc/mysql/utils/MySqlSchema.java     |  19 +-
 .../sqlserver/source/source/SqlServerDialect.java  |  23 +-
 .../source/source/SqlServerIncrementalSource.java  |   2 +-
 .../source/SqlServerIncrementalSourceFactory.java  |  16 +-
 .../sqlserver/source/utils/SqlServerSchema.java    |  11 +-
 .../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java |  57 +++++
 .../src/test/resources/ddl/mysql_cdc.sql           | 258 +++++++++++++++++++++
 .../mysqlcdc_to_mysql_with_custom_primary_key.conf |  60 +++++
 .../connector/cdc/sqlserver/SqlServerCDCIT.java    |  40 ++++
 .../src/test/resources/ddl/column_type_test.sql    |  48 ++++
 ...ercdc_to_sqlserver_with_custom_primary_key.conf |  58 +++++
 19 files changed, 873 insertions(+), 15 deletions(-)
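
The override that `CatalogTableUtils.mergeCatalogTableConfig` applies can be sketched without any SeaTunnel or Debezium classes. The following is only an illustration of the idea, not the code from this commit: `PrimaryKeyOverrideSketch`, `TableConfig`, `primaryKeyName` and `mergePrimaryKeys` are hypothetical names, and tables are modelled as plain maps from a table path to column names. It assumes the behaviour visible in the diff below: unknown tables are skipped with a warning, a key column missing from the table raises `IllegalArgumentException`, and the generated key name is `"pk"` plus a non-negative hash of the key list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PrimaryKeyOverrideSketch {

    /** Hypothetical stand-in for one table-names-config entry. */
    public static final class TableConfig {
        final String table;
        final List<String> primaryKeys;

        public TableConfig(String table, List<String> primaryKeys) {
            this.table = table;
            this.primaryKeys = primaryKeys;
        }
    }

    /** Same naming scheme as the diff: "pk" plus a non-negative hash of the key list. */
    public static String primaryKeyName(List<String> primaryKeys) {
        return "pk" + (primaryKeys.hashCode() & Integer.MAX_VALUE);
    }

    /**
     * Returns a copy of discoveredKeys in which every configured table
     * has its primary key replaced by the configured column list.
     */
    public static Map<String, List<String>> mergePrimaryKeys(
            Map<String, List<String>> tableColumns,
            Map<String, List<String>> discoveredKeys,
            List<TableConfig> configs) {
        Map<String, List<String>> merged = new LinkedHashMap<>(discoveredKeys);
        for (TableConfig config : configs) {
            List<String> columns = tableColumns.get(config.table);
            if (columns == null) {
                // Unknown table: warn and skip instead of failing the job.
                System.err.println("Table " + config.table + " not found, skipping");
                continue;
            }
            for (String pk : config.primaryKeys) {
                if (!columns.contains(pk)) {
                    throw new IllegalArgumentException(
                            String.format(
                                    "Primary key(%s) is not in table(%s) columns(%s)",
                                    pk, config.table, columns));
                }
            }
            merged.put(config.table, new ArrayList<>(config.primaryKeys));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, List<String>> columns = new LinkedHashMap<>();
        columns.put("testdb.table1", Arrays.asList("id", "name"));
        columns.put("testdb.table2", Arrays.asList("id", "value"));

        // table1 has a key in the database; table2 has none until configured.
        Map<String, List<String>> discovered = new LinkedHashMap<>();
        discovered.put("testdb.table1", Arrays.asList("id"));

        List<TableConfig> configs =
                Arrays.asList(new TableConfig("testdb.table2", Arrays.asList("id")));

        System.out.println(mergePrimaryKeys(columns, discovered, configs));
        System.out.println(primaryKeyName(Arrays.asList("id")));
    }
}
```

The table names mirror the `table-names-config` example added to MySQL-CDC.md. Masking the hash with `Integer.MAX_VALUE` clears the sign bit, so the generated name never contains a minus sign.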

diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md
index 42803e7e36..bc562213a2 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -153,6 +153,7 @@ When an initial consistent snapshot is made for large databases, your establishe
 | password                                       | String   | Yes      | -       | Password to use when connecting to the database server. [...]
 | database-names                                 | List     | No       | -       | Database name of the database to monitor. [...]
 | table-names                                    | List     | Yes      | -       | Table name of the database to monitor. The table name needs to include the database name, for example: `database_name.table_name` [...]
+| table-names-config                             | List     | No       | -       | Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}] [...]
 | startup.mode                                   | Enum     | No       | INITIAL | Optional startup mode for MySQL CDC consumer, valid enumerations are `initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize historical data at startup, and then synchronize incremental data.<br/> `earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup from the latest offset.<br/> `specific`: Startup from user-supplied specific offsets. [...]
 | startup.specific-offset.file                   | String   | No       | -       | Start from the specified binlog file name. **Note, This option is required when the `startup.mode` option used `specific`.** [...]
 | startup.specific-offset.pos                    | Long     | No       | -       | Start from the specified binlog file position. **Note, This option is required when the `startup.mode` option used `specific`.** [...]
@@ -190,9 +191,6 @@ env {
 
 source {
   MySQL-CDC {
-    catalog = {
-      factory = MySQL
-    }
     base-url = "jdbc:mysql://localhost:3306/testdb"
     username = "root"
     password = "root@123"
@@ -212,6 +210,37 @@ sink {
 
 > Must be used with kafka connector sink, see [compatible debezium format](../formats/cdc-compatible-debezium-json.md) for details
 
+### Support custom primary key for table
+
+```
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 10000
+}
+
+source {
+  MySQL-CDC {
+    base-url = "jdbc:mysql://localhost:3306/testdb"
+    username = "root"
+    password = "root@123"
+    
+    table-names = ["testdb.table1", "testdb.table2"]
+    table-names-config = [
+      {
+        table = "testdb.table2"
+        primaryKeys = ["id"]
+      }
+    ]
+  }
+}
+
+sink {
+  Console {
+  }
+}
+```
+
 ## Changelog
 
 - Add MySQL CDC Source Connector
diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md
index 1b32c824a4..62b788ac15 100644
--- a/docs/en/connector-v2/source/SqlServer-CDC.md
+++ b/docs/en/connector-v2/source/SqlServer-CDC.md
@@ -60,6 +61,7 @@ Please download and put SqlServer driver in `${SEATUNNEL_HOME}/lib/` dir. For ex
 | password                                       | String   | Yes      | -       | Password to use when connecting to the database server. [...]
 | database-names                                 | List     | Yes      | -       | Database name of the database to monitor. [...]
 | table-names                                    | List     | Yes      | -       | Table name is a combination of schema name and table name (databaseName.schemaName.tableName). [...]
+| table-names-config                             | List     | No       | -       | Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}] [...]
 | base-url                                       | String   | Yes      | -       | URL has to be with database, like "jdbc:sqlserver://localhost:1433;databaseName=test". [...]
 | startup.mode                                   | Enum     | No       | INITIAL | Optional startup mode for SqlServer CDC consumer, valid enumerations are "initial", "earliest", "latest" and "specific". [...]
 | startup.timestamp                              | Long     | No       | -       | Start from the specified epoch timestamp (in milliseconds).<br/> **Note, This option is required when** the **"startup.mode" option used `'timestamp'`.** [...]
@@ -186,3 +187,34 @@ sink {
   }
 ```
 
+### Support custom primary key for table
+
+```
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  SqlServer-CDC {
+    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
+    username = "sa"
+    password = "Y.sa123456"
+    database-names = ["column_type_test"]
+    
+    table-names = ["column_type_test.dbo.simple_types", "column_type_test.dbo.full_types"]
+    table-names-config = [
+      {
+        table = "column_type_test.dbo.full_types"
+        primaryKeys = ["id"]
+      }
+    ]
+  }
+}
+
+sink {
+  console {
+  }
+```
+
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java
new file mode 100644
index 0000000000..5cafa363e8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceTableConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class JdbcSourceTableConfig implements Serializable {
+    private String table;
+    private List<String> primaryKeys;
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
index 9542a8e970..6cd7ba0631 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.cdc.base.option;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
 import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
 
 import java.time.ZoneId;
@@ -141,4 +142,17 @@ public class JdbcSourceOptions extends SourceOptions {
                                     + "The value represents the denominator of the sampling rate fraction. "
                                     + "For example, a value of 1000 means a sampling rate of 1/1000. "
                                     + "This parameter is used when the sample sharding strategy is triggered.");
+
+    public static final Option<List<JdbcSourceTableConfig>> TABLE_NAMES_CONFIG =
+            Options.key("table-names-config")
+                    .listType(JdbcSourceTableConfig.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            "Config table configs. Example: "
+                                    + "["
+                                    + "   {"
+                                    + "       \"table\": \"db1.schema1.table1\","
+                                    + "       \"primaryKeys\": [\"key1\",\"key2\"]"
+                                    + "   }"
+                                    + "]");
 }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/CatalogTableUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/CatalogTableUtils.java
new file mode 100644
index 0000000000..78f01370f2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/CatalogTableUtils.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.base.utils;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
+
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class CatalogTableUtils {
+
+    public static List<CatalogTable> mergeCatalogTableConfig(
+            List<CatalogTable> tables,
+            List<JdbcSourceTableConfig> tableConfigs,
+            Function<String, TablePath> parser) {
+        Map<TablePath, CatalogTable> catalogTableMap =
+                tables.stream()
+                        .collect(Collectors.toMap(t -> t.getTableId().toTablePath(), t -> t));
+        for (JdbcSourceTableConfig catalogTableConfig : tableConfigs) {
+            TablePath tablePath = parser.apply(catalogTableConfig.getTable());
+            CatalogTable catalogTable = catalogTableMap.get(tablePath);
+            if (catalogTable != null) {
+                catalogTable = mergeCatalogTableConfig(catalogTable, catalogTableConfig);
+                catalogTableMap.put(tablePath, catalogTable);
+                log.info(
+                        "Override primary key({}) for catalog table {}",
+                        catalogTableConfig.getPrimaryKeys(),
+                        catalogTableConfig.getTable());
+            } else {
+                log.warn(
+                        "Table {} is not found in catalog tables, skip to merge config",
+                        catalogTableConfig.getTable());
+            }
+        }
+        return new ArrayList<>(catalogTableMap.values());
+    }
+
+    public static CatalogTable mergeCatalogTableConfig(
+            final CatalogTable table, JdbcSourceTableConfig config) {
+        List<String> columnNames =
+                table.getTableSchema().getColumns().stream()
+                        .map(c -> c.getName())
+                        .collect(Collectors.toList());
+        for (String pk : config.getPrimaryKeys()) {
+            if (!columnNames.contains(pk)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Primary key(%s) is not in table(%s) columns(%s)",
+                                pk, table.getTablePath(), columnNames));
+            }
+        }
+        PrimaryKey primaryKeys =
+                PrimaryKey.of(
+                        "pk" + (config.getPrimaryKeys().hashCode() & Integer.MAX_VALUE),
+                        config.getPrimaryKeys());
+        List<Column> columns =
+                table.getTableSchema().getColumns().stream()
+                        .map(
+                                column -> {
+                                    if (config.getPrimaryKeys().contains(column.getName())
+                                            && column.isNullable()) {
+                                        log.warn(
+                                                "Primary key({}) is nullable for catalog table {}",
+                                                column.getName(),
+                                                table.getTablePath());
+                                        return PhysicalColumn.of(
+                                                column.getName(),
+                                                column.getDataType(),
+                                                column.getColumnLength(),
+                                                false,
+                                                column.getDefaultValue(),
+                                                column.getComment());
+                                    }
+                                    return column;
+                                })
+                        .collect(Collectors.toList());
+
+        return CatalogTable.of(
+                table.getTableId(),
+                TableSchema.builder()
+                        .primaryKey(primaryKeys)
+                        .columns(columns)
+                        .constraintKey(table.getTableSchema().getConstraintKeys())
+                        .build(),
+                table.getOptions(),
+                table.getPartitionKeys(),
+                table.getComment());
+    }
+
+    public static Table mergeCatalogTableConfig(Table debeziumTable, CatalogTable catalogTable) {
+        PrimaryKey pk = catalogTable.getTableSchema().getPrimaryKey();
+        if (pk != null) {
+            debeziumTable = debeziumTable.edit().setPrimaryKeyNames(pk.getColumnNames()).create();
+            log.info(
+                    "Override primary key({}) for catalog table {}",
+                    pk.getColumnNames(),
+                    debeziumTable.id());
+        }
+        return debeziumTable;
+    }
+
+    public static Map<TableId, CatalogTable> convertTables(List<CatalogTable> catalogTables) {
+        Map<TableId, CatalogTable> tableMap =
+                catalogTables.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        e ->
+                                                new TableId(
+                                                        e.getTableId().getDatabaseName(),
+                                                        e.getTableId().getSchemaName(),
+                                                        e.getTableId().getTableName()),
+                                        e -> e));
+        return Collections.unmodifiableMap(tableMap);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
index 30e7ddf0f3..c43b819f06 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
@@ -24,6 +27,7 @@ import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnec
 import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.eumerator.MySqlChunkSplitter;
@@ -40,6 +44,8 @@ import io.debezium.relational.history.TableChanges;
 
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive;
 
@@ -49,9 +55,11 @@ public class MySqlDialect implements JdbcDataSourceDialect {
     private static final long serialVersionUID = 1L;
     private final MySqlSourceConfig sourceConfig;
     private transient MySqlSchema mySqlSchema;
+    private final Map<TableId, CatalogTable> tableMap;
 
-    public MySqlDialect(MySqlSourceConfigFactory configFactory) {
+    public MySqlDialect(MySqlSourceConfigFactory configFactory, List<CatalogTable> catalogTables) {
         this.sourceConfig = configFactory.create(0);
+        this.tableMap = CatalogTableUtils.convertTables(catalogTables);
     }
 
     @Override
@@ -93,7 +101,8 @@ public class MySqlDialect implements JdbcDataSourceDialect {
     public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
         if (mySqlSchema == null) {
             mySqlSchema =
-                    new MySqlSchema(sourceConfig, isDataCollectionIdCaseSensitive(sourceConfig));
+                    new MySqlSchema(
+                            sourceConfig, isDataCollectionIdCaseSensitive(sourceConfig), tableMap);
         }
         return mySqlSchema.getTableSchema(jdbc, tableId);
     }
@@ -112,4 +121,14 @@ public class MySqlDialect implements JdbcDataSourceDialect {
             return new MySqlBinlogFetchTask(sourceSplitBase.asIncrementalSplit());
         }
     }
+
+    @Override
+    public Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection, TableId tableId) {
+        return Optional.ofNullable(tableMap.get(tableId).getTableSchema().getPrimaryKey());
+    }
+
+    @Override
+    public List<ConstraintKey> getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId) {
+        return tableMap.get(tableId).getTableSchema().getConstraintKeys();
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index f221256e1d..67ff9ff607 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -114,7 +114,7 @@ public class MySqlIncrementalSource<T> extends IncrementalSource<T, JdbcSourceCo
 
     @Override
     public DataSourceDialect<JdbcSourceConfig> createDataSourceDialect(ReadonlyConfig config) {
-        return new MySqlDialect((MySqlSourceConfigFactory) configFactory);
+        return new MySqlDialect((MySqlSourceConfigFactory) configFactory, catalogTables);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 1ec94c3cfc..defe0a6ab9 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -23,22 +23,26 @@ import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
 import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
 
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Optional;
 
 @AutoService(Factory.class)
 public class MySqlIncrementalSourceFactory implements TableSourceFactory {
@@ -65,7 +69,8 @@ public class MySqlIncrementalSourceFactory implements TableSourceFactory {
                         JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
                         JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
                         JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
-                        JdbcSourceOptions.INVERSE_SAMPLING_RATE)
+                        JdbcSourceOptions.INVERSE_SAMPLING_RATE,
+                        JdbcSourceOptions.TABLE_NAMES_CONFIG)
                 .optional(MySqlSourceOptions.STARTUP_MODE, MySqlSourceOptions.STOP_MODE)
                 .conditional(
                         MySqlSourceOptions.STARTUP_MODE,
@@ -96,6 +101,15 @@ public class MySqlIncrementalSourceFactory implements TableSourceFactory {
             List<CatalogTable> catalogTables =
                     CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
+            Optional<List<JdbcSourceTableConfig>> tableConfigs =
+                    context.getOptions().getOptional(JdbcSourceOptions.TABLE_NAMES_CONFIG);
+            if (tableConfigs.isPresent()) {
+                catalogTables =
+                        CatalogTableUtils.mergeCatalogTableConfig(
+                                catalogTables,
+                                tableConfigs.get(),
+                                text -> TablePath.of(text, false));
+            }
             SeaTunnelDataType<SeaTunnelRow> dataType =
                     CatalogTableUtil.convertToMultipleRowType(catalogTables);
             return (SeaTunnelSource<T, SplitT, StateT>)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java
index e921342998..324f91fc6e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlSchema.java
@@ -17,13 +17,17 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
 
 import io.debezium.connector.mysql.MySqlConnectorConfig;
 import io.debezium.connector.mysql.MySqlDatabaseSchema;
 import io.debezium.connector.mysql.MySqlOffsetContext;
 import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
 import io.debezium.relational.history.TableChanges.TableChange;
 import io.debezium.schema.SchemaChangeEvent;
 
@@ -41,13 +45,18 @@ public class MySqlSchema {
     private final MySqlConnectorConfig connectorConfig;
     private final MySqlDatabaseSchema databaseSchema;
     private final Map<TableId, TableChange> schemasByTableId;
+    private final Map<TableId, CatalogTable> tableMap;
 
-    public MySqlSchema(MySqlSourceConfig sourceConfig, boolean isTableIdCaseSensitive) {
+    public MySqlSchema(
+            MySqlSourceConfig sourceConfig,
+            boolean isTableIdCaseSensitive,
+            Map<TableId, CatalogTable> tableMap) {
         this.connectorConfig = sourceConfig.getDbzConnectorConfig();
         this.databaseSchema =
                 MySqlConnectionUtils.createMySqlDatabaseSchema(
                         connectorConfig, isTableIdCaseSensitive);
         this.schemasByTableId = new HashMap<>();
+        this.tableMap = tableMap;
     }
 
     /**
@@ -81,7 +90,13 @@ public class MySqlSchema {
                             for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
                                 for (TableChange tableChange :
                                         schemaChangeEvent.getTableChanges()) {
-                                    tableChangeMap.put(tableId, tableChange);
+                                    Table table =
+                                            CatalogTableUtils.mergeCatalogTableConfig(
+                                                    tableChange.getTable(), tableMap.get(tableId));
+                                    TableChange newTableChange =
+                                            new TableChange(
+                                                    TableChanges.TableChangeType.CREATE, table);
+                                    tableChangeMap.put(tableId, newTableChange);
                                 }
                             }
                         }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
index c337ddc2a0..e667412378 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
@@ -24,6 +27,7 @@ import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnec
 import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.eumerator.SqlServerChunkSplitter;
@@ -40,6 +44,8 @@ import io.debezium.relational.history.TableChanges;
 
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
 
@@ -50,9 +56,12 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
     private final SqlServerSourceConfig sourceConfig;
 
     private transient SqlServerSchema sqlServerSchema;
+    private final Map<TableId, CatalogTable> tableMap;
 
-    public SqlServerDialect(SqlServerSourceConfigFactory configFactory) {
+    public SqlServerDialect(
+            SqlServerSourceConfigFactory configFactory, List<CatalogTable> catalogTables) {
         this.sourceConfig = configFactory.create(0);
+        this.tableMap = CatalogTableUtils.convertTables(catalogTables);
     }
 
     @Override
@@ -95,7 +104,7 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
     @Override
     public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
         if (sqlServerSchema == null) {
-            sqlServerSchema = new SqlServerSchema(sourceConfig.getDbzConnectorConfig());
+            sqlServerSchema = new SqlServerSchema(sourceConfig.getDbzConnectorConfig(), tableMap);
         }
         return sqlServerSchema.getTableSchema(jdbc, tableId);
     }
@@ -115,4 +124,14 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
             return new SqlServerTransactionLogFetchTask(sourceSplitBase.asIncrementalSplit());
         }
     }
+
+    @Override
+    public Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection, TableId tableId) {
+        return Optional.ofNullable(tableMap.get(tableId).getTableSchema().getPrimaryKey());
+    }
+
+    @Override
+    public List<ConstraintKey> getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId) {
+        return tableMap.get(tableId).getTableSchema().getConstraintKeys();
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
index 5a3a3cc9eb..4ab64ff692 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
@@ -114,7 +114,7 @@ public class SqlServerIncrementalSource<T> extends IncrementalSource<T, JdbcSour
 
     @Override
     public DataSourceDialect<JdbcSourceConfig> createDataSourceDialect(ReadonlyConfig config) {
-        return new SqlServerDialect((SqlServerSourceConfigFactory) configFactory);
+        return new SqlServerDialect((SqlServerSourceConfigFactory) configFactory, catalogTables);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 6338d85aa2..95031e9b9f 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -23,22 +23,26 @@ import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
 import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
 
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Optional;
 
 @AutoService(Factory.class)
 public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
@@ -64,7 +68,8 @@ public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
                         JdbcSourceOptions.CONNECTION_POOL_SIZE,
                         JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
                         JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
-                        JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD)
+                        JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
+                        JdbcSourceOptions.TABLE_NAMES_CONFIG)
                 .optional(SqlServerSourceOptions.STARTUP_MODE, SqlServerSourceOptions.STOP_MODE)
                 .conditional(
                         SqlServerSourceOptions.STARTUP_MODE,
@@ -101,6 +106,15 @@ public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
             List<CatalogTable> catalogTables =
                     CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
+            Optional<List<JdbcSourceTableConfig>> tableConfigs =
+                    context.getOptions().getOptional(JdbcSourceOptions.TABLE_NAMES_CONFIG);
+            if (tableConfigs.isPresent()) {
+                catalogTables =
+                        CatalogTableUtils.mergeCatalogTableConfig(
+                                catalogTables,
+                                tableConfigs.get(),
+                                text -> TablePath.of(text, true));
+            }
             SeaTunnelDataType<SeaTunnelRow> dataType =
                     CatalogTableUtil.convertToMultipleRowType(catalogTables);
             return new SqlServerIncrementalSource(context.getOptions(), dataType, catalogTables);
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
index 83d51ae31b..79f58e3e2d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
 
 import io.debezium.connector.sqlserver.SqlServerConnection;
 import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
@@ -38,10 +40,13 @@ public class SqlServerSchema {
 
     private final SqlServerConnectorConfig connectorConfig;
     private final Map<TableId, TableChange> schemasByTableId;
+    private final Map<TableId, CatalogTable> tableMap;
 
-    public SqlServerSchema(SqlServerConnectorConfig connectorConfig) {
+    public SqlServerSchema(
+            SqlServerConnectorConfig connectorConfig, Map<TableId, CatalogTable> tableMap) {
         this.schemasByTableId = new ConcurrentHashMap<>();
         this.connectorConfig = connectorConfig;
+        this.tableMap = tableMap;
     }
 
     public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
@@ -67,7 +72,9 @@ public class SqlServerSchema {
                     connectorConfig.getTableFilters().dataCollectionFilter(),
                     null,
                     false);
-            Table table = tables.forTable(tableId);
+            Table table =
+                    CatalogTableUtils.mergeCatalogTableConfig(
+                            tables.forTable(tableId), tableMap.get(tableId));
             TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table);
             tableChangeMap.put(tableId, tableChange);
         } catch (SQLException e) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 366b3146b1..f2d5669c37 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -96,6 +96,11 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
     private static final String SOURCE_TABLE_2 = "mysql_cdc_e2e_source_table2";
     private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
             "mysql_cdc_e2e_source_table_no_primary_key";
+
+    private static final String SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY =
+            "mysql_cdc_e2e_source_table_1_custom_primary_key";
+    private static final String SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY =
+            "mysql_cdc_e2e_source_table_2_custom_primary_key";
     private static final String SINK_TABLE = "mysql_cdc_e2e_sink_table";
 
     private static MySqlContainer createMySqlContainer(MySqlVersion version) {
@@ -414,6 +419,58 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
         log.info("****************** container logs end ******************");
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK and FLINK do not support multi table")
+    public void testMysqlCdcMultiTableWithCustomPrimaryKey(TestContainer container) {
+        // Clear related content to ensure that multiple operations are not affected
+        clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY);
+        clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY);
+
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        container.executeJob("/mysqlcdc_to_mysql_with_custom_primary_key.conf");
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+
+        // insert update delete
+        upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY);
+        upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY);
+
+        // stream stage
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertAll(
+                                        () ->
+                                                Assertions.assertIterableEquals(
+                                                        query(
+                                                                getSourceQuerySQL(
+                                                                        MYSQL_DATABASE,
+                                                                        SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY)),
+                                                        query(
+                                                                getSourceQuerySQL(
+                                                                        MYSQL_DATABASE2,
+                                                                        SOURCE_TABLE_1_CUSTOM_PRIMARY_KEY))),
+                                        () ->
+                                                Assertions.assertIterableEquals(
+                                                        query(
+                                                                getSourceQuerySQL(
+                                                                        MYSQL_DATABASE,
+                                                                        SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY)),
+                                                        query(
+                                                                getSourceQuerySQL(
+                                                                        MYSQL_DATABASE2,
+                                                                        SOURCE_TABLE_2_CUSTOM_PRIMARY_KEY)))));
+    }
+
     private Connection getJdbcConnection() throws SQLException {
         return DriverManager.getConnection(
                 MYSQL_CONTAINER.getJdbcUrl(),
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
index 91ae73bd27..b909f9aacd 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/mysql_cdc.sql
@@ -170,6 +170,102 @@ CREATE TABLE mysql_cdc_e2e_source_table_no_primary_key
   DEFAULT CHARSET = utf8mb4
   COLLATE = utf8mb4_0900_ai_ci;
 
+CREATE TABLE mysql_cdc_e2e_source_table_1_custom_primary_key
+(
+    `id`                   int                            NOT NULL,
+    `f_binary`             binary(64)                     DEFAULT NULL,
+    `f_blob`               blob,
+    `f_long_varbinary`     mediumblob,
+    `f_longblob`           longblob,
+    `f_tinyblob`           tinyblob,
+    `f_varbinary`          varbinary(100)                 DEFAULT NULL,
+    `f_smallint`           smallint                       DEFAULT NULL,
+    `f_smallint_unsigned`  smallint unsigned              DEFAULT NULL,
+    `f_mediumint`          mediumint                      DEFAULT NULL,
+    `f_mediumint_unsigned` mediumint unsigned             DEFAULT NULL,
+    `f_int`                int                            DEFAULT NULL,
+    `f_int_unsigned`       int unsigned                   DEFAULT NULL,
+    `f_integer`            int                            DEFAULT NULL,
+    `f_integer_unsigned`   int unsigned                   DEFAULT NULL,
+    `f_bigint`             bigint                         DEFAULT NULL,
+    `f_bigint_unsigned`    bigint unsigned                DEFAULT NULL,
+    `f_numeric`            decimal(10, 0)                 DEFAULT NULL,
+    `f_decimal`            decimal(10, 0)                 DEFAULT NULL,
+    `f_float`              float                          DEFAULT NULL,
+    `f_double`             double                         DEFAULT NULL,
+    `f_double_precision`   double                         DEFAULT NULL,
+    `f_longtext`           longtext,
+    `f_mediumtext`         mediumtext,
+    `f_text`               text,
+    `f_tinytext`           tinytext,
+    `f_varchar`            varchar(100)                   DEFAULT NULL,
+    `f_date`               date                           DEFAULT NULL,
+    `f_datetime`           datetime                       DEFAULT NULL,
+    `f_timestamp`          timestamp NULL                 DEFAULT NULL,
+    `f_bit1`               bit(1)                         DEFAULT NULL,
+    `f_bit64`              bit(64)                        DEFAULT NULL,
+    `f_char`               char(1)                        DEFAULT NULL,
+    `f_enum`               enum ('enum1','enum2','enum3') DEFAULT NULL,
+    `f_mediumblob`         mediumblob,
+    `f_long_varchar`       mediumtext,
+    `f_real`               double                         DEFAULT NULL,
+    `f_time`               time                           DEFAULT NULL,
+    `f_tinyint`            tinyint                        DEFAULT NULL,
+    `f_tinyint_unsigned`   tinyint unsigned               DEFAULT NULL,
+    `f_json`               json                           DEFAULT NULL,
+    `f_year`               year                           DEFAULT NULL
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4
+  COLLATE = utf8mb4_0900_ai_ci;
+
+CREATE TABLE mysql_cdc_e2e_source_table_2_custom_primary_key
+(
+    `id`                   int                            NOT NULL,
+    `f_binary`             binary(64)                     DEFAULT NULL,
+    `f_blob`               blob,
+    `f_long_varbinary`     mediumblob,
+    `f_longblob`           longblob,
+    `f_tinyblob`           tinyblob,
+    `f_varbinary`          varbinary(100)                 DEFAULT NULL,
+    `f_smallint`           smallint                       DEFAULT NULL,
+    `f_smallint_unsigned`  smallint unsigned              DEFAULT NULL,
+    `f_mediumint`          mediumint                      DEFAULT NULL,
+    `f_mediumint_unsigned` mediumint unsigned             DEFAULT NULL,
+    `f_int`                int                            DEFAULT NULL,
+    `f_int_unsigned`       int unsigned                   DEFAULT NULL,
+    `f_integer`            int                            DEFAULT NULL,
+    `f_integer_unsigned`   int unsigned                   DEFAULT NULL,
+    `f_bigint`             bigint                         DEFAULT NULL,
+    `f_bigint_unsigned`    bigint unsigned                DEFAULT NULL,
+    `f_numeric`            decimal(10, 0)                 DEFAULT NULL,
+    `f_decimal`            decimal(10, 0)                 DEFAULT NULL,
+    `f_float`              float                          DEFAULT NULL,
+    `f_double`             double                         DEFAULT NULL,
+    `f_double_precision`   double                         DEFAULT NULL,
+    `f_longtext`           longtext,
+    `f_mediumtext`         mediumtext,
+    `f_text`               text,
+    `f_tinytext`           tinytext,
+    `f_varchar`            varchar(100)                   DEFAULT NULL,
+    `f_date`               date                           DEFAULT NULL,
+    `f_datetime`           datetime                       DEFAULT NULL,
+    `f_timestamp`          timestamp NULL                 DEFAULT NULL,
+    `f_bit1`               bit(1)                         DEFAULT NULL,
+    `f_bit64`              bit(64)                        DEFAULT NULL,
+    `f_char`               char(1)                        DEFAULT NULL,
+    `f_enum`               enum ('enum1','enum2','enum3') DEFAULT NULL,
+    `f_mediumblob`         mediumblob,
+    `f_long_varchar`       mediumtext,
+    `f_real`               double                         DEFAULT NULL,
+    `f_time`               time                           DEFAULT NULL,
+    `f_tinyint`            tinyint                        DEFAULT NULL,
+    `f_tinyint_unsigned`   tinyint unsigned               DEFAULT NULL,
+    `f_json`               json                           DEFAULT NULL,
+    `f_year`               year                           DEFAULT NULL
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4
+  COLLATE = utf8mb4_0900_ai_ci;
+
 CREATE TABLE mysql_cdc_e2e_sink_table
 (
     `id`                   int       NOT NULL AUTO_INCREMENT,
@@ -223,6 +319,8 @@ CREATE TABLE mysql_cdc_e2e_sink_table
 truncate table mysql_cdc_e2e_source_table;
 truncate table mysql_cdc_e2e_source_table2;
 truncate table mysql_cdc_e2e_source_table_no_primary_key;
+truncate table mysql_cdc_e2e_source_table_1_custom_primary_key;
+truncate table mysql_cdc_e2e_source_table_2_custom_primary_key;
 truncate table mysql_cdc_e2e_sink_table;
 
 INSERT INTO mysql_cdc_e2e_source_table ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,
@@ -318,6 +416,68 @@ VALUES ( 1, 
0x616263740000000000000000000000000000000000000000000000000000000000
          
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field', 112.345,
          '14:30:00', -128, 22, '{ "key": "value" }', 2021 );
 
+INSERT INTO mysql_cdc_e2e_source_table_1_custom_primary_key ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,
+                                                        f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,
+                                                        f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,
+                                                        f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,
+                                                        f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,
+                                                        f_tinyint, f_tinyint_unsigned, f_json, f_year )
+VALUES ( 1, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,
+         0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 
654321, 1234567, 7654321, 1234567, 7654321,
+         123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long 
text field', 'This is a medium text field',
+         'This is a text field', 'This is a tiny text field', 'This is a 
varchar field', '2022-04-27', '2022-04-27 14:30:00',
+         '2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',
+         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',
+         12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ),
+       ( 2, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 
0x74696E79626C6F62,
+         0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 
7654321, 1234567, 7654321, 123456789, 987654321,
+         123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is 
a medium text field', 'This is a text field',
+         'This is a tiny text field', 'This is a varchar field', '2022-04-27', 
'2022-04-27 14:30:00', '2023-04-27 11:08:40',
+         1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',
+         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',
+         112.345, '14:30:00', -128, 22, '{ "key": "value" }', 2013 ),
+       ( 3, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 
0x74696E79626C6F62,
+         0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 
7654321, 1234567, 7654321, 123456789, 987654321, 123,
+         789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a 
medium text field', 'This is a text field',
+         'This is a tiny text field', 'This is a varchar field', '2022-04-27', 
'2022-04-27 14:30:00', '2023-04-27 11:08:40',
+         1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',
+         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field', 112.345,
+         '14:30:00', -128, 22, '{ "key": "value" }', 2021 );
+
+INSERT INTO mysql_cdc_e2e_source_table_2_custom_primary_key ( id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint,
+                                                        f_smallint_unsigned, f_mediumint, f_mediumint_unsigned, f_int, f_int_unsigned, f_integer,
+                                                        f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, f_float, f_double,
+                                                        f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime,
+                                                        f_timestamp, f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time,
+                                                        f_tinyint, f_tinyint_unsigned, f_json, f_year )
+VALUES ( 1, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL,
+         0x74696E79626C6F62, 0x48656C6C6F20776F726C64, 12345, 54321, 123456, 
654321, 1234567, 7654321, 1234567, 7654321,
+         123456789, 987654321, 123, 789, 12.34, 56.78, 90.12, 'This is a long 
text field', 'This is a medium text field',
+         'This is a text field', 'This is a tiny text field', 'This is a 
varchar field', '2022-04-27', '2022-04-27 14:30:00',
+         '2023-04-27 11:08:40', 1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',
+         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',
+         12.345, '14:30:00', -128, 255, '{ "key": "value" }', 2022 ),
+       ( 2, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 
0x74696E79626C6F62,
+         0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 
7654321, 1234567, 7654321, 123456789, 987654321,
+         123, 789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is 
a medium text field', 'This is a text field',
+         'This is a tiny text field', 'This is a varchar field', '2022-04-27', 
'2022-04-27 14:30:00', '2023-04-27 11:08:40',
+         1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',
+         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field',
+         112.345, '14:30:00', -128, 22, '{ "key": "value" }', 2013 ),
+       ( 3, 
0x61626374000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000,
+         0x68656C6C6F, 
0x18000000789C0BC9C82C5600A244859CFCBC7485B2C4A2A4CCBCC4A24A00697308D4, NULL, 
0x74696E79626C6F62,
+         0x48656C6C6F20776F726C64, 12345, 54321, 123456, 654321, 1234567, 
7654321, 1234567, 7654321, 123456789, 987654321, 123,
+         789, 12.34, 56.78, 90.12, 'This is a long text field', 'This is a 
medium text field', 'This is a text field',
+         'This is a tiny text field', 'This is a varchar field', '2022-04-27', 
'2022-04-27 14:30:00', '2023-04-27 11:08:40',
+         1, 
b'0101010101010101010101010101010101010101010101010101010101010101', 'C', 
'enum2',
+         
0x1B000000789C0BC9C82C5600A24485DCD494CCD25C85A49CFC2485B4CCD49C140083FF099A, 
'This is a long varchar field', 112.345,
+         '14:30:00', -128, 22, '{ "key": "value" }', 2021 );
+
 CREATE DATABASE IF NOT EXISTS `mysql_cdc2`;
 
 use mysql_cdc2;
@@ -421,3 +581,101 @@ CREATE TABLE mysql_cdc_e2e_source_table2
   AUTO_INCREMENT = 2
   DEFAULT CHARSET = utf8mb4
   COLLATE = utf8mb4_0900_ai_ci;
+
+CREATE TABLE mysql_cdc_e2e_source_table_1_custom_primary_key
+(
+    `id`                   int                            NOT NULL,
+    `f_binary`             binary(64)                     DEFAULT NULL,
+    `f_blob`               blob,
+    `f_long_varbinary`     mediumblob,
+    `f_longblob`           longblob,
+    `f_tinyblob`           tinyblob,
+    `f_varbinary`          varbinary(100)                 DEFAULT NULL,
+    `f_smallint`           smallint                       DEFAULT NULL,
+    `f_smallint_unsigned`  smallint unsigned              DEFAULT NULL,
+    `f_mediumint`          mediumint                      DEFAULT NULL,
+    `f_mediumint_unsigned` mediumint unsigned             DEFAULT NULL,
+    `f_int`                int                            DEFAULT NULL,
+    `f_int_unsigned`       int unsigned                   DEFAULT NULL,
+    `f_integer`            int                            DEFAULT NULL,
+    `f_integer_unsigned`   int unsigned                   DEFAULT NULL,
+    `f_bigint`             bigint                         DEFAULT NULL,
+    `f_bigint_unsigned`    bigint unsigned                DEFAULT NULL,
+    `f_numeric`            decimal(10, 0)                 DEFAULT NULL,
+    `f_decimal`            decimal(10, 0)                 DEFAULT NULL,
+    `f_float`              float                          DEFAULT NULL,
+    `f_double`             double                         DEFAULT NULL,
+    `f_double_precision`   double                         DEFAULT NULL,
+    `f_longtext`           longtext,
+    `f_mediumtext`         mediumtext,
+    `f_text`               text,
+    `f_tinytext`           tinytext,
+    `f_varchar`            varchar(100)                   DEFAULT NULL,
+    `f_date`               date                           DEFAULT NULL,
+    `f_datetime`           datetime                       DEFAULT NULL,
+    `f_timestamp`          timestamp NULL                 DEFAULT NULL,
+    `f_bit1`               bit(1)                         DEFAULT NULL,
+    `f_bit64`              bit(64)                        DEFAULT NULL,
+    `f_char`               char(1)                        DEFAULT NULL,
+    `f_enum`               enum ('enum1','enum2','enum3') DEFAULT NULL,
+    `f_mediumblob`         mediumblob,
+    `f_long_varchar`       mediumtext,
+    `f_real`               double                         DEFAULT NULL,
+    `f_time`               time                           DEFAULT NULL,
+    `f_tinyint`            tinyint                        DEFAULT NULL,
+    `f_tinyint_unsigned`   tinyint unsigned               DEFAULT NULL,
+    `f_json`               json                           DEFAULT NULL,
+    `f_year`               year                           DEFAULT NULL,
+    PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4
+  COLLATE = utf8mb4_0900_ai_ci;
+
+CREATE TABLE mysql_cdc_e2e_source_table_2_custom_primary_key
+(
+    `id`                   int                            NOT NULL,
+    `f_binary`             binary(64)                     DEFAULT NULL,
+    `f_blob`               blob,
+    `f_long_varbinary`     mediumblob,
+    `f_longblob`           longblob,
+    `f_tinyblob`           tinyblob,
+    `f_varbinary`          varbinary(100)                 DEFAULT NULL,
+    `f_smallint`           smallint                       DEFAULT NULL,
+    `f_smallint_unsigned`  smallint unsigned              DEFAULT NULL,
+    `f_mediumint`          mediumint                      DEFAULT NULL,
+    `f_mediumint_unsigned` mediumint unsigned             DEFAULT NULL,
+    `f_int`                int                            DEFAULT NULL,
+    `f_int_unsigned`       int unsigned                   DEFAULT NULL,
+    `f_integer`            int                            DEFAULT NULL,
+    `f_integer_unsigned`   int unsigned                   DEFAULT NULL,
+    `f_bigint`             bigint                         DEFAULT NULL,
+    `f_bigint_unsigned`    bigint unsigned                DEFAULT NULL,
+    `f_numeric`            decimal(10, 0)                 DEFAULT NULL,
+    `f_decimal`            decimal(10, 0)                 DEFAULT NULL,
+    `f_float`              float                          DEFAULT NULL,
+    `f_double`             double                         DEFAULT NULL,
+    `f_double_precision`   double                         DEFAULT NULL,
+    `f_longtext`           longtext,
+    `f_mediumtext`         mediumtext,
+    `f_text`               text,
+    `f_tinytext`           tinytext,
+    `f_varchar`            varchar(100)                   DEFAULT NULL,
+    `f_date`               date                           DEFAULT NULL,
+    `f_datetime`           datetime                       DEFAULT NULL,
+    `f_timestamp`          timestamp NULL                 DEFAULT NULL,
+    `f_bit1`               bit(1)                         DEFAULT NULL,
+    `f_bit64`              bit(64)                        DEFAULT NULL,
+    `f_char`               char(1)                        DEFAULT NULL,
+    `f_enum`               enum ('enum1','enum2','enum3') DEFAULT NULL,
+    `f_mediumblob`         mediumblob,
+    `f_long_varchar`       mediumtext,
+    `f_real`               double                         DEFAULT NULL,
+    `f_time`               time                           DEFAULT NULL,
+    `f_tinyint`            tinyint                        DEFAULT NULL,
+    `f_tinyint_unsigned`   tinyint unsigned               DEFAULT NULL,
+    `f_json`               json                           DEFAULT NULL,
+    `f_year`               year                           DEFAULT NULL,
+    PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4
+  COLLATE = utf8mb4_0900_ai_ci;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf
new file mode 100644
index 0000000000..ba3e94855f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  MySQL-CDC {
+    result_table_name = "customers_mysql_cdc"
+    server-id = 5652
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
+    username = "mysqluser"
+    password = "mysqlpw"
+    exactly_once = true
+    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table_1_custom_primary_key", "mysql_cdc.mysql_cdc_e2e_source_table_2_custom_primary_key"]
+    table-names-config = [
+      {
+        table = "mysql_cdc.mysql_cdc_e2e_source_table_1_custom_primary_key"
+        primaryKeys = ["id"]
+      },
+      {
+        table = "mysql_cdc.mysql_cdc_e2e_source_table_2_custom_primary_key"
+        primaryKeys = ["id"]
+      }
+    ]
+  }
+}
+
+sink {
+  jdbc {
+    source_table_name = "customers_mysql_cdc"
+    url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc2"
+    driver = "com.mysql.cj.jdbc.Driver"
+    user = "mysqluser"
+    password = "mysqlpw"
+    database = "mysql_cdc2"
+    generate_sink_sql = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 597838096b..1216c69645 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -83,6 +83,8 @@ public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
     private static final String SOURCE_TABLE = "column_type_test.dbo.full_types";
     private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
             "column_type_test.dbo.full_types_no_primary_key";
+    private static final String SOURCE_TABLE_CUSTOM_PRIMARY_KEY =
+            "column_type_test.dbo.full_types_custom_primary_key";
     private static final String SINK_TABLE = "column_type_test.dbo.full_types_sink";
     private static final String SELECT_SOURCE_SQL =
             "select\n"
@@ -265,6 +267,44 @@ public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
                         });
     }
 
+    @TestTemplate
+    public void testCDCWithCustomPrimaryKey(TestContainer container) {
+        initializeSqlServerTable("column_type_test");
+
+        CompletableFuture<Void> executeJobFuture =
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                container.executeJob(
+                                        "/sqlservercdc_to_sqlserver_with_custom_primary_key.conf");
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                            return null;
+                        });
+
+        // snapshot stage
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    querySql(SELECT_SOURCE_SQL, SOURCE_TABLE_CUSTOM_PRIMARY_KEY),
+                                    querySql(SELECT_SINK_SQL, SINK_TABLE));
+                        });
+
+        // insert update delete
+        updateSourceTable(SOURCE_TABLE_CUSTOM_PRIMARY_KEY);
+
+        // stream stage
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    querySql(SELECT_SOURCE_SQL, SOURCE_TABLE_CUSTOM_PRIMARY_KEY),
+                                    querySql(SELECT_SINK_SQL, SINK_TABLE));
+                        });
+    }
+
     /**
      * Executes a JDBC statement using the default jdbc config without autocommitting the
      * connection.
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
index d227c34615..0c6aebe4fd 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/ddl/column_type_test.sql
@@ -120,6 +120,54 @@ INSERT INTO full_types_no_primary_key VALUES (2,
                                '<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
 EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'full_types_no_primary_key', @role_name = NULL, @supports_net_changes = 0;
 
+CREATE TABLE full_types_custom_primary_key (
+                                           id int NOT NULL,
+                                           val_char char(3),
+                                           val_varchar varchar(1000),
+                                           val_text text,
+                                           val_nchar nchar(3),
+                                           val_nvarchar nvarchar(1000),
+                                           val_ntext ntext,
+                                           val_decimal decimal(6,3),
+                                           val_numeric numeric,
+                                           val_float float,
+                                           val_real real,
+                                           val_smallmoney smallmoney,
+                                           val_money money,
+                                           val_bit bit,
+                                           val_tinyint tinyint,
+                                           val_smallint smallint,
+                                           val_int int,
+                                           val_bigint bigint,
+                                           val_date date,
+                                           val_time time,
+                                           val_datetime2 datetime2,
+                                           val_datetime datetime,
+                                           val_smalldatetime smalldatetime,
+                                           val_xml xml,
+                                           val_datetimeoffset DATETIMEOFFSET(4),
+                                           val_varbinary  varbinary(100)
+);
+INSERT INTO full_types_custom_primary_key VALUES (0,
+                                              'cč0', 'vcč', 'tč', N'cč', N'vcč', N'tč',
+                                              1.123, 2, 3.323, 4.323, 5.323, 6.323,
+                                              1, 22, 333, 4444, 55555,
+                                              '2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
+                                              '<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+INSERT INTO full_types_custom_primary_key VALUES (1,
+                                              'cč1', 'vcč', 'tč', N'cč', N'vcč', N'tč',
+                                              1.123, 2, 3.323, 4.323, 5.323, 6.323,
+                                              1, 22, 333, 4444, 55555,
+                                              '2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
+                                              '<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+INSERT INTO full_types_custom_primary_key VALUES (2,
+                                              'cč2', 'vcč', 'tč', N'cč', N'vcč', N'tč',
+                                              1.123, 2, 3.323, 4.323, 5.323, 6.323,
+                                              1, 22, 333, 4444, 55555,
+                                              '2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45',
+                                              '<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS varbinary(100)));
+EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'full_types_custom_primary_key', @role_name = NULL, @supports_net_changes = 0;
+
 CREATE TABLE full_types_sink (
                             id int NOT NULL,
                             val_char char(3),
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_custom_primary_key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_custom_primary_key.conf
new file mode 100644
index 0000000000..4e9f4e5e73
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_with_custom_primary_key.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+  SqlServer-CDC {
+    result_table_name = "customers"
+    username = "sa"
+    password = "Password!"
+    database-names = ["column_type_test"]
+    table-names = ["column_type_test.dbo.full_types_custom_primary_key"]
+    base-url = "jdbc:sqlserver://sqlserver-host:1433;databaseName=column_type_test"
+
+    exactly_once = false
+  }
+}
+
+transform {
+}
+
+sink {
+  Jdbc {
+    source_table_name = "customers"
+    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+    url = "jdbc:sqlserver://sqlserver-host:1433;encrypt=false"
+    user = "sa"
+    password = "Password!"
+    generate_sink_sql = true
+    database = "column_type_test"
+    table = "dbo.full_types_sink"
+    batch_size = 1
+    primary_keys = ["id"]
+  }
+}
\ No newline at end of file
