ruanhang1993 commented on code in PR #3964:
URL: https://github.com/apache/flink-cdc/pull/3964#discussion_r2059490853
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java:
##########

@@ -226,24 +228,16 @@ private ChangeEventRecords finishedSplit() {
      * Finishes new added snapshot split, mark the stream split as finished too, we will add the
      * stream split back in {@code MySqlSourceReader}.
      */
-    private ChangeEventRecords forNewAddedTableFinishedSplit(
-            final String splitId, final Iterator<SourceRecords> recordsForSplit) {
+    private ChangeEventRecords forNewAddedTableFinishedSplit(final String splitId) {
         final Set<String> finishedSplits = new HashSet<>();
         finishedSplits.add(splitId);
         finishedSplits.add(STREAM_SPLIT_ID);
         currentSplitId = null;
-        return new ChangeEventRecords(splitId, recordsForSplit, finishedSplits);
+        return new ChangeEventRecords(null, null, finishedSplits);
     }

     private ChangeEventRecords forRecords(Iterator<SourceRecords> dataIt) {

Review Comment:
```suggestion
    private ChangeEventRecords forUnfinishedRecords(Iterator<SourceRecords> dataIt) {
```

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java:
##########

@@ -115,64 +116,86 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
         if (hasNextElement.get()) {
-            // eg:
-            // data input: [low watermark event][snapshot events][high watermark event][change
-            // events][end watermark event]
-            // data output: [low watermark event][normalized events][high watermark event]
-            boolean reachChangeLogStart = false;
-            boolean reachChangeLogEnd = false;
-            SourceRecord lowWatermark = null;
-            SourceRecord highWatermark = null;
-            Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
-            while (!reachChangeLogEnd) {
-                checkReadException();
-                List<DataChangeEvent> batch = queue.poll();
-                for (DataChangeEvent event : batch) {
-                    SourceRecord record = event.getRecord();
-                    if (lowWatermark == null) {
-                        lowWatermark = record;
-                        assertLowWatermark(lowWatermark);
-                        continue;
-                    }
+            if (taskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+                return pollWithoutBuffer();
+            } else {
+                return pollWithBuffer();
+            }
+        }
+        // the data has been polled, no more data
+        reachEnd.compareAndSet(false, true);
+        return null;
+    }

-                    if (highWatermark == null && isHighWatermarkEvent(record)) {
-                        highWatermark = record;
-                        // snapshot events capture end and begin to capture stream events
-                        reachChangeLogStart = true;
-                        continue;
-                    }
+    public Iterator<SourceRecords> pollWithoutBuffer() throws InterruptedException {
+        List<DataChangeEvent> batch = queue.poll();

Review Comment:
   `checkReadException();` is lost here.
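To illustrate the point of this comment, here is a minimal, self-contained sketch (the class, field, and method names below are hypothetical stand-ins, not the connector's actual code): any error recorded by the background snapshot read should be surfaced before the queue is drained, just as the buffered poll path re-checks it on every loop iteration; otherwise a failed read could only show up on some later poll.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch only: mirrors the shape of pollWithoutBuffer() to show
// where the re-added checkReadException() call would sit.
final class UnbufferedPollSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private final AtomicReference<Throwable> readException = new AtomicReference<>();

    String pollWithoutBuffer() {
        // Surface any error captured by the async read before handing records
        // downstream; this is the call the review flags as lost.
        checkReadException();
        return queue.poll();
    }

    private void checkReadException() {
        Throwable cause = readException.get();
        if (cause != null) {
            throw new IllegalStateException("Snapshot read failed", cause);
        }
    }
}
```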
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java:
##########

@@ -157,46 +157,41 @@ private MySqlRecords pollSplitRecords() throws InterruptedException {
                 currentReader = getSnapshotSplitReader();
                 currentReader.submitSplit(nextSplit);
             }
-            return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt);
+            return MySqlRecords.forRecords(BINLOG_SPLIT_ID, dataIt);
         } else {
             // null will be returned after receiving suspend binlog event
             // finish current binlog split reading
             closeBinlogReader();
-            return finishedSplit();
+            return finishedSplit(true);
         }
     } else {
         throw new IllegalStateException("Unsupported reader type.");
     }
 }

-    private MySqlRecords finishedSplit() {
+    private MySqlRecords finishedSplit(boolean recycle) {

Review Comment:
```suggestion
    private MySqlRecords finishedSplit(boolean recycleSnapshotReader) {
```

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlRecords.java:
##########

@@ -76,16 +76,11 @@ public Set<String> finishedSplits() {
         return finishedSnapshotSplits;
     }

-    public static MySqlRecords forBinlogRecords(
+    public static MySqlRecords forRecords(

Review Comment:
```suggestion
    public static MySqlRecords forUnfinishedRecords(
```

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java:
##########

@@ -215,9 +214,12 @@ public boolean canAssignNextSplit() {
         return currentFetcher == null || currentFetcher.isFinished();
     }

-    private ChangeEventRecords finishedSplit() {
+    private ChangeEventRecords finishedSplit(boolean recycle) {

Review Comment:
```suggestion
    private ChangeEventRecords finishedSplit(boolean recycleScanFetcher) {
```

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java:
##########

@@ -226,24 +228,16 @@ private ChangeEventRecords finishedSplit() {
      * Finishes new added snapshot split, mark the stream split as finished too, we will add the
      * stream split back in {@code MySqlSourceReader}.
      */
-    private ChangeEventRecords forNewAddedTableFinishedSplit(
-            final String splitId, final Iterator<SourceRecords> recordsForSplit) {
+    private ChangeEventRecords forNewAddedTableFinishedSplit(final String splitId) {
         final Set<String> finishedSplits = new HashSet<>();
         finishedSplits.add(splitId);
         finishedSplits.add(STREAM_SPLIT_ID);
         currentSplitId = null;
-        return new ChangeEventRecords(splitId, recordsForSplit, finishedSplits);
+        return new ChangeEventRecords(null, null, finishedSplits);

Review Comment:
```suggestion
        return new ChangeEventRecords(splitId, null, finishedSplits);
```

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java:
##########

@@ -282,79 +282,102 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();
         if (hasNextElement.get()) {
-            // data input: [low watermark event][snapshot events][high watermark event][binlog
-            // events][binlog-end event]
-            // data output: [low watermark event][normalized events][high watermark event]
-            boolean reachBinlogStart = false;
-            boolean reachBinlogEnd = false;
-            SourceRecord lowWatermark = null;
-            SourceRecord highWatermark = null;
-
-            Map<Struct, List<SourceRecord>> snapshotRecords = new HashMap<>();
-            while (!reachBinlogEnd) {
-                checkReadException();
-                List<DataChangeEvent> batch = queue.poll();
-                for (DataChangeEvent event : batch) {
-                    SourceRecord record = event.getRecord();
-                    if (lowWatermark == null) {
-                        lowWatermark = record;
-                        assertLowWatermark(lowWatermark);
-                        continue;
-                    }
+            if (statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+                return pollWithoutBuffer();
+            } else {
+                return pollWithBuffer();
+            }
+        }

-                    if (highWatermark == null && RecordUtils.isHighWatermarkEvent(record)) {
-                        highWatermark = record;
-                        // snapshot events capture end and begin to capture binlog events
-                        reachBinlogStart = true;
-                        continue;
-                    }
+        // the data has been polled, no more data
+        reachEnd.compareAndSet(false, true);
+        return null;
+    }

-                    if (reachBinlogStart && RecordUtils.isEndWatermarkEvent(record)) {
-                        // capture to end watermark events, stop the loop
-                        reachBinlogEnd = true;
-                        break;
-                    }
+    public Iterator<SourceRecords> pollWithoutBuffer() throws InterruptedException {

Review Comment:
   checkReadException();

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java:
##########

@@ -157,46 +157,41 @@ private MySqlRecords pollSplitRecords() throws InterruptedException {
                 currentReader = getSnapshotSplitReader();
                 currentReader.submitSplit(nextSplit);
             }
-            return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt);
+            return MySqlRecords.forRecords(BINLOG_SPLIT_ID, dataIt);
         } else {
             // null will be returned after receiving suspend binlog event
             // finish current binlog split reading
             closeBinlogReader();
-            return finishedSplit();
+            return finishedSplit(true);
         }
     } else {
         throw new IllegalStateException("Unsupported reader type.");
     }
 }

-    private MySqlRecords finishedSplit() {
+    private MySqlRecords finishedSplit(boolean recycle) {
         final MySqlRecords finishedRecords = MySqlRecords.forFinishedSplit(currentSplitId);
+        if (recycle) {
+            closeSnapshotReader();
+        }
         currentSplitId = null;
         return finishedRecords;
     }

     private MySqlRecords forRecords(Iterator<SourceRecords> dataIt) {

Review Comment:
```suggestion
    private MySqlRecords forUnfinishedRecords(Iterator<SourceRecords> dataIt) {
```
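As a side note on the two `recycle` rename suggestions above, the benefit shows up mostly at call sites such as the `finishedSplit(true)` in the hunk: a named parameter, optionally echoed as an inline comment, makes the bare boolean self-describing. A minimal hypothetical sketch, not the connector's actual code:

```java
// Hypothetical, self-contained illustration of the naming point: the stand-in
// method applies the reviewer's suggested parameter name, and the call site
// echoes it so the literal `true` documents what is being toggled.
final class FinishedSplitNamingSketch {
    private String currentSplitId = "split-0";

    // Stand-in for MySqlSplitReader#finishedSplit with the suggested name applied.
    private String finishedSplit(boolean recycleSnapshotReader) {
        String finished = currentSplitId;
        if (recycleSnapshotReader) {
            closeSnapshotReader(); // tear the reader down only when asked to
        }
        currentSplitId = null;
        return finished;
    }

    private void closeSnapshotReader() {
        // no-op in this sketch
    }

    void onBinlogSplitSuspended() {
        // The inline comment keeps the boolean self-describing at the call site.
        finishedSplit(/* recycleSnapshotReader */ true);
    }
}
```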
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java:
##########

@@ -157,46 +157,41 @@ private MySqlRecords pollSplitRecords() throws InterruptedException {
                 currentReader = getSnapshotSplitReader();
                 currentReader.submitSplit(nextSplit);
             }
-            return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt);
+            return MySqlRecords.forRecords(BINLOG_SPLIT_ID, dataIt);
         } else {
             // null will be returned after receiving suspend binlog event
             // finish current binlog split reading
             closeBinlogReader();
-            return finishedSplit();
+            return finishedSplit(true);
         }
     } else {
         throw new IllegalStateException("Unsupported reader type.");
     }
 }

-    private MySqlRecords finishedSplit() {
+    private MySqlRecords finishedSplit(boolean recycle) {
         final MySqlRecords finishedRecords = MySqlRecords.forFinishedSplit(currentSplitId);
+        if (recycle) {
+            closeSnapshotReader();
+        }
         currentSplitId = null;
         return finishedRecords;
     }

     private MySqlRecords forRecords(Iterator<SourceRecords> dataIt) {
-        if (currentReader instanceof SnapshotSplitReader) {
-            final MySqlRecords finishedRecords =
-                    MySqlRecords.forSnapshotRecords(currentSplitId, dataIt);
-            closeSnapshotReader();
-            return finishedRecords;
-        } else {
-            return MySqlRecords.forBinlogRecords(currentSplitId, dataIt);
-        }
+        return MySqlRecords.forRecords(currentSplitId, dataIt);
     }

     /**
      * Finishes new added snapshot split, mark the binlog split as finished too, we will add the
      * binlog split back in {@code MySqlSourceReader}.
      */
-    private MySqlRecords forNewAddedTableFinishedSplit(
-            final String splitId, final Iterator<SourceRecords> recordsForSplit) {
+    private MySqlRecords forNewAddedTableFinishedSplit(final String splitId) {
         final Set<String> finishedSplits = new HashSet<>();
         finishedSplits.add(splitId);
         finishedSplits.add(BINLOG_SPLIT_ID);
         currentSplitId = null;
-        return new MySqlRecords(splitId, recordsForSplit, finishedSplits);
+        return new MySqlRecords(null, null, finishedSplits);

Review Comment:
```suggestion
        return new MySqlRecords(splitId, null, finishedSplits);
```
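For context, applying this last suggestion (and the analogous one for `ChangeEventRecords` earlier in the thread) leaves the method reading roughly as follows. This is only the hunk above with the suggested line swapped in, shown as a method fragment rather than a standalone compilable example:

```java
/**
 * Finishes new added snapshot split, mark the binlog split as finished too, we will add the
 * binlog split back in {@code MySqlSourceReader}.
 */
private MySqlRecords forNewAddedTableFinishedSplit(final String splitId) {
    final Set<String> finishedSplits = new HashSet<>();
    finishedSplits.add(splitId);
    finishedSplits.add(BINLOG_SPLIT_ID);
    currentSplitId = null;
    // Suggested change: keep the finished split's id on the returned records instead of null;
    // only the record iterator argument is dropped by the PR.
    return new MySqlRecords(splitId, null, finishedSplits);
}
```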