ruanhang1993 commented on code in PR #3964: URL: https://github.com/apache/flink-cdc/pull/3964#discussion_r2059816005
########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java: ########## @@ -157,46 +157,41 @@ private MySqlRecords pollSplitRecords() throws InterruptedException { currentReader = getSnapshotSplitReader(); currentReader.submitSplit(nextSplit); } - return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt); + return MySqlRecords.forRecords(BINLOG_SPLIT_ID, dataIt); } else { // null will be returned after receiving suspend binlog event // finish current binlog split reading closeBinlogReader(); - return finishedSplit(); + return finishedSplit(true); } } else { throw new IllegalStateException("Unsupported reader type."); } } - private MySqlRecords finishedSplit() { + private MySqlRecords finishedSplit(boolean recycle) { final MySqlRecords finishedRecords = MySqlRecords.forFinishedSplit(currentSplitId); + if (recycle) { + closeSnapshotReader(); + } currentSplitId = null; return finishedRecords; } private MySqlRecords forRecords(Iterator<SourceRecords> dataIt) { - if (currentReader instanceof SnapshotSplitReader) { - final MySqlRecords finishedRecords = - MySqlRecords.forSnapshotRecords(currentSplitId, dataIt); - closeSnapshotReader(); - return finishedRecords; - } else { - return MySqlRecords.forBinlogRecords(currentSplitId, dataIt); - } + return MySqlRecords.forRecords(currentSplitId, dataIt); } /** * Finishes new added snapshot split, mark the binlog split as finished too, we will add the * binlog split back in {@code MySqlSourceReader}. */ - private MySqlRecords forNewAddedTableFinishedSplit( - final String splitId, final Iterator<SourceRecords> recordsForSplit) { + private MySqlRecords forNewAddedTableFinishedSplit(final String splitId) { final Set<String> finishedSplits = new HashSet<>(); finishedSplits.add(splitId); finishedSplits.add(BINLOG_SPLIT_ID); currentSplitId = null; - return new MySqlRecords(splitId, recordsForSplit, finishedSplits); + return new MySqlRecords(null, null, finishedSplits); Review Comment: @loserwang1024 If so, please ignore this comment. Let's use `null` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org