loserwang1024 commented on code in PR #3964:
URL: https://github.com/apache/flink-cdc/pull/3964#discussion_r2053383609
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java:
##########
@@ -115,64 +117,91 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
 
         if (hasNextElement.get()) {
-            // eg:
-            // data input: [low watermark event][snapshot events][high watermark event][change
-            // events][end watermark event]
-            // data output: [low watermark event][normalized events][high watermark event]
-            boolean reachChangeLogStart = false;
-            boolean reachChangeLogEnd = false;
-            SourceRecord lowWatermark = null;
-            SourceRecord highWatermark = null;
-            Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
-            while (!reachChangeLogEnd) {
-                checkReadException();
-                List<DataChangeEvent> batch = queue.poll();
-                for (DataChangeEvent event : batch) {
-                    SourceRecord record = event.getRecord();
-                    if (lowWatermark == null) {
-                        lowWatermark = record;
-                        assertLowWatermark(lowWatermark);
-                        continue;
-                    }
+            if (taskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+                return pollWithoutBuffer();
+            } else {
+                return pollWithBuffer();
+            }
+        }
+        // the data has been polled, no more data
+        reachEnd.compareAndSet(false, true);
+        return null;
+    }
 
-                    if (highWatermark == null && isHighWatermarkEvent(record)) {
-                        highWatermark = record;
-                        // snapshot events capture end and begin to capture stream events
-                        reachChangeLogStart = true;
-                        continue;
-                    }
+    public Iterator<SourceRecords> pollWithoutBuffer() throws InterruptedException {
+        List<DataChangeEvent> batch = queue.poll();
+        boolean reachChangeLogEnd =
+                batch.stream().anyMatch(event -> isEndWatermarkEvent(event.getRecord()));

Review Comment:
I considered this before, but decided not to do it, for two reasons:

1. I am afraid that some other event (such as a heartbeat or some unknown event) may be inserted after the end watermark event; in that case the scan fetcher would never finish.
   So far, I have not gone through all the Debezium tasks to see every event they can emit. The existing pollWithBuffer logic also iterates over every record to decide whether it is the low watermark, the high watermark, or the end watermark, rather than taking the first record of the batch as the low watermark and the last record as the end watermark.

2. The improvement would be small. The cost is still O(N) rather than O(1), because the following code iterates over the whole batch anyway:

```java
final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
if (!CollectionUtil.isNullOrEmpty(batch)) {
    sourceRecordsSet.add(
            new SourceRecords(
                    batch.stream()
                            .map(DataChangeEvent::getRecord)
                            .collect(Collectors.toList())));
}
```

This PR already changes the whole framework, so I have kept my changes as small as possible. Maybe we can make this improvement in a separate issue?
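The two points in the review comment can be illustrated with a minimal, self-contained sketch. It assumes hypothetical `Event` and `Kind` types standing in for `DataChangeEvent`/`SourceRecord` and the watermark checks; they are not Flink CDC or Debezium APIs. The batch places a heartbeat after the end watermark: a check on only the last element misses the end watermark, the `anyMatch` scan finds it, and converting the batch into records is a full pass over the batch regardless.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch: Event and Kind stand in for DataChangeEvent/SourceRecord
 * and the watermark checks. They are NOT Flink CDC or Debezium APIs.
 */
public class EndWatermarkCheck {

    enum Kind { DATA, HEARTBEAT, END_WATERMARK }

    static final class Event {
        final Kind kind;
        final String payload;

        Event(Kind kind, String payload) {
            this.kind = kind;
            this.payload = payload;
        }
    }

    // O(1) alternative: inspects only the last event of the batch.
    // It misses the end watermark if any other event (e.g. a heartbeat) follows it.
    static boolean endByLastElement(List<Event> batch) {
        return !batch.isEmpty() && batch.get(batch.size() - 1).kind == Kind.END_WATERMARK;
    }

    // O(N) check in the style of the anyMatch in pollWithoutBuffer:
    // finds the end watermark wherever it sits in the batch.
    static boolean endByAnyMatch(List<Event> batch) {
        return batch.stream().anyMatch(e -> e.kind == Kind.END_WATERMARK);
    }

    // Stands in for wrapping the batch into SourceRecords: mapping every event
    // is itself O(N), so an O(1) end check does not change the per-batch cost.
    static List<String> toPayloads(List<Event> batch) {
        return batch.stream().map(e -> e.payload).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Event> batch = Arrays.asList(
                new Event(Kind.DATA, "r1"),
                new Event(Kind.DATA, "r2"),
                new Event(Kind.END_WATERMARK, "end"),
                new Event(Kind.HEARTBEAT, "hb"));

        System.out.println("last-element check: " + endByLastElement(batch)); // false
        System.out.println("anyMatch check: " + endByAnyMatch(batch)); // true
        System.out.println("records emitted: " + toPayloads(batch).size()); // 4
    }
}
```

Since the record conversion is already linear in the batch size, swapping `anyMatch` for a last-element check would give up robustness against trailing events without improving the asymptotic cost of a poll.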