lvyanquan commented on code in PR #3932:
URL: https://github.com/apache/flink-cdc/pull/3932#discussion_r1974878864


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -99,23 +100,17 @@ protected void processElement(
                     alreadySendCreateTableTables.add(tableId);
                 }
             }
-        } else if (splitState.isBinlogSplitState() && !alreadySendCreateTableForBinlogSplit) {
-            alreadySendCreateTableForBinlogSplit = true;
-            if (sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
-                // In Snapshot -> Binlog transition of INITIAL startup mode, ensure all table
-                // schemas have been sent to downstream. We use previously cached schema instead of
-                // re-request latest schema because there might be some pending schema change events
-                // in the queue, and that may accidentally emit evolved schema before corresponding
-                // schema change events.
-                createTableEventCache.stream()
-                        .filter(
-                                event ->
-                                        !alreadySendCreateTableTables.contains(
-                                                MySqlSchemaUtils.toDbzTableId(event.tableId())))
-                        .forEach(output::collect);
-            } else {
-                // In Binlog only mode, we simply emit all schemas at once.
-                createTableEventCache.forEach(output::collect);
+        } else {
+            if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) {
+                TableId tableId = getTableId(element);
+                if (!alreadySendCreateTableTables.contains(tableId)) {
+                    CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
+                    // New created table in binlog reading phase.

Review Comment:
   This is indeed a problem, but it is different from the one I am trying to solve here.
   
   For DataChange records, perhaps we can derive the schema from the `SourceRecord` instead of querying the latest schema with SQL. What makes this harder is that if we start reading from a position where a schema change has happened, we cannot derive the original schema.
   ```
   --- Create Table A --> Alter Table A Add a column --> Table A Binlog --> Startup Offset --->
   
                        ^
                        |
           What if we start from here?
   ```
   
   So I think starting from a position where the historical schema differs from the current schema is still an unresolved issue for us.
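
   To make the scenario concrete, here is a minimal, self-contained Java sketch. It is not Flink CDC code: `SchemaDriftSketch`, `LogEntry`, `schemaAt`, and `latestSchema` are hypothetical names used only for illustration, and the binlog is modelled as an in-memory list. It only shows that a row written before the `ALTER` has a different shape from the schema a SQL query would return now.

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   // Illustrative only: a toy model of a binlog, not the connector's real types.
   final class SchemaDriftSketch {

       /** Hypothetical log entry: either a DDL carrying the new column list, or a data row. */
       static final class LogEntry {
           final List<String> ddlColumns; // non-null for DDL entries
           final List<Object> row;        // non-null for data entries

           LogEntry(List<String> ddlColumns, List<Object> row) {
               this.ddlColumns = ddlColumns;
               this.row = row;
           }
       }

       /** Schema in effect just before position {@code offset}, found by replaying earlier DDL. */
       static List<String> schemaAt(List<LogEntry> log, int offset) {
           List<String> schema = Collections.emptyList();
           for (int i = 0; i < offset; i++) {
               if (log.get(i).ddlColumns != null) {
                   schema = log.get(i).ddlColumns;
               }
           }
           return schema;
       }

       /** What querying the live database would return: the schema after every DDL. */
       static List<String> latestSchema(List<LogEntry> log) {
           return schemaAt(log, log.size());
       }

       static List<LogEntry> sampleLog() {
           return Arrays.asList(
                   new LogEntry(Arrays.asList("id", "name"), null),        // 0: Create Table A
                   new LogEntry(null, Arrays.<Object>asList(1, "a")),      // 1: row before the ALTER
                   new LogEntry(Arrays.asList("id", "name", "age"), null), // 2: Alter Table A Add a column
                   new LogEntry(null, Arrays.<Object>asList(2, "b", 30))); // 3: row after the ALTER
       }

       public static void main(String[] args) {
           List<LogEntry> log = sampleLog();
           int startupOffset = 1; // start between the CREATE and the ALTER
           System.out.println("row at startup offset: " + log.get(startupOffset).row);
           System.out.println("schema at that offset: " + schemaAt(log, startupOffset));
           System.out.println("latest schema via SQL: " + latestSchema(log));
       }
   }
   ```

   Note that `schemaAt` only works here because the sketch holds the whole log. A reader that starts at `startupOffset` never sees entry 0, so it cannot replay the earlier DDL, which is exactly the gap described above.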



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
