yuxiqian commented on code in PR #3932:
URL: https://github.com/apache/flink-cdc/pull/3932#discussion_r1974819667


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -99,23 +100,17 @@ protected void processElement(
                     alreadySendCreateTableTables.add(tableId);
                 }
             }
-        } else if (splitState.isBinlogSplitState() && !alreadySendCreateTableForBinlogSplit) {
-            alreadySendCreateTableForBinlogSplit = true;
-            if (sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
-                // In Snapshot -> Binlog transition of INITIAL startup mode, ensure all table
-                // schemas have been sent to downstream. We use previously cached schema instead of
-                // re-request latest schema because there might be some pending schema change events
-                // in the queue, and that may accidentally emit evolved schema before corresponding
-                // schema change events.
-                createTableEventCache.stream()
-                        .filter(
-                                event ->
-                                        !alreadySendCreateTableTables.contains(
-                                                MySqlSchemaUtils.toDbzTableId(event.tableId())))
-                        .forEach(output::collect);
-            } else {
-                // In Binlog only mode, we simply emit all schemas at once.
-                createTableEventCache.forEach(output::collect);
+        } else {
+            if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) {
+                TableId tableId = getTableId(element);
+                if (!alreadySendCreateTableTables.contains(tableId)) {
+                    CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
+                    // New created table in binlog reading phase.

Review Comment:
   ```
   --- Create Table A --> Table A Binlog --> Drop Table A --> Startup Offset --->
   
                       ^
                       |
          What if we start from here?
   ```
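For illustration only (not the PR's actual code): a minimal, self-contained sketch of the scenario the timeline above describes, using plain `Map`/`Set` stand-ins for `createTableEventCache` and `alreadySendCreateTableTables`. If a table was created and later dropped before the startup offset, a cache built from the current schema has no entry for it, so a binlog record for that table hits a `null` lookup unless the code guards against it. All class and method names below are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class LazyCreateTableEmitSketch {
    // Stand-in for createTableEventCache: schemas known at the startup offset.
    private static final Map<String, String> createTableEventCache = new HashMap<>();
    // Stand-in for alreadySendCreateTableTables.
    private static final Set<String> alreadySentTables = new HashSet<>();

    public static void main(String[] args) {
        // Table A was created and dropped before the startup offset,
        // so it never makes it into a cache built from the current schema.
        createTableEventCache.put("db.table_b", "CREATE TABLE table_b (...)");

        // A binlog record for the dropped table A may still arrive when
        // reading from an earlier offset.
        emitForBinlogRecord("db.table_a");
        emitForBinlogRecord("db.table_b");
    }

    private static void emitForBinlogRecord(String tableId) {
        if (!alreadySentTables.contains(tableId)) {
            String createTableEvent = createTableEventCache.get(tableId);
            if (createTableEvent == null) {
                // Without a guard like this, downstream would never receive a
                // CreateTableEvent for the dropped table's earlier records.
                System.out.println("No cached schema for " + tableId);
                return;
            }
            System.out.println("Emit " + createTableEvent);
            alreadySentTables.add(tableId);
        }
        System.out.println("Emit data change for " + tableId);
    }
}
```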



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -99,23 +100,17 @@ protected void processElement(
                     alreadySendCreateTableTables.add(tableId);
                 }
             }
-        } else if (splitState.isBinlogSplitState() && !alreadySendCreateTableForBinlogSplit) {
-            alreadySendCreateTableForBinlogSplit = true;
-            if (sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
-                // In Snapshot -> Binlog transition of INITIAL startup mode, ensure all table
-                // schemas have been sent to downstream. We use previously cached schema instead of
-                // re-request latest schema because there might be some pending schema change events
-                // in the queue, and that may accidentally emit evolved schema before corresponding
-                // schema change events.
-                createTableEventCache.stream()
-                        .filter(
-                                event ->
-                                        !alreadySendCreateTableTables.contains(
-                                                MySqlSchemaUtils.toDbzTableId(event.tableId())))
-                        .forEach(output::collect);
-            } else {
-                // In Binlog only mode, we simply emit all schemas at once.
-                createTableEventCache.forEach(output::collect);
+        } else {
+            if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) {
+                TableId tableId = getTableId(element);
+                if (!alreadySendCreateTableTables.contains(tableId)) {

Review Comment:
   Seems `alreadySendCreateTableTables` and `createTableEventCache` are not persisted in the MySQL source state. Will that block users from recovering from an earlier binlog state where schemas differ from the current ones?
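A hedged, purely illustrative sketch of the persistence concern above (not the connector's actual serializer or state layout): persisting something like `alreadySendCreateTableTables` into source/split state at the byte level would let a restored reader know which tables already received a `CreateTableEvent`. The class `SentTablesStateSketch` and its methods are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper; in Flink CDC the split/state serializer would own this logic.
public final class SentTablesStateSketch {

    // Write the set of table identifiers that already got a CreateTableEvent.
    public static byte[] serialize(Set<String> sentTables) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(sentTables.size());
            for (String tableId : sentTables) {
                out.writeUTF(tableId);
            }
        }
        return bytes.toByteArray();
    }

    // Restore the set on recovery so the emitter neither resends nor skips schemas.
    public static Set<String> deserialize(byte[] serialized) throws IOException {
        Set<String> sentTables = new HashSet<>();
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(serialized))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                sentTables.add(in.readUTF());
            }
        }
        return sentTables;
    }
}
```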


