This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cc6fcb652 [INLONG-6842][Sort] Improve mysql-cdc2.0 to support tables without primary key (#6859)
cc6fcb652 is described below

commit cc6fcb6528aedb5be3f0f4d7d3d18d1e9f62b095
Author: Schnapps <zpen...@connect.ust.hk>
AuthorDate: Wed Dec 14 10:32:37 2022 +0800

    [INLONG-6842][Sort] Improve mysql-cdc2.0 to support tables without primary key (#6859)
    
    Co-authored-by: stingpeng <stingp...@tencent.com>
---
 .../debezium/task/MySqlSnapshotSplitReadTask.java  | 18 ++++++++-
 .../cdc/mysql/source/assigners/ChunkSplitter.java  | 47 ++++++++++++++++++----
 .../cdc/mysql/source/split/MySqlSnapshotSplit.java |  8 ++++
 .../mysql/table/MySqlTableInlongSourceFactory.java |  1 -
 .../apache/inlong/sort/parser/AllMigrateTest.java  |  2 +-
 5 files changed, 65 insertions(+), 11 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
index 28fb660d9..6a37a0f18 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java
@@ -149,7 +149,8 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
         LOG.info("Snapshot step 2 - Snapshotting data");
         createDataEvents(ctx, snapshotSplit.getTableId());
 
-        final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
+        BinlogOffset highWatermark = determineHighWatermark(lowWatermark);
+
         LOG.info(
                 "Snapshot step 3 - Determining high watermark {} for split {}",
                 highWatermark,
@@ -162,6 +163,21 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
         return SnapshotResult.completed(ctx.offset);
     }
 
+    /**
+     * For a chunk that equals the whole table, we do not need to normalize
+     * the snapshot data and the binlog data; just set the high watermark to the low watermark.
+     * @return highWatermark
+     */
+    private BinlogOffset determineHighWatermark(BinlogOffset lowWatermark) {
+        if (snapshotSplit.isWholeSplit()) {
+            LOG.info("for split {}, set highWatermark to lowWatermark {} since"
+                    + " it reads the whole table ", snapshotSplit, lowWatermark);
+            return lowWatermark;
+        } else {
+            return currentBinlogOffset(jdbcConnection);
+        }
+    }
+
     @Override
     protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
         return new SnapshottingTask(false, true);
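
To make the intent of determineHighWatermark easier to follow in isolation, here is a minimal standalone sketch of the same decision. It is not the connector's code: Offset below is a simplified stand-in for BinlogOffset, and the whole-split check is passed in as a plain boolean.

public class HighWatermarkSketch {

    // Simplified stand-in for BinlogOffset (assumption, not the real class).
    static final class Offset {
        final String file;
        final long position;

        Offset(String file, long position) {
            this.file = file;
            this.position = position;
        }

        @Override
        public String toString() {
            return file + ":" + position;
        }
    }

    // Mirrors determineHighWatermark: a split that reads the whole table reuses
    // its low watermark, so no binlog back-fill between the watermarks is needed;
    // any other split keeps using the current binlog offset.
    static Offset determineHighWatermark(boolean isWholeSplit, Offset lowWatermark, Offset currentBinlogOffset) {
        return isWholeSplit ? lowWatermark : currentBinlogOffset;
    }

    public static void main(String[] args) {
        Offset low = new Offset("mysql-bin.000003", 1024L);
        Offset current = new Offset("mysql-bin.000003", 4096L);

        // Whole-table split (e.g. a table without a primary key): high == low.
        System.out.println(determineHighWatermark(true, low, current));
        // Regular keyed chunk: high watermark is the current binlog position.
        System.out.println(determineHighWatermark(false, low, current));
    }
}
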
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
index e4a6bd3e4..26494f3cf 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
@@ -114,17 +114,13 @@ class ChunkSplitter {
             long start = System.currentTimeMillis();
 
             Table table = mySqlSchema.getTableSchema(jdbc, tableId).getTable();
-            Column splitColumn = ChunkUtils.getSplitColumn(table);
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
-            }
+
+            List<ChunkRange> chunks = getChunks(tableId, jdbc, table);
 
             // convert chunks into splits
             List<MySqlSnapshotSplit> splits = new ArrayList<>();
-            RowType splitType = ChunkUtils.getSplitType(splitColumn);
+
+            RowType splitType = ChunkUtils.getSplitType(getSplitColumn(table));
             for (int i = 0; i < chunks.size(); i++) {
                 ChunkRange chunk = chunks.get(i);
                 MySqlSnapshotSplit split =
@@ -151,6 +147,41 @@ class ChunkSplitter {
         }
     }
 
+    /**
+     * Get the split column using the primary key.
+     * For tables without a primary key, return the first column.
+     * @return the split column
+     */
+    private Column getSplitColumn(Table table) {
+        if (table.primaryKeyColumns().isEmpty()) {
+            // since we do not need a split column when there is no primary key
+            // simply return the first column which won't be used
+            return table.columns().get(0);
+        } else {
+            return ChunkUtils.getSplitColumn(table);
+        }
+    }
+
+    /**
+     * Get the chunks of the table using the primary key.
+     * For tables without a primary key, return the whole table as a single chunk.
+     * @return chunks
+     */
+    private List<ChunkRange> getChunks(TableId tableId, JdbcConnection jdbc, Table table) {
+        if (table.primaryKeyColumns().isEmpty()) {
+            // take the whole table as chunk range
+            // when there is no primary key presented
+            return Collections.singletonList(ChunkRange.all());
+        } else {
+            Column splitColumn = ChunkUtils.getSplitColumn(table);
+            try {
+                return splitTableIntoChunks(jdbc, tableId, splitColumn);
+            } catch (SQLException e) {
+                throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
+            }
+        }
+    }
+
     /**
      * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using
      * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request
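
The primary-key fallback in getSplitColumn/getChunks can be illustrated with a small self-contained sketch. Range and TableMeta below are simplified stand-ins for ChunkRange and Debezium's Table (assumptions for illustration, not the real classes), and the key-based splitting path is stubbed out.

import java.util.Collections;
import java.util.List;

public class ChunkFallbackSketch {

    // Simplified stand-in for ChunkRange (assumption, not the real class).
    static final class Range {
        final Object start;
        final Object end;

        Range(Object start, Object end) {
            this.start = start;
            this.end = end;
        }

        // Mirrors ChunkRange.all(): an unbounded range covering the whole table.
        static Range all() {
            return new Range(null, null);
        }
    }

    // Simplified stand-in for Debezium's Table (assumption, not the real class).
    static final class TableMeta {
        final List<String> primaryKeyColumns;

        TableMeta(List<String> primaryKeyColumns) {
            this.primaryKeyColumns = primaryKeyColumns;
        }
    }

    // Mirrors ChunkSplitter#getChunks: no primary key -> one whole-table chunk;
    // otherwise split on the key column (stubbed here).
    static List<Range> getChunks(TableMeta table) {
        if (table.primaryKeyColumns.isEmpty()) {
            return Collections.singletonList(Range.all());
        }
        return splitByKey(table);
    }

    // Placeholder for the real evenly/unevenly-sized chunk splitting.
    static List<Range> splitByKey(TableMeta table) {
        return Collections.singletonList(new Range(1, 1000));
    }

    public static void main(String[] args) {
        // Table without a primary key: a single whole-table chunk.
        System.out.println(getChunks(new TableMeta(Collections.emptyList())).size());
        // Table with a primary key: chunks come from the key-based splitter.
        System.out.println(getChunks(new TableMeta(Collections.singletonList("id"))).size());
    }
}
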
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
index 1e3130004..f756550fb 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java
@@ -91,6 +91,14 @@ public class MySqlSnapshotSplit extends MySqlSplit {
         return highWatermark != null;
     }
 
+    /**
+     * The split reads the whole table when both split start and split end are null.
+     * @return whether the split reads the whole table
+     */
+    public boolean isWholeSplit() {
+        return splitStart == null && splitEnd == null;
+    }
+
     @Override
     public Map<TableId, TableChange> getTableSchemas() {
         return tableSchemas;
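
A tiny sketch of how the pieces fit together: the whole-table chunk produced for a table without a primary key ends up as a split with null bounds, which is exactly what isWholeSplit() checks. SimpleSplit below is an illustrative stand-in, not the real MySqlSnapshotSplit.

public class WholeSplitSketch {

    // Illustrative stand-in for MySqlSnapshotSplit (assumption, not the real class).
    static final class SimpleSplit {
        final Object[] splitStart;
        final Object[] splitEnd;

        SimpleSplit(Object[] splitStart, Object[] splitEnd) {
            this.splitStart = splitStart;
            this.splitEnd = splitEnd;
        }

        // Same check as MySqlSnapshotSplit#isWholeSplit: both bounds null
        // means the split covers the entire table.
        boolean isWholeSplit() {
            return splitStart == null && splitEnd == null;
        }
    }

    public static void main(String[] args) {
        // A whole-table chunk carries no bounds, so both arrays are null.
        SimpleSplit noPrimaryKey = new SimpleSplit(null, null);
        SimpleSplit keyedChunk = new SimpleSplit(new Object[]{1}, new Object[]{1000});

        System.out.println(noPrimaryKey.isWholeSplit()); // true
        System.out.println(keyedChunk.isWholeSplit());   // false
    }
}
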
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index 01200c958..682857d25 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -154,7 +154,6 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
                 : config.get(ROW_KINDS_FILTERED);
         boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
         if (enableParallelRead) {
-            validatePrimaryKeyIfEnableParallel(physicalSchema);
             validateStartupOptionIfEnableParallel(startupOptions);
             validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
             validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index fc779f701..a52730578 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -62,7 +62,7 @@ public class AllMigrateTest {
         return new MySqlExtractNode("1", "mysql_input", fields,
                 null, option, null,
                 tables, "localhost", "root", "inlong",
-                "test", null, null, false, null,
+                "test", null, null, true, null,
                 ExtractMode.CDC, null, null);
     }
 
