leonardBang commented on code in PR #3563:
URL: https://github.com/apache/flink-cdc/pull/3563#discussion_r1726328254


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -435,13 +421,26 @@ private void handleSchemaChangeEvent(TableId tableId, 
SchemaChangeEvent schemaCh
 
     private SchemaChangeResponse requestSchemaChange(
             TableId tableId, SchemaChangeEvent schemaChangeEvent) {
-        return sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
+        while (true) {
+            SchemaChangeResponse response =
+                    sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
+            if (response.isRejected()) {
+                try {
+                    LOG.info("Schema Registry is busy now, waiting for next 
request...");

Review Comment:
   Do we have a timeout setting?



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java:
##########
@@ -107,16 +101,69 @@ public SchemaRegistryRequestHandler(
             SchemaDerivation schemaDerivation,
             SchemaChangeBehavior schemaChangeBehavior) {
         this.metadataApplier = metadataApplier;
-        this.activeSinkWriters = new HashSet<>();
-        this.flushedSinkWriters = new HashSet<>();
-        this.pendingSchemaChanges = new LinkedList<>();
-        this.finishedSchemaChanges = new LinkedList<>();
-        this.ignoredSchemaChanges = new LinkedList<>();
         this.schemaManager = schemaManager;
         this.schemaDerivation = schemaDerivation;
+        this.schemaChangeBehavior = schemaChangeBehavior;
+
+        this.activeSinkWriters = new HashSet<>();
+        this.flushedSinkWriters = new HashSet<>();
         this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
+
         this.isSchemaChangeApplying = false;
-        this.schemaChangeBehavior = schemaChangeBehavior;
+        this.currentDerivedSchemaChangeEvents = new ArrayList<>();
+        this.currentFinishedSchemaChanges = new ArrayList<>();
+        this.currentIgnoredSchemaChanges = new ArrayList<>();
+        this.isSchemaChangeRequested = new AtomicBoolean(false);
+    }
+
+    /**
+     * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks 
flushing.
+     *
+     * @param request the received SchemaChangeRequest
+     */
+    public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
+            SchemaChangeRequest request) {
+        if (isSchemaChangeRequested.compareAndSet(false, true)) {
+            isSchemaChangeApplying = true;
+            LOG.info(
+                    "Received schema change event request {} from table {}. 
Start to buffer requests for others.",
+                    request.getSchemaChangeEvent(),
+                    request.getTableId().toString());
+            SchemaChangeEvent event = request.getSchemaChangeEvent();
+            if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
+                LOG.info("Event {} has been addressed before, ignoring it.", 
event);
+                finishCurrentSchemaChangeRequest();
+                return CompletableFuture.completedFuture(
+                        wrap(new 
SchemaChangeResponse(Collections.emptyList())));
+            }
+            schemaManager.applyOriginalSchemaChange(event);
+            List<SchemaChangeEvent> derivedSchemaChangeEvents =
+                    
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
+            if (derivedSchemaChangeEvents.isEmpty()) {
+                finishCurrentSchemaChangeRequest();
+                return CompletableFuture.completedFuture(
+                        wrap(new 
SchemaChangeResponse(Collections.emptyList())));
+            } else {
+                derivedSchemaChangeEvents.forEach(
+                        e -> {
+                            if (e instanceof SchemaChangeEventWithPreSchema) {
+                                SchemaChangeEventWithPreSchema pe =
+                                        (SchemaChangeEventWithPreSchema) e;
+                                if (!pe.hasPreSchema()) {
+                                    schemaManager
+                                            
.getLatestEvolvedSchema(pe.tableId())
+                                            .ifPresent(pe::fillPreSchema);
+                                }
+                            }
+                        });
+                currentDerivedSchemaChangeEvents = new 
ArrayList<>(derivedSchemaChangeEvents);
+                return CompletableFuture.completedFuture(
+                        wrap(new 
SchemaChangeResponse(derivedSchemaChangeEvents)));
+            }
+        } else {
+            LOG.info("There is already a schema change request in progress. 
Rejected {}.", request);

Review Comment:
   How about just sending SchemaOperator a BUSY signal instead of a REJECT signal?



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java:
##########
@@ -107,16 +101,69 @@ public SchemaRegistryRequestHandler(
             SchemaDerivation schemaDerivation,
             SchemaChangeBehavior schemaChangeBehavior) {
         this.metadataApplier = metadataApplier;
-        this.activeSinkWriters = new HashSet<>();
-        this.flushedSinkWriters = new HashSet<>();
-        this.pendingSchemaChanges = new LinkedList<>();
-        this.finishedSchemaChanges = new LinkedList<>();
-        this.ignoredSchemaChanges = new LinkedList<>();
         this.schemaManager = schemaManager;
         this.schemaDerivation = schemaDerivation;
+        this.schemaChangeBehavior = schemaChangeBehavior;
+
+        this.activeSinkWriters = new HashSet<>();
+        this.flushedSinkWriters = new HashSet<>();
         this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
+
         this.isSchemaChangeApplying = false;
-        this.schemaChangeBehavior = schemaChangeBehavior;
+        this.currentDerivedSchemaChangeEvents = new ArrayList<>();
+        this.currentFinishedSchemaChanges = new ArrayList<>();
+        this.currentIgnoredSchemaChanges = new ArrayList<>();
+        this.isSchemaChangeRequested = new AtomicBoolean(false);
+    }
+
+    /**
+     * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks 
flushing.
+     *
+     * @param request the received SchemaChangeRequest
+     */
+    public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
+            SchemaChangeRequest request) {
+        if (isSchemaChangeRequested.compareAndSet(false, true)) {
+            isSchemaChangeApplying = true;
+            LOG.info(
+                    "Received schema change event request {} from table {}. 
Start to buffer requests for others.",
+                    request.getSchemaChangeEvent(),
+                    request.getTableId().toString());
+            SchemaChangeEvent event = request.getSchemaChangeEvent();
+            if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
+                LOG.info("Event {} has been addressed before, ignoring it.", 
event);
+                finishCurrentSchemaChangeRequest();
+                return CompletableFuture.completedFuture(
+                        wrap(new 
SchemaChangeResponse(Collections.emptyList())));

Review Comment:
   Could we add a message to the SchemaChangeResponse body to give clients 
more details? It would be helpful for troubleshooting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to