This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0c3044e3f6 [Hotfix][CDC] Fix split schema change stream (#7003)
0c3044e3f6 is described below

commit 0c3044e3f6d5d1a52df1896e069676b509419e4b
Author: hailin0 <wanghai...@apache.org>
AuthorDate: Tue Jun 18 14:29:33 2024 +0800

    [Hotfix][CDC] Fix split schema change stream (#7003)
---
 .../external/IncrementalSourceStreamFetcher.java   | 157 ++++++++++++---------
 .../IncrementalSourceStreamFetcherTest.java        |  23 ++-
 2 files changed, 115 insertions(+), 65 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 4cad739ac6..338cb657b3 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -171,69 +171,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
      * checkpoint-after] [a, b, c, d, e]
      */
     Iterator<SourceRecords> splitSchemaChangeStream(List<DataChangeEvent> batchEvents) {
-        List<SourceRecords> sourceRecordsSet = new ArrayList<>();
-
-        List<SourceRecord> sourceRecordList = new ArrayList<>();
-        SourceRecord previousRecord = null;
-        for (int i = 0; i < batchEvents.size(); i++) {
-            DataChangeEvent event = batchEvents.get(i);
-            SourceRecord currentRecord = event.getRecord();
-            if (!shouldEmit(currentRecord)) {
-                continue;
-            }
-
-            if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
-                if (!schemaChangeResolver.support(currentRecord)) {
-                    continue;
-                }
-
-                if (previousRecord == null) {
-                    // add schema-change-before to first
-                    sourceRecordList.add(
-                            WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
-                    sourceRecordsSet.add(new SourceRecords(sourceRecordList));
-                    sourceRecordList = new ArrayList<>();
-                    sourceRecordList.add(currentRecord);
-                } else if (SourceRecordUtils.isSchemaChangeEvent(previousRecord)) {
-                    sourceRecordList.add(currentRecord);
-                } else {
-                    sourceRecordList.add(
-                            WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
-                    sourceRecordsSet.add(new SourceRecords(sourceRecordList));
-                    sourceRecordList = new ArrayList<>();
-                    sourceRecordList.add(currentRecord);
-                }
-            } else if (SourceRecordUtils.isDataChangeRecord(currentRecord)
-                    || SourceRecordUtils.isHeartbeatRecord(currentRecord)) {
-                if (previousRecord == null
-                        || SourceRecordUtils.isDataChangeRecord(previousRecord)
-                        || SourceRecordUtils.isHeartbeatRecord(previousRecord)) {
-                    sourceRecordList.add(currentRecord);
-                } else {
-                    sourceRecordList.add(
-                            WatermarkEvent.createSchemaChangeAfterWatermark(currentRecord));
-                    sourceRecordsSet.add(new SourceRecords(sourceRecordList));
-                    sourceRecordList = new ArrayList<>();
-                    sourceRecordList.add(currentRecord);
-                }
-            }
-            previousRecord = currentRecord;
-            if (i == batchEvents.size() - 1) {
-                if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
-                    sourceRecordList.add(
-                            WatermarkEvent.createSchemaChangeAfterWatermark(currentRecord));
-                }
-                sourceRecordsSet.add(new SourceRecords(sourceRecordList));
-            }
-        }
-
-        if (sourceRecordsSet.size() > 1) {
-            log.debug(
-                    "Split events stream into {} batches and mark schema checkpoint before/after",
-                    sourceRecordsSet.size());
-        }
-
-        return sourceRecordsSet.iterator();
+        return new SchemaChangeStreamSplitter().split(batchEvents);
     }
 
     private void checkReadException() {
@@ -349,4 +287,97 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
         this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
         this.pureBinlogPhaseTables.clear();
     }
+
+    class SchemaChangeStreamSplitter {
+        private List<SourceRecords> blockSet;
+        private List<SourceRecord> currentBlock;
+        private SourceRecord previousRecord;
+
+        public SchemaChangeStreamSplitter() {
+            blockSet = new ArrayList<>();
+            currentBlock = new ArrayList<>();
+            previousRecord = null;
+        }
+
+        public Iterator<SourceRecords> split(List<DataChangeEvent> batchEvents) {
+            for (int i = 0; i < batchEvents.size(); i++) {
+                DataChangeEvent event = batchEvents.get(i);
+                SourceRecord currentRecord = event.getRecord();
+                if (!shouldEmit(currentRecord)) {
+                    continue;
+                }
+
+                if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
+                    if (!schemaChangeResolver.support(currentRecord)) {
+                        continue;
+                    }
+
+                    if (previousRecord == null) {
+                        // add schema-change-before to first
+                        currentBlock.add(
+                                WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
+                        flipBlock();
+
+                        currentBlock.add(currentRecord);
+                    } else if (SourceRecordUtils.isSchemaChangeEvent(previousRecord)) {
+                        currentBlock.add(currentRecord);
+                    } else {
+                        currentBlock.add(
+                                WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
+                        flipBlock();
+
+                        currentBlock.add(currentRecord);
+                    }
+                } else if (SourceRecordUtils.isDataChangeRecord(currentRecord)
+                        || SourceRecordUtils.isHeartbeatRecord(currentRecord)) {
+                    if (previousRecord == null
+                            || SourceRecordUtils.isDataChangeRecord(previousRecord)
+                            || SourceRecordUtils.isHeartbeatRecord(previousRecord)) {
+                        currentBlock.add(currentRecord);
+                    } else {
+                        endBlock(previousRecord);
+                        flipBlock();
+
+                        currentBlock.add(currentRecord);
+                    }
+                }
+
+                previousRecord = currentRecord;
+                if (i == batchEvents.size() - 1) {
+                    endBlock(currentRecord);
+                    flipBlock();
+                }
+            }
+
+            endLastBlock(previousRecord);
+
+            if (blockSet.size() > 1) {
+                log.debug(
+                        "Split events stream into {} batches and mark schema change checkpoint",
+                        blockSet.size());
+            }
+
+            return blockSet.iterator();
+        }
+
+        void flipBlock() {
+            if (!currentBlock.isEmpty()) {
+                blockSet.add(new SourceRecords(currentBlock));
+                currentBlock = new ArrayList<>();
+            }
+        }
+
+        void endBlock(SourceRecord lastRecord) {
+            if (!currentBlock.isEmpty()) {
+                if (SourceRecordUtils.isSchemaChangeEvent(lastRecord)) {
+                    currentBlock.add(WatermarkEvent.createSchemaChangeAfterWatermark(lastRecord));
+                }
+            }
+        }
+
+        void endLastBlock(SourceRecord lastRecord) {
+            endBlock(lastRecord);
+            flipBlock();
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
index a17f7f8673..ee8d4d7e5d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.source.SourceRecord;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.stubbing.Answer;
 
 import io.debezium.config.CommonConnectorConfig;
 import io.debezium.config.Configuration;
@@ -61,6 +62,7 @@ public class IncrementalSourceStreamFetcherTest {
                     .with(Heartbeat.HEARTBEAT_INTERVAL, 1)
                     .with(TRANSACTION_TOPIC, "test")
                     .build();
+    private static final String UNKNOWN_SCHEMA_KEY = "UNKNOWN";
 
     @Test
     public void testSplitSchemaChangeStream() throws Exception {
@@ -107,6 +109,7 @@ public class IncrementalSourceStreamFetcherTest {
         inputEvents.add(new DataChangeEvent(createDataEvent()));
         inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
         inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeUnknownEvent()));
         outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
         outputEvents.forEachRemaining(records::add);
 
@@ -134,6 +137,7 @@ public class IncrementalSourceStreamFetcherTest {
         inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
         inputEvents.add(new DataChangeEvent(createDataEvent()));
         inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeUnknownEvent()));
         outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
         outputEvents.forEachRemaining(records::add);
 
@@ -323,13 +327,21 @@ public class IncrementalSourceStreamFetcherTest {
     }
 
     static SourceRecord createSchemaChangeEvent() {
+        return createSchemaChangeEvent("SCHEMA_CHANGE_TOPIC");
+    }
+
+    static SourceRecord createSchemaChangeUnknownEvent() {
+        return createSchemaChangeEvent(UNKNOWN_SCHEMA_KEY);
+    }
+
+    static SourceRecord createSchemaChangeEvent(String topic) {
         Schema keySchema =
                 SchemaBuilder.struct().name(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME).build();
         SourceRecord record =
                 new SourceRecord(
                         Collections.emptyMap(),
                         Collections.emptyMap(),
-                        null,
+                        topic,
                         keySchema,
                         null,
                         null,
@@ -377,7 +389,14 @@ public class IncrementalSourceStreamFetcherTest {
 
     static IncrementalSourceStreamFetcher createFetcher() {
         SchemaChangeResolver schemaChangeResolver = mock(SchemaChangeResolver.class);
-        when(schemaChangeResolver.support(any())).thenReturn(true);
+        when(schemaChangeResolver.support(any()))
+                .thenAnswer(
+                        (Answer<Boolean>)
+                                invocationOnMock -> {
+                                    SourceRecord record = invocationOnMock.getArgument(0);
+                                    return record.topic() == null
+                                            || !record.topic().equalsIgnoreCase(UNKNOWN_SCHEMA_KEY);
+                                });
         IncrementalSourceStreamFetcher fetcher =
                 new IncrementalSourceStreamFetcher(null, 0, schemaChangeResolver);
         IncrementalSourceStreamFetcher spy = spy(fetcher);

Reply via email to