This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit b96d4ed4aa21f917ae2c49833e3a2925c295c3f5 Author: Schnapps <zpen...@connect.ust.hk> AuthorDate: Thu Jan 5 19:19:02 2023 +0800 [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164) Co-authored-by: stingpeng <stingp...@tencent.com> --- .../sort/cdc/mysql/source/utils/RecordUtils.java | 31 ++++++++++++++++------ 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java index ef7ef4ca9..6944bd795 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java @@ -88,7 +88,8 @@ public class RecordUtils { List<SourceRecord> sourceRecords, SchemaNameAdjuster nameAdjuster) { List<SourceRecord> normalizedRecords = new ArrayList<>(); - Map<Struct, SourceRecord> snapshotRecords = new HashMap<>(); + Map<Struct, SourceRecord> snapshotRecordsWithKey = new HashMap<>(); + List<SourceRecord> snapshotRecordsWithoutKey = new ArrayList<>(); List<SourceRecord> binlogRecords = new ArrayList<>(); if (!sourceRecords.isEmpty()) { @@ -103,7 +104,11 @@ public class RecordUtils { for (; i < sourceRecords.size(); i++) { SourceRecord sourceRecord = sourceRecords.get(i); if (!isHighWatermarkEvent(sourceRecord)) { - snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord); + if (sourceRecord.key() == null) { + snapshotRecordsWithoutKey.add(sourceRecord); + } else { + snapshotRecordsWithKey.put((Struct) sourceRecord.key(), sourceRecord); + } } else { highWatermark = sourceRecord; i++; @@ -130,8 +135,11 @@ public class RecordUtils { String.format( "The last record should be high watermark signal event, but is %s", highWatermark)); + normalizedRecords = - upsertBinlog(lowWatermark, highWatermark, snapshotRecords, binlogRecords); + upsertBinlog(lowWatermark, highWatermark, snapshotRecordsWithKey, + binlogRecords, snapshotRecordsWithoutKey); + } return normalizedRecords; } @@ -139,8 +147,9 @@ public class RecordUtils { private static List<SourceRecord> upsertBinlog( SourceRecord lowWatermarkEvent, SourceRecord highWatermarkEvent, - Map<Struct, SourceRecord> snapshotRecords, - List<SourceRecord> binlogRecords) { + Map<Struct, SourceRecord> snapshotRecordsWithKey, + List<SourceRecord> binlogRecords, + List<SourceRecord> snapshotRecordsWithoutKey) { // upsert binlog events to snapshot events of split if (!binlogRecords.isEmpty()) { for (SourceRecord binlog : binlogRecords) { @@ -169,10 +178,10 @@ public class RecordUtils { binlog.key(), binlog.valueSchema(), envelope.read(after, source, fetchTs)); - snapshotRecords.put(key, record); + snapshotRecordsWithKey.put(key, record); break; case DELETE: - snapshotRecords.remove(key); + snapshotRecordsWithKey.remove(key); break; case READ: throw new IllegalStateException( @@ -188,7 +197,13 @@ public class RecordUtils { final List<SourceRecord> normalizedRecords = new ArrayList<>(); normalizedRecords.add(lowWatermarkEvent); - normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values())); + if (!snapshotRecordsWithoutKey.isEmpty()) { + // for table without key, there is no need for binlog upsert + // because highWatermark equals to lowWatermark + normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithoutKey)); + } else { + normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithKey.values())); + } normalizedRecords.add(highWatermarkEvent); return normalizedRecords;