lvyanquan commented on code in PR #3932: URL: https://github.com/apache/flink-cdc/pull/3932#discussion_r1974878864
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java:
##########
@@ -99,23 +100,17 @@ protected void processElement(
                     alreadySendCreateTableTables.add(tableId);
                 }
             }
-        } else if (splitState.isBinlogSplitState() && !alreadySendCreateTableForBinlogSplit) {
-            alreadySendCreateTableForBinlogSplit = true;
-            if (sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
-                // In Snapshot -> Binlog transition of INITIAL startup mode, ensure all table
-                // schemas have been sent to downstream. We use previously cached schema instead of
-                // re-request latest schema because there might be some pending schema change events
-                // in the queue, and that may accidentally emit evolved schema before corresponding
-                // schema change events.
-                createTableEventCache.stream()
-                        .filter(
-                                event ->
-                                        !alreadySendCreateTableTables.contains(
-                                                MySqlSchemaUtils.toDbzTableId(event.tableId())))
-                        .forEach(output::collect);
-            } else {
-                // In Binlog only mode, we simply emit all schemas at once.
-                createTableEventCache.forEach(output::collect);
+        } else {
+            if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) {
+                TableId tableId = getTableId(element);
+                if (!alreadySendCreateTableTables.contains(tableId)) {
+                    CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
+                    // New created table in binlog reading phase.

Review Comment:
This is indeed a problem, but it is different from the one I hope to solve here.
For DataChange events, perhaps we can derive the schema from `SourceRecord` instead of using SQL to query the latest schema.
The harder case is this: if we start reading from a position where a SchemaChange happened, we cannot derive the original schema.

```
--- Create Table A --> Alter Table A Add a column--> Table A Binlog --> Startup Offset --->
                       ^
                       | What if we start from here?
```

So I think starting from a position where the historical schema differs from the current schema is still an unresolved issue for us.
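
To make the concern concrete, here is a minimal, self-contained sketch of the "emit the cached CreateTableEvent before the first record of each table" idea, and of how it goes wrong when the startup offset precedes a schema change. Everything in it is hypothetical: `LazyCreateTableSketch`, the string-based schemas and events, and the choice to pass records through on a cache miss are assumptions for illustration, not the Flink CDC API and not what this PR implements.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch; none of these names are Flink CDC APIs. */
final class LazyCreateTableSketch {
    // Table id -> schema as queried when the job starts (i.e. the *current* schema).
    private final Map<String, String> schemaCache;
    // Tables whose create-table event has already been emitted downstream.
    private final Set<String> alreadySent = new HashSet<>();
    private final Consumer<String> output;

    LazyCreateTableSketch(Map<String, String> schemaCache, Consumer<String> output) {
        this.schemaCache = schemaCache;
        this.output = output;
    }

    /** Emits the cached schema once per table, then forwards the record itself. */
    void process(String tableId, String payload) {
        if (!alreadySent.contains(tableId)) {
            String schema = schemaCache.get(tableId);
            if (schema != null) {
                output.accept("CREATE " + tableId + " (" + schema + ")");
                alreadySent.add(tableId);
            }
            // Cache miss: the table is unknown at startup. This sketch simply
            // passes the record through (an assumption, not the PR's behavior).
        }
        output.accept("DATA " + tableId + " " + payload);
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        // The cache was filled after "ALTER TABLE a ADD COLUMN age" had already run.
        LazyCreateTableSketch emitter =
                new LazyCreateTableSketch(Map.of("db.a", "id, name, age"), out::add);
        // Replaying from an offset before the ALTER: the row has only two columns,
        // so the emitted schema does not match the historical record.
        emitter.process("db.a", "(1, 'x')");
        out.forEach(System.out::println);
    }
}
```

In this sketch the downstream sees a three-column schema followed by a two-column row, which is the mismatch described above: a cache built from the current schema cannot describe records written before the schema change.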