loserwang1024 commented on code in PR #3964:
URL: https://github.com/apache/flink-cdc/pull/3964#discussion_r2053383609


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java:
##########
@@ -115,64 +117,91 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
 
         if (hasNextElement.get()) {
-            // eg:
-            // data input: [low watermark event][snapshot events][high watermark event][change
-            // events][end watermark event]
-            // data output: [low watermark event][normalized events][high watermark event]
-            boolean reachChangeLogStart = false;
-            boolean reachChangeLogEnd = false;
-            SourceRecord lowWatermark = null;
-            SourceRecord highWatermark = null;
-            Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
-            while (!reachChangeLogEnd) {
-                checkReadException();
-                List<DataChangeEvent> batch = queue.poll();
-                for (DataChangeEvent event : batch) {
-                    SourceRecord record = event.getRecord();
-                    if (lowWatermark == null) {
-                        lowWatermark = record;
-                        assertLowWatermark(lowWatermark);
-                        continue;
-                    }
+            if (taskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+                return pollWithoutBuffer();
+            } else {
+                return pollWithBuffer();
+            }
+        }
+        // the data has been polled, no more data
+        reachEnd.compareAndSet(false, true);
+        return null;
+    }
 
-                    if (highWatermark == null && isHighWatermarkEvent(record)) {
-                        highWatermark = record;
-                        // snapshot events capture end and begin to capture stream events
-                        reachChangeLogStart = true;
-                        continue;
-                    }
+    public Iterator<SourceRecords> pollWithoutBuffer() throws InterruptedException {
+        List<DataChangeEvent> batch = queue.poll();
+        boolean reachChangeLogEnd =
+                batch.stream().anyMatch(event -> isEndWatermarkEvent(event.getRecord()));
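
The removed comments in this hunk describe what the buffered path (`pollWithBuffer`) does with a snapshot split: it reads [low watermark][snapshot events][high watermark][change events][end watermark] and emits [low watermark][normalized events][high watermark]. Below is a minimal Java sketch of that normalization under simplifying assumptions: `BackfillNormalizer`, `Event` and `Kind` are hypothetical names, plain string keys stand in for the Debezium key `Struct`, and change events are folded into the buffered snapshot rows (insert/update keeps the latest image as a read, delete drops the row). It illustrates the idea only and is not the Flink CDC implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the buffered backfill normalization.
// BackfillNormalizer, Kind and Event are illustrative names, not Flink CDC APIs.
class BackfillNormalizer {

    enum Kind { LOW_WATERMARK, HIGH_WATERMARK, END_WATERMARK, READ, INSERT, UPDATE, DELETE }

    static final class Event {
        final Kind kind;
        final String key;   // primary key (null for watermarks)
        final String value; // row image after the change (null for watermarks and deletes)

        Event(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key == null ? kind.name() : kind + "(" + key + "=" + value + ")";
        }
    }

    // Input:  [low wm][snapshot reads][high wm][change events][end wm]
    // Output: [low wm][normalized reads][high wm]
    static List<Event> normalize(List<Event> input) {
        Event low = null;
        Event high = null;
        // Insertion-ordered buffer keyed by primary key (plays the role of outputBuffer).
        Map<String, Event> buffer = new LinkedHashMap<>();
        for (Event e : input) {
            if (low == null) {
                if (e.kind != Kind.LOW_WATERMARK) {
                    throw new IllegalStateException("expected low watermark first: " + e);
                }
                low = e;
            } else if (e.kind == Kind.HIGH_WATERMARK) {
                high = e; // snapshot reads are done; change events follow
            } else if (e.kind == Kind.END_WATERMARK) {
                break; // backfill complete
            } else if (high == null) {
                buffer.put(e.key, e); // snapshot read
            } else if (e.kind == Kind.DELETE) {
                buffer.remove(e.key); // row deleted during the snapshot window
            } else {
                // insert or update during the window: keep the latest image as a read
                buffer.put(e.key, new Event(Kind.READ, e.key, e.value));
            }
        }
        List<Event> output = new ArrayList<>();
        output.add(low);
        output.addAll(buffer.values());
        output.add(high);
        return output;
    }

    public static void main(String[] args) {
        List<Event> input = Arrays.asList(
                new Event(Kind.LOW_WATERMARK, null, null),
                new Event(Kind.READ, "1", "a"),
                new Event(Kind.READ, "2", "b"),
                new Event(Kind.HIGH_WATERMARK, null, null),
                new Event(Kind.UPDATE, "1", "a2"),
                new Event(Kind.DELETE, "2", null),
                new Event(Kind.INSERT, "3", "c"),
                new Event(Kind.END_WATERMARK, null, null));
        // prints [LOW_WATERMARK, READ(1=a2), READ(3=c), HIGH_WATERMARK]
        System.out.println(normalize(input));
    }
}
```

When `isSkipSnapshotBackfill()` is enabled, the diff routes to `pollWithoutBuffer()` instead, which appears to skip this key-based buffering and forward each polled batch directly, trading the normalized snapshot view for less memory and work.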

Review Comment:
   @yuxiqian, thanks for your code review. I did consider this, but decided not to do it because:
   1. I'm afraid that some other event (such as a heartbeat or some unknown event) could be emitted after the end watermark event, in which case the scan fetcher would never finish. I haven't yet looked through all the Debezium tasks to see every event they can emit. The older pollWithBuffer logic also iterates over all the records to determine whether each one is the low watermark, the high watermark, or the end watermark, rather than taking the first record of the batch as the low watermark and the last one as the end watermark.
   2. The improvement is small: the cost would still be O(N) rather than O(1), because of the following code:
   ```java
   // Wrapping the polled batch into SourceRecords visits every event, so this step is O(N).
   final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
   if (!CollectionUtil.isNullOrEmpty(batch)) {
       sourceRecordsSet.add(
               new SourceRecords(
                       batch.stream()
                               .map(DataChangeEvent::getRecord)
                               .collect(Collectors.toList())));
   }
   ```
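
To illustrate point 1, here is a hypothetical sketch comparing two ways of detecting the end watermark in a polled batch: checking only the last element (O(1)) versus scanning every element, as `anyMatch` does in `pollWithoutBuffer`. The `EndWatermarkDetection` class, its `Kind` enum and both methods are illustrative names, not Flink CDC APIs, and the batch assumes a heartbeat arriving after the end watermark, which is exactly the case that has not been ruled out.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of end-watermark detection in one polled batch.
// Kind and the two detector methods are illustrative, not Flink CDC APIs.
class EndWatermarkDetection {

    enum Kind { LOW_WATERMARK, DATA, HIGH_WATERMARK, END_WATERMARK, HEARTBEAT }

    // O(1): assumes the end watermark is always the last element of the batch.
    static boolean endByLastElement(List<Kind> batch) {
        return !batch.isEmpty() && batch.get(batch.size() - 1) == Kind.END_WATERMARK;
    }

    // O(N): inspects every element, tolerating events that follow the end watermark.
    static boolean endByScan(List<Kind> batch) {
        return batch.stream().anyMatch(k -> k == Kind.END_WATERMARK);
    }

    public static void main(String[] args) {
        // A heartbeat (or another unexpected event) emitted after the end watermark.
        List<Kind> batch = Arrays.asList(
                Kind.DATA, Kind.HIGH_WATERMARK, Kind.END_WATERMARK, Kind.HEARTBEAT);
        System.out.println("last-element check: " + endByLastElement(batch)); // false
        System.out.println("full scan: " + endByScan(batch)); // true
    }
}
```

With a trailing heartbeat, the last-element check never reports the end, so a fetcher relying on it would keep polling. And since, per point 2, the batch is converted to `SourceRecords` in O(N) anyway, the full scan does not change the overall complexity.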
   
   This PR changes the whole framework, so I have kept the changes as small as possible for safety. Maybe we can make this improvement in a separate issue?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
