loserwang1024 commented on code in PR #3964: URL: https://github.com/apache/flink-cdc/pull/3964#discussion_r2059608012
########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java: ########## @@ -157,46 +157,41 @@ private MySqlRecords pollSplitRecords() throws InterruptedException { currentReader = getSnapshotSplitReader(); currentReader.submitSplit(nextSplit); } - return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt); + return MySqlRecords.forRecords(BINLOG_SPLIT_ID, dataIt); } else { // null will be returned after receiving suspend binlog event // finish current binlog split reading closeBinlogReader(); - return finishedSplit(); + return finishedSplit(true); } } else { throw new IllegalStateException("Unsupported reader type."); } } - private MySqlRecords finishedSplit() { + private MySqlRecords finishedSplit(boolean recycle) { final MySqlRecords finishedRecords = MySqlRecords.forFinishedSplit(currentSplitId); + if (recycle) { + closeSnapshotReader(); + } currentSplitId = null; return finishedRecords; } private MySqlRecords forRecords(Iterator<SourceRecords> dataIt) { - if (currentReader instanceof SnapshotSplitReader) { - final MySqlRecords finishedRecords = - MySqlRecords.forSnapshotRecords(currentSplitId, dataIt); - closeSnapshotReader(); - return finishedRecords; - } else { - return MySqlRecords.forBinlogRecords(currentSplitId, dataIt); - } + return MySqlRecords.forRecords(currentSplitId, dataIt); } /** * Finishes new added snapshot split, mark the binlog split as finished too, we will add the * binlog split back in {@code MySqlSourceReader}. */ - private MySqlRecords forNewAddedTableFinishedSplit( - final String splitId, final Iterator<SourceRecords> recordsForSplit) { + private MySqlRecords forNewAddedTableFinishedSplit(final String splitId) { final Set<String> finishedSplits = new HashSet<>(); finishedSplits.add(splitId); finishedSplits.add(BINLOG_SPLIT_ID); currentSplitId = null; - return new MySqlRecords(splitId, recordsForSplit, finishedSplits); + return new MySqlRecords(null, null, finishedSplits); Review Comment: org.apache.flink.cdc.connectors.mysql.source.NewlyAddedTableITCase#testNewlyAddedEmptyTableAndInsertAfterJobStart will throw exception because org.apache.flink.cdc.connectors.mysql.source.split.MySqlRecords#nextRecordFromSplit. If we set spilt id is not null, recordsForSplit cannot be null. ```java Caused by: java.lang.IllegalStateException at org.apache.flink.cdc.connectors.mysql.source.split.MySqlRecords.nextRecordFromSplit(MySqlRecords.java:70) ~[classes/:?] at org.apache.flink.cdc.connectors.mysql.source.split.MySqlRecords.nextRecordFromSplit(MySqlRecords.java:31) ~[classes/:?] at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:199) ~[flink-connector-base-1.20.1.jar:1.20.1] ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org