yuxiqian commented on code in PR #3932: URL: https://github.com/apache/flink-cdc/pull/3932#discussion_r1974819667
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java: ########## @@ -99,23 +100,17 @@ protected void processElement( alreadySendCreateTableTables.add(tableId); } } - } else if (splitState.isBinlogSplitState() && !alreadySendCreateTableForBinlogSplit) { - alreadySendCreateTableForBinlogSplit = true; - if (sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) { - // In Snapshot -> Binlog transition of INITIAL startup mode, ensure all table - // schemas have been sent to downstream. We use previously cached schema instead of - // re-request latest schema because there might be some pending schema change events - // in the queue, and that may accidentally emit evolved schema before corresponding - // schema change events. - createTableEventCache.stream() - .filter( - event -> - !alreadySendCreateTableTables.contains( - MySqlSchemaUtils.toDbzTableId(event.tableId()))) - .forEach(output::collect); - } else { - // In Binlog only mode, we simply emit all schemas at once. - createTableEventCache.forEach(output::collect); + } else { + if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) { + TableId tableId = getTableId(element); + if (!alreadySendCreateTableTables.contains(tableId)) { + CreateTableEvent createTableEvent = createTableEventCache.get(tableId); + // New created table in binlog reading phase. Review Comment: ``` --- Create Table A --> Table A Binlog --> Drop Table A --> Startup Offset ---> ^ | What if we start from here? ``` It seems that `testDanglingDropTableEventInBinlog` will fail in this scenario. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org