ruanhang1993 commented on code in PR #3964:
URL: https://github.com/apache/flink-cdc/pull/3964#discussion_r2059490853


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java:
##########
@@ -226,24 +228,16 @@ private ChangeEventRecords finishedSplit() {
      * Finishes new added snapshot split, mark the stream split as finished 
too, we will add the
      * stream split back in {@code MySqlSourceReader}.
      */
-    private ChangeEventRecords forNewAddedTableFinishedSplit(
-            final String splitId, final Iterator<SourceRecords> 
recordsForSplit) {
+    private ChangeEventRecords forNewAddedTableFinishedSplit(final String 
splitId) {
         final Set<String> finishedSplits = new HashSet<>();
         finishedSplits.add(splitId);
         finishedSplits.add(STREAM_SPLIT_ID);
         currentSplitId = null;
-        return new ChangeEventRecords(splitId, recordsForSplit, 
finishedSplits);
+        return new ChangeEventRecords(null, null, finishedSplits);
     }
 
     private ChangeEventRecords forRecords(Iterator<SourceRecords> dataIt) {

Review Comment:
   ```suggestion
       private ChangeEventRecords forUnfinishedRecords(Iterator<SourceRecords> 
dataIt) {
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java:
##########
@@ -115,64 +116,86 @@ public Iterator<SourceRecords> pollSplitRecords() throws 
InterruptedException {
         checkReadException();
 
         if (hasNextElement.get()) {
-            // eg:
-            // data input: [low watermark event][snapshot events][high 
watermark event][change
-            // events][end watermark event]
-            // data output: [low watermark event][normalized events][high 
watermark event]
-            boolean reachChangeLogStart = false;
-            boolean reachChangeLogEnd = false;
-            SourceRecord lowWatermark = null;
-            SourceRecord highWatermark = null;
-            Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
-            while (!reachChangeLogEnd) {
-                checkReadException();
-                List<DataChangeEvent> batch = queue.poll();
-                for (DataChangeEvent event : batch) {
-                    SourceRecord record = event.getRecord();
-                    if (lowWatermark == null) {
-                        lowWatermark = record;
-                        assertLowWatermark(lowWatermark);
-                        continue;
-                    }
+            if (taskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+                return pollWithoutBuffer();
+            } else {
+                return pollWithBuffer();
+            }
+        }
+        // the data has been polled, no more data
+        reachEnd.compareAndSet(false, true);
+        return null;
+    }
 
-                    if (highWatermark == null && isHighWatermarkEvent(record)) 
{
-                        highWatermark = record;
-                        // snapshot events capture end and begin to capture 
stream events
-                        reachChangeLogStart = true;
-                        continue;
-                    }
+    public Iterator<SourceRecords> pollWithoutBuffer() throws 
InterruptedException {
+        List<DataChangeEvent> batch = queue.poll();

Review Comment:
   `checkReadException();` is lost here.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java:
##########
@@ -157,46 +157,41 @@ private MySqlRecords pollSplitRecords() throws 
InterruptedException {
                     currentReader = getSnapshotSplitReader();
                     currentReader.submitSplit(nextSplit);
                 }
-                return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt);
+                return MySqlRecords.forRecords(BINLOG_SPLIT_ID, dataIt);
             } else {
                 // null will be returned after receiving suspend binlog event
                 // finish current binlog split reading
                 closeBinlogReader();
-                return finishedSplit();
+                return finishedSplit(true);
             }
         } else {
             throw new IllegalStateException("Unsupported reader type.");
         }
     }
 
-    private MySqlRecords finishedSplit() {
+    private MySqlRecords finishedSplit(boolean recycle) {

Review Comment:
   ```suggestion
       private MySqlRecords finishedSplit(boolean recycleSnapshotReader) {
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlRecords.java:
##########
@@ -76,16 +76,11 @@ public Set<String> finishedSplits() {
         return finishedSnapshotSplits;
     }
 
-    public static MySqlRecords forBinlogRecords(
+    public static MySqlRecords forRecords(

Review Comment:
   ```suggestion
       public static MySqlRecords forUnfinishedRecords(
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java:
##########
@@ -215,9 +214,12 @@ public boolean canAssignNextSplit() {
         return currentFetcher == null || currentFetcher.isFinished();
     }
 
-    private ChangeEventRecords finishedSplit() {
+    private ChangeEventRecords finishedSplit(boolean recycle) {

Review Comment:
   ```suggestion
       private ChangeEventRecords finishedSplit(boolean recycleScanFetcher) {
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java:
##########
@@ -226,24 +228,16 @@ private ChangeEventRecords finishedSplit() {
      * Finishes new added snapshot split, mark the stream split as finished 
too, we will add the
      * stream split back in {@code MySqlSourceReader}.
      */
-    private ChangeEventRecords forNewAddedTableFinishedSplit(
-            final String splitId, final Iterator<SourceRecords> 
recordsForSplit) {
+    private ChangeEventRecords forNewAddedTableFinishedSplit(final String 
splitId) {
         final Set<String> finishedSplits = new HashSet<>();
         finishedSplits.add(splitId);
         finishedSplits.add(STREAM_SPLIT_ID);
         currentSplitId = null;
-        return new ChangeEventRecords(splitId, recordsForSplit, 
finishedSplits);
+        return new ChangeEventRecords(null, null, finishedSplits);

Review Comment:
   ```suggestion
           return new ChangeEventRecords(splitId, null, finishedSplits);
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java:
##########
@@ -282,79 +282,102 @@ public Iterator<SourceRecords> pollSplitRecords() throws 
InterruptedException {
         checkReadException();
 
         if (hasNextElement.get()) {
-            // data input: [low watermark event][snapshot events][high 
watermark event][binlog
-            // events][binlog-end event]
-            // data output: [low watermark event][normalized events][high 
watermark event]
-            boolean reachBinlogStart = false;
-            boolean reachBinlogEnd = false;
-            SourceRecord lowWatermark = null;
-            SourceRecord highWatermark = null;
-
-            Map<Struct, List<SourceRecord>> snapshotRecords = new HashMap<>();
-            while (!reachBinlogEnd) {
-                checkReadException();
-                List<DataChangeEvent> batch = queue.poll();
-                for (DataChangeEvent event : batch) {
-                    SourceRecord record = event.getRecord();
-                    if (lowWatermark == null) {
-                        lowWatermark = record;
-                        assertLowWatermark(lowWatermark);
-                        continue;
-                    }
+            if 
(statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+                return pollWithoutBuffer();
+            } else {
+                return pollWithBuffer();
+            }
+        }
 
-                    if (highWatermark == null && 
RecordUtils.isHighWatermarkEvent(record)) {
-                        highWatermark = record;
-                        // snapshot events capture end and begin to capture 
binlog events
-                        reachBinlogStart = true;
-                        continue;
-                    }
+        // the data has been polled, no more data
+        reachEnd.compareAndSet(false, true);
+        return null;
+    }
 
-                    if (reachBinlogStart && 
RecordUtils.isEndWatermarkEvent(record)) {
-                        // capture to end watermark events, stop the loop
-                        reachBinlogEnd = true;
-                        break;
-                    }
+    public Iterator<SourceRecords> pollWithoutBuffer() throws 
InterruptedException {

Review Comment:
   `checkReadException();` is lost here.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java:
##########
@@ -157,46 +157,41 @@ private MySqlRecords pollSplitRecords() throws 
InterruptedException {
                     currentReader = getSnapshotSplitReader();
                     currentReader.submitSplit(nextSplit);
                 }
-                return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt);
+                return MySqlRecords.forRecords(BINLOG_SPLIT_ID, dataIt);
             } else {
                 // null will be returned after receiving suspend binlog event
                 // finish current binlog split reading
                 closeBinlogReader();
-                return finishedSplit();
+                return finishedSplit(true);
             }
         } else {
             throw new IllegalStateException("Unsupported reader type.");
         }
     }
 
-    private MySqlRecords finishedSplit() {
+    private MySqlRecords finishedSplit(boolean recycle) {
         final MySqlRecords finishedRecords = 
MySqlRecords.forFinishedSplit(currentSplitId);
+        if (recycle) {
+            closeSnapshotReader();
+        }
         currentSplitId = null;
         return finishedRecords;
     }
 
     private MySqlRecords forRecords(Iterator<SourceRecords> dataIt) {

Review Comment:
   ```suggestion
       private MySqlRecords forUnfinishedRecords(Iterator<SourceRecords> 
dataIt) {
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java:
##########
@@ -157,46 +157,41 @@ private MySqlRecords pollSplitRecords() throws 
InterruptedException {
                     currentReader = getSnapshotSplitReader();
                     currentReader.submitSplit(nextSplit);
                 }
-                return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt);
+                return MySqlRecords.forRecords(BINLOG_SPLIT_ID, dataIt);
             } else {
                 // null will be returned after receiving suspend binlog event
                 // finish current binlog split reading
                 closeBinlogReader();
-                return finishedSplit();
+                return finishedSplit(true);
             }
         } else {
             throw new IllegalStateException("Unsupported reader type.");
         }
     }
 
-    private MySqlRecords finishedSplit() {
+    private MySqlRecords finishedSplit(boolean recycle) {
         final MySqlRecords finishedRecords = 
MySqlRecords.forFinishedSplit(currentSplitId);
+        if (recycle) {
+            closeSnapshotReader();
+        }
         currentSplitId = null;
         return finishedRecords;
     }
 
     private MySqlRecords forRecords(Iterator<SourceRecords> dataIt) {
-        if (currentReader instanceof SnapshotSplitReader) {
-            final MySqlRecords finishedRecords =
-                    MySqlRecords.forSnapshotRecords(currentSplitId, dataIt);
-            closeSnapshotReader();
-            return finishedRecords;
-        } else {
-            return MySqlRecords.forBinlogRecords(currentSplitId, dataIt);
-        }
+        return MySqlRecords.forRecords(currentSplitId, dataIt);
     }
 
     /**
      * Finishes new added snapshot split, mark the binlog split as finished 
too, we will add the
      * binlog split back in {@code MySqlSourceReader}.
      */
-    private MySqlRecords forNewAddedTableFinishedSplit(
-            final String splitId, final Iterator<SourceRecords> 
recordsForSplit) {
+    private MySqlRecords forNewAddedTableFinishedSplit(final String splitId) {
         final Set<String> finishedSplits = new HashSet<>();
         finishedSplits.add(splitId);
         finishedSplits.add(BINLOG_SPLIT_ID);
         currentSplitId = null;
-        return new MySqlRecords(splitId, recordsForSplit, finishedSplits);
+        return new MySqlRecords(null, null, finishedSplits);

Review Comment:
   ```suggestion
           return new MySqlRecords(splitId, null, finishedSplits);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to