yuxiqian commented on code in PR #3932:
URL: https://github.com/apache/flink-cdc/pull/3932#discussion_r1974819667


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -99,23 +100,17 @@ protected void processElement(
                     alreadySendCreateTableTables.add(tableId);
                 }
             }
-        } else if (splitState.isBinlogSplitState() && 
!alreadySendCreateTableForBinlogSplit) {
-            alreadySendCreateTableForBinlogSplit = true;
-            if 
(sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
-                // In Snapshot -> Binlog transition of INITIAL startup mode, 
ensure all table
-                // schemas have been sent to downstream. We use previously 
cached schema instead of
-                // re-request latest schema because there might be some 
pending schema change events
-                // in the queue, and that may accidentally emit evolved schema 
before corresponding
-                // schema change events.
-                createTableEventCache.stream()
-                        .filter(
-                                event ->
-                                        !alreadySendCreateTableTables.contains(
-                                                
MySqlSchemaUtils.toDbzTableId(event.tableId())))
-                        .forEach(output::collect);
-            } else {
-                // In Binlog only mode, we simply emit all schemas at once.
-                createTableEventCache.forEach(output::collect);
+        } else {
+            if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) {
+                TableId tableId = getTableId(element);
+                if (!alreadySendCreateTableTables.contains(tableId)) {
+                    CreateTableEvent createTableEvent = 
createTableEventCache.get(tableId);
+                    // New created table in binlog reading phase.

Review Comment:
   ```
   --- Create Table A --> Table A Binlog --> Drop Table A --> Startup Offset 
--->
   
                       ^
                       |
          What if we start from here?
   ```
   
   Seems `testDanglingDropTableEventInBinlog` will fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to