loserwang1024 commented on code in PR #3964:
URL: https://github.com/apache/flink-cdc/pull/3964#discussion_r2059608012
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java:
##########
@@ -157,46 +157,41 @@ private MySqlRecords pollSplitRecords() throws
InterruptedException {
currentReader = getSnapshotSplitReader();
currentReader.submitSplit(nextSplit);
}
- return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt);
+ return MySqlRecords.forRecords(BINLOG_SPLIT_ID, dataIt);
} else {
// null will be returned after receiving suspend binlog event
// finish current binlog split reading
closeBinlogReader();
- return finishedSplit();
+ return finishedSplit(true);
}
} else {
throw new IllegalStateException("Unsupported reader type.");
}
}
- private MySqlRecords finishedSplit() {
+ private MySqlRecords finishedSplit(boolean recycle) {
final MySqlRecords finishedRecords =
MySqlRecords.forFinishedSplit(currentSplitId);
+ if (recycle) {
+ closeSnapshotReader();
+ }
currentSplitId = null;
return finishedRecords;
}
private MySqlRecords forRecords(Iterator<SourceRecords> dataIt) {
- if (currentReader instanceof SnapshotSplitReader) {
- final MySqlRecords finishedRecords =
- MySqlRecords.forSnapshotRecords(currentSplitId, dataIt);
- closeSnapshotReader();
- return finishedRecords;
- } else {
- return MySqlRecords.forBinlogRecords(currentSplitId, dataIt);
- }
+ return MySqlRecords.forRecords(currentSplitId, dataIt);
}
/**
* Finishes new added snapshot split, mark the binlog split as finished
too, we will add the
* binlog split back in {@code MySqlSourceReader}.
*/
- private MySqlRecords forNewAddedTableFinishedSplit(
- final String splitId, final Iterator<SourceRecords>
recordsForSplit) {
+ private MySqlRecords forNewAddedTableFinishedSplit(final String splitId) {
final Set<String> finishedSplits = new HashSet<>();
finishedSplits.add(splitId);
finishedSplits.add(BINLOG_SPLIT_ID);
currentSplitId = null;
- return new MySqlRecords(splitId, recordsForSplit, finishedSplits);
+ return new MySqlRecords(null, null, finishedSplits);
Review Comment:
org.apache.flink.cdc.connectors.mysql.source.NewlyAddedTableITCase#testNewlyAddedEmptyTableAndInsertAfterJobStart
will throw an IllegalStateException raised in
org.apache.flink.cdc.connectors.mysql.source.split.MySqlRecords#nextRecordFromSplit:
```java
Caused by: java.lang.IllegalStateException
at
org.apache.flink.cdc.connectors.mysql.source.split.MySqlRecords.nextRecordFromSplit(MySqlRecords.java:70)
~[classes/:?]
at
org.apache.flink.cdc.connectors.mysql.source.split.MySqlRecords.nextRecordFromSplit(MySqlRecords.java:31)
~[classes/:?]
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:199)
~[flink-connector-base-1.20.1.jar:1.20.1]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]