yuxiqian commented on code in PR #3563:
URL: https://github.com/apache/flink-cdc/pull/3563#discussion_r1726511886


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -412,36 +404,62 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh
                             schemaChangeEvent));
         }
 
-        // The request will need to send a FlushEvent or block until flushing finished
+        // The request will block if another schema change event is being handled
         SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
-        if (!response.getSchemaChangeEvents().isEmpty()) {
-            LOG.info(
-                    "Sending the FlushEvent for table {} in subtask {}.",
-                    tableId,
-                    getRuntimeContext().getIndexOfThisSubtask());
+        if (response.isAccepted()) {
+            LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId, tableId);

Review Comment:
   This is intended to imitate how the Values data sink prefixes each printed line with the subtask index, so that output from different subtasks can be told apart:
   
   ```java
   if (print) {
       String prefix = numSubtasks > 1 ? subtaskIndex + "> " : "";
       // print the detail message to console for verification.
       System.out.println(
               prefix
                       + ValuesDataSinkHelper.convertEventToStr(
                               event,
                               fieldGetterMaps.get(((ChangeEvent) event).tableId())));
   }
   ```
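
   For illustration, the same prefix convention can be factored into a small helper. This is only a sketch: the class `SubtaskPrefix`, its methods, and the table name `db.tbl` are hypothetical and not part of the PR or of Flink CDC. It mirrors the sink snippet above, where the prefix is `<subtaskIndex>> ` when there is more than one subtask and empty otherwise.

   ```java
   // Hypothetical helper, not part of Flink CDC: reproduces the prefix
   // convention used by the Values data sink snippet above.
   final class SubtaskPrefix {
       private SubtaskPrefix() {}

       // Returns "<subtaskIndex>> " when there are several subtasks, "" otherwise.
       static String prefix(int numSubtasks, int subtaskIndex) {
           return numSubtasks > 1 ? subtaskIndex + "> " : "";
       }

       // Prepends the subtask prefix to an arbitrary message.
       static String format(int numSubtasks, int subtaskIndex, String message) {
           return prefix(numSubtasks, subtaskIndex) + message;
       }

       public static void main(String[] args) {
           // Parallelism 4, subtask 2: the line is prefixed with "2> ".
           System.out.println(format(4, 2, "Sending the FlushEvent for table db.tbl."));
           // Parallelism 1: no prefix is added.
           System.out.println(format(1, 0, "Sending the FlushEvent for table db.tbl."));
       }
   }
   ```

   With a single subtask the prefix is empty, so output at parallelism 1 stays unchanged.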


