yuxiqian commented on code in PR #3964: URL: https://github.com/apache/flink-cdc/pull/3964#discussion_r2053282655
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java:
##########
@@ -115,64 +117,91 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
         if (hasNextElement.get()) {
-            // eg:
-            // data input: [low watermark event][snapshot events][high watermark event][change
-            // events][end watermark event]
-            // data output: [low watermark event][normalized events][high watermark event]
-            boolean reachChangeLogStart = false;
-            boolean reachChangeLogEnd = false;
-            SourceRecord lowWatermark = null;
-            SourceRecord highWatermark = null;
-            Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
-            while (!reachChangeLogEnd) {
-                checkReadException();
-                List<DataChangeEvent> batch = queue.poll();
-                for (DataChangeEvent event : batch) {
-                    SourceRecord record = event.getRecord();
-                    if (lowWatermark == null) {
-                        lowWatermark = record;
-                        assertLowWatermark(lowWatermark);
-                        continue;
-                    }
+            if (taskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+                return pollWithoutBuffer();
+            } else {
+                return pollWithBuffer();
+            }
+        }
+        // the data has been polled, no more data
+        reachEnd.compareAndSet(false, true);
+        return null;
+    }
-                    if (highWatermark == null && isHighWatermarkEvent(record)) {
-                        highWatermark = record;
-                        // snapshot events capture end and begin to capture stream events
-                        reachChangeLogStart = true;
-                        continue;
-                    }
+    public Iterator<SourceRecords> pollWithoutBuffer() throws InterruptedException {
+        List<DataChangeEvent> batch = queue.poll();
+        boolean reachChangeLogEnd =
+                batch.stream().anyMatch(event -> isEndWatermarkEvent(event.getRecord()));

Review Comment:
Can we simply check if the last element is an EndWatermark event?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org