yuxiqian commented on code in PR #3563: URL: https://github.com/apache/flink-cdc/pull/3563#discussion_r1726511886
########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java: ########## @@ -412,36 +404,62 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh schemaChangeEvent)); } - // The request will need to send a FlushEvent or block until flushing finished + // The request will block if another schema change event is being handled SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); - if (!response.getSchemaChangeEvents().isEmpty()) { - LOG.info( - "Sending the FlushEvent for table {} in subtask {}.", - tableId, - getRuntimeContext().getIndexOfThisSubtask()); + if (response.isAccepted()) { + LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId, tableId); Review Comment: This is intended to imitate how Values data sink prints subTaskId to distinguish them: ```java if (print) { String prefix = numSubtasks > 1 ? subtaskIndex + "> " : ""; // print the detail message to console for verification. System.out.println( prefix + ValuesDataSinkHelper.convertEventToStr( event, fieldGetterMaps.get(((ChangeEvent) event).tableId()))); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org