This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f8d4eac64 [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)
f8d4eac64 is described below

commit f8d4eac641f7fb7ab1a1124b9fd4c7c1344b41fe
Author: Schnapps <zpen...@connect.ust.hk>
AuthorDate: Thu Jan 5 19:19:02 2023 +0800

    [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)
    
    Co-authored-by: stingpeng <stingp...@tencent.com>
---
 .../sort/cdc/mysql/source/utils/RecordUtils.java   | 31 ++++++++++++++++------
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index ef7ef4ca9..6944bd795 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -88,7 +88,8 @@ public class RecordUtils {
             List<SourceRecord> sourceRecords,
             SchemaNameAdjuster nameAdjuster) {
         List<SourceRecord> normalizedRecords = new ArrayList<>();
-        Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
+        Map<Struct, SourceRecord> snapshotRecordsWithKey = new HashMap<>();
+        List<SourceRecord> snapshotRecordsWithoutKey = new ArrayList<>();
         List<SourceRecord> binlogRecords = new ArrayList<>();
         if (!sourceRecords.isEmpty()) {
 
@@ -103,7 +104,11 @@ public class RecordUtils {
             for (; i < sourceRecords.size(); i++) {
                 SourceRecord sourceRecord = sourceRecords.get(i);
                 if (!isHighWatermarkEvent(sourceRecord)) {
-                    snapshotRecords.put((Struct) sourceRecord.key(), 
sourceRecord);
+                    if (sourceRecord.key() == null) {
+                        snapshotRecordsWithoutKey.add(sourceRecord);
+                    } else {
+                        snapshotRecordsWithKey.put((Struct) 
sourceRecord.key(), sourceRecord);
+                    }
                 } else {
                     highWatermark = sourceRecord;
                     i++;
@@ -130,8 +135,11 @@ public class RecordUtils {
                     String.format(
                             "The last record should be high watermark signal 
event, but is %s",
                             highWatermark));
+
             normalizedRecords =
-                    upsertBinlog(lowWatermark, highWatermark, snapshotRecords, 
binlogRecords);
+                    upsertBinlog(lowWatermark, highWatermark, 
snapshotRecordsWithKey,
+                            binlogRecords, snapshotRecordsWithoutKey);
+
         }
         return normalizedRecords;
     }
@@ -139,8 +147,9 @@ public class RecordUtils {
     private static List<SourceRecord> upsertBinlog(
             SourceRecord lowWatermarkEvent,
             SourceRecord highWatermarkEvent,
-            Map<Struct, SourceRecord> snapshotRecords,
-            List<SourceRecord> binlogRecords) {
+            Map<Struct, SourceRecord> snapshotRecordsWithKey,
+            List<SourceRecord> binlogRecords,
+            List<SourceRecord> snapshotRecordsWithoutKey) {
         // upsert binlog events to snapshot events of split
         if (!binlogRecords.isEmpty()) {
             for (SourceRecord binlog : binlogRecords) {
@@ -169,10 +178,10 @@ public class RecordUtils {
                                             binlog.key(),
                                             binlog.valueSchema(),
                                             envelope.read(after, source, 
fetchTs));
-                            snapshotRecords.put(key, record);
+                            snapshotRecordsWithKey.put(key, record);
                             break;
                         case DELETE:
-                            snapshotRecords.remove(key);
+                            snapshotRecordsWithKey.remove(key);
                             break;
                         case READ:
                             throw new IllegalStateException(
@@ -188,7 +197,13 @@ public class RecordUtils {
 
         final List<SourceRecord> normalizedRecords = new ArrayList<>();
         normalizedRecords.add(lowWatermarkEvent);
-        
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
+        if (!snapshotRecordsWithoutKey.isEmpty()) {
+            // for table without key, there is no need for binlog upsert
+            // because highWatermark equals to lowWatermark
+            
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithoutKey));
+        } else {
+            
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithKey.values()));
+        }
         normalizedRecords.add(highWatermarkEvent);
 
         return normalizedRecords;

Reply via email to