loserwang1024 commented on code in PR #3964:
URL: https://github.com/apache/flink-cdc/pull/3964#discussion_r2059608012


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java:
##########
@@ -157,46 +157,41 @@ private MySqlRecords pollSplitRecords() throws 
InterruptedException {
                     currentReader = getSnapshotSplitReader();
                     currentReader.submitSplit(nextSplit);
                 }
-                return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt);
+                return MySqlRecords.forRecords(BINLOG_SPLIT_ID, dataIt);
             } else {
                 // null will be returned after receiving suspend binlog event
                 // finish current binlog split reading
                 closeBinlogReader();
-                return finishedSplit();
+                return finishedSplit(true);
             }
         } else {
             throw new IllegalStateException("Unsupported reader type.");
         }
     }
 
-    private MySqlRecords finishedSplit() {
+    private MySqlRecords finishedSplit(boolean recycle) {
         final MySqlRecords finishedRecords = 
MySqlRecords.forFinishedSplit(currentSplitId);
+        if (recycle) {
+            closeSnapshotReader();
+        }
         currentSplitId = null;
         return finishedRecords;
     }
 
     private MySqlRecords forRecords(Iterator<SourceRecords> dataIt) {
-        if (currentReader instanceof SnapshotSplitReader) {
-            final MySqlRecords finishedRecords =
-                    MySqlRecords.forSnapshotRecords(currentSplitId, dataIt);
-            closeSnapshotReader();
-            return finishedRecords;
-        } else {
-            return MySqlRecords.forBinlogRecords(currentSplitId, dataIt);
-        }
+        return MySqlRecords.forRecords(currentSplitId, dataIt);
     }
 
     /**
      * Finishes new added snapshot split, mark the binlog split as finished 
too, we will add the
      * binlog split back in {@code MySqlSourceReader}.
      */
-    private MySqlRecords forNewAddedTableFinishedSplit(
-            final String splitId, final Iterator<SourceRecords> 
recordsForSplit) {
+    private MySqlRecords forNewAddedTableFinishedSplit(final String splitId) {
         final Set<String> finishedSplits = new HashSet<>();
         finishedSplits.add(splitId);
         finishedSplits.add(BINLOG_SPLIT_ID);
         currentSplitId = null;
-        return new MySqlRecords(splitId, recordsForSplit, finishedSplits);
+        return new MySqlRecords(null, null, finishedSplits);

Review Comment:
   
org.apache.flink.cdc.connectors.mysql.source.NewlyAddedTableITCase#testNewlyAddedEmptyTableAndInsertAfterJobStart
 will throw exception because 
org.apache.flink.cdc.connectors.mysql.source.split.MySqlRecords#nextRecordFromSplit.
   
   If we set spilt id is not null, recordsForSplit cannot be null.
   
   ```java
   Caused by: java.lang.IllegalStateException
        at 
org.apache.flink.cdc.connectors.mysql.source.split.MySqlRecords.nextRecordFromSplit(MySqlRecords.java:70)
 ~[classes/:?]
        at 
org.apache.flink.cdc.connectors.mysql.source.split.MySqlRecords.nextRecordFromSplit(MySqlRecords.java:31)
 ~[classes/:?]
        at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:199)
 ~[flink-connector-base-1.20.1.jar:1.20.1]
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to