This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new cc6fcb652 [INLONG-6842][Sort] Improve mysql-cdc2.0 to support tables without primary key (#6859) cc6fcb652 is described below commit cc6fcb6528aedb5be3f0f4d7d3d18d1e9f62b095 Author: Schnapps <zpen...@connect.ust.hk> AuthorDate: Wed Dec 14 10:32:37 2022 +0800 [INLONG-6842][Sort] Improve mysql-cdc2.0 to support tables without primary key (#6859) Co-authored-by: stingpeng <stingp...@tencent.com> --- .../debezium/task/MySqlSnapshotSplitReadTask.java | 18 ++++++++- .../cdc/mysql/source/assigners/ChunkSplitter.java | 47 ++++++++++++++++++---- .../cdc/mysql/source/split/MySqlSnapshotSplit.java | 8 ++++ .../mysql/table/MySqlTableInlongSourceFactory.java | 1 - .../apache/inlong/sort/parser/AllMigrateTest.java | 2 +- 5 files changed, 65 insertions(+), 11 deletions(-) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index 28fb660d9..6a37a0f18 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -149,7 +149,8 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc LOG.info("Snapshot step 2 - Snapshotting data"); createDataEvents(ctx, snapshotSplit.getTableId()); - final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection); + /* Whole-table splits reuse lowWatermark as the high watermark; other splits read the current binlog offset (see determineHighWatermark). */ BinlogOffset highWatermark = determineHighWatermark(lowWatermark); + LOG.info( "Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, @@ 
-162,6 +163,21 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc return 
SnapshotResult.completed(ctx.offset); } + /** + * Determines the high watermark of the current snapshot split. + * A split that reads the whole table (snapshotSplit.isWholeSplit()) does not need its + * snapshot data normalized against binlog data, so its high watermark is simply set to + * the low watermark; any other split uses the current binlog offset of jdbcConnection. + * NOTE(review): isWholeSplit() only checks that both split boundaries are null, so this + * presumably also applies to a primary-key table that ends up as a single unbounded chunk. + * With high == low, binlog events produced while the snapshot query runs are presumably + * delivered again by the binlog phase on top of rows the snapshot already read; confirm + * that this at-least-once behavior is acceptable, especially for tables without a primary key. + * @param lowWatermark the low watermark of this split, returned as-is for whole-table splits + * @return the high watermark for this split + */ + private BinlogOffset determineHighWatermark(BinlogOffset lowWatermark) { + if (snapshotSplit.isWholeSplit()) { + LOG.info("for split {}, set highWatermark to lowWatermark {} since" + + " it reads the whole table ", snapshotSplit, lowWatermark); + return lowWatermark; + } else { + return currentBinlogOffset(jdbcConnection); + } + } + @Override protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) { return new SnapshottingTask(false, true); diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java index e4a6bd3e4..26494f3cf 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java @@ -114,17 +114,13 @@ class ChunkSplitter { long start = System.currentTimeMillis(); Table table = 
mySqlSchema.getTableSchema(jdbc, tableId).getTable(); - Column splitColumn = ChunkUtils.getSplitColumn(table); - final List<ChunkRange> chunks; - try { - chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); - } catch (SQLException e) { - throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); - } + + List<ChunkRange> chunks = getChunks(tableId, jdbc, table); // convert chunks into splits List<MySqlSnapshotSplit> splits = new ArrayList<>(); - RowType splitType = ChunkUtils.getSplitType(splitColumn); + + RowType splitType = ChunkUtils.getSplitType(getSplitColumn(table)); for (int i = 0; i < chunks.size(); i++) { 
ChunkRange chunk = chunks.get(i); MySqlSnapshotSplit split = @@ -151,6 +147,41 @@ class ChunkSplitter { } } + /** + * Returns the column used to split the table into chunks. + * For a table with a primary key this delegates to ChunkUtils.getSplitColumn(table). + * A table without a primary key is read as a single whole-table chunk (see getChunks), + * so no real split column is needed and the first column is returned as a placeholder. + * NOTE(review): the caller still derives the split RowType from this placeholder via + * ChunkUtils.getSplitType; this assumes the table has at least one column and that + * getSplitType accepts the first column's type - confirm. + * @param table the table schema + * @return the split column (a placeholder for tables without a primary key) + */ + private Column getSplitColumn(Table table) { + if (table.primaryKeyColumns().isEmpty()) { + // no primary key means no chunk boundaries are computed, + // so the first column only serves as a placeholder split column + return table.columns().get(0); + } else { + return ChunkUtils.getSplitColumn(table); + } + } + + /** + * Splits the table into chunks by its primary key. + * A table without a primary key is returned as a single ChunkRange.all() chunk that + * covers the whole table. + * @param tableId the id of the table to split + * @param jdbc the JDBC connection passed to splitTableIntoChunks + * @param table the table schema + * @return the chunks of the table + * @throws FlinkRuntimeException if splitting the table fails with an SQLException + */ + private List<ChunkRange> getChunks(TableId tableId, JdbcConnection jdbc, Table table) { + if (table.primaryKeyColumns().isEmpty()) { + // there is no primary key to split on, + // so take the whole table as a single chunk range + return Collections.singletonList(ChunkRange.all()); + } else { + Column splitColumn = ChunkUtils.getSplitColumn(table); + try { + return splitTableIntoChunks(jdbc, tableId, splitColumn); + } catch (SQLException e) { + throw 
new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); + } + } + } + /** * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java index 1e3130004..f756550fb 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/split/MySqlSnapshotSplit.java @@ -91,6 
+91,14 @@ public class MySqlSnapshotSplit extends MySqlSplit { return highWatermark != null; } + /** + * Returns whether this split reads the whole table, which is the case when both + * splitStart and splitEnd are null. + * @return true if both split boundaries are null, false otherwise + */ + public boolean isWholeSplit() { + return splitStart == null && splitEnd == null; + } + @Override public Map<TableId, TableChange> getTableSchemas() { return tableSchemas; diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java index 01200c958..682857d25 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java @@ -154,7 +154,6 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory : config.get(ROW_KINDS_FILTERED); boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); if (enableParallelRead) { - validatePrimaryKeyIfEnableParallel(physicalSchema); validateStartupOptionIfEnableParallel(startupOptions); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java index fc779f701..a52730578 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java @@ -62,7 +62,7 @@ public class AllMigrateTest { return new 
MySqlExtractNode("1", "mysql_input", fields, null, option, null, tables, "localhost", 
"root", "inlong", - "test", null, null, false, null, + "test", null, null, true, null, ExtractMode.CDC, null, null); }