yuxiqian commented on code in PR #3964:
URL: https://github.com/apache/flink-cdc/pull/3964#discussion_r2053282655


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java:
##########
@@ -115,64 +117,91 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
 
         if (hasNextElement.get()) {
-            // eg:
-            // data input: [low watermark event][snapshot events][high watermark event][change
-            // events][end watermark event]
-            // data output: [low watermark event][normalized events][high watermark event]
-            boolean reachChangeLogStart = false;
-            boolean reachChangeLogEnd = false;
-            SourceRecord lowWatermark = null;
-            SourceRecord highWatermark = null;
-            Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
-            while (!reachChangeLogEnd) {
-                checkReadException();
-                List<DataChangeEvent> batch = queue.poll();
-                for (DataChangeEvent event : batch) {
-                    SourceRecord record = event.getRecord();
-                    if (lowWatermark == null) {
-                        lowWatermark = record;
-                        assertLowWatermark(lowWatermark);
-                        continue;
-                    }
+            if (taskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+                return pollWithoutBuffer();
+            } else {
+                return pollWithBuffer();
+            }
+        }
+        // the data has been polled, no more data
+        reachEnd.compareAndSet(false, true);
+        return null;
+    }
 
-                    if (highWatermark == null && isHighWatermarkEvent(record)) {
-                        highWatermark = record;
-                        // snapshot events capture end and begin to capture stream events
-                        reachChangeLogStart = true;
-                        continue;
-                    }
+    public Iterator<SourceRecords> pollWithoutBuffer() throws InterruptedException {
+        List<DataChangeEvent> batch = queue.poll();
+        boolean reachChangeLogEnd =
+                batch.stream().anyMatch(event -> isEndWatermarkEvent(event.getRecord()));

Review Comment:
   Can we simply check if the last element is an EndWatermark event?
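
   A rough sketch of the difference, not the PR's actual code: `Event` and this `isEndWatermarkEvent` are hypothetical stand-ins for `DataChangeEvent` and the real helper, and the last-element check is only equivalent if the end watermark, when present, is always the final event in the polled batch. That ordering is an assumption here and would need to be confirmed against the fetcher.

```java
import java.util.List;

final class EndWatermarkCheck {

    // Hypothetical stand-in for DataChangeEvent/SourceRecord; it only carries a flag.
    static final class Event {
        final boolean endWatermark;

        Event(boolean endWatermark) {
            this.endWatermark = endWatermark;
        }
    }

    // Hypothetical stand-in for isEndWatermarkEvent(event.getRecord()) in the diff.
    static boolean isEndWatermarkEvent(Event event) {
        return event.endWatermark;
    }

    // Approach in the diff: scan the batch until a match is found.
    static boolean anyEndWatermark(List<Event> batch) {
        return batch.stream().anyMatch(EndWatermarkCheck::isEndWatermarkEvent);
    }

    // Suggested approach: look only at the last element.
    // ASSUMPTION: the end watermark, if present, is the final event of the batch.
    static boolean lastIsEndWatermark(List<Event> batch) {
        return !batch.isEmpty() && isEndWatermarkEvent(batch.get(batch.size() - 1));
    }
}
```

   The two checks disagree only when some event follows the end watermark within the same batch, so the simplification is safe exactly when that case cannot occur. The `isEmpty()` guard avoids an `IndexOutOfBoundsException` on an empty batch.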


