leonardBang commented on code in PR #3563: URL: https://github.com/apache/flink-cdc/pull/3563#discussion_r1726328254
########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java: ########## @@ -435,13 +421,26 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh private SchemaChangeResponse requestSchemaChange( TableId tableId, SchemaChangeEvent schemaChangeEvent) { - return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent)); + while (true) { + SchemaChangeResponse response = + sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent)); + if (response.isRejected()) { + try { + LOG.info("Schema Registry is busy now, waiting for next request..."); Review Comment: Do we have a timeout setting? ########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java: ########## @@ -107,16 +101,69 @@ public SchemaRegistryRequestHandler( SchemaDerivation schemaDerivation, SchemaChangeBehavior schemaChangeBehavior) { this.metadataApplier = metadataApplier; - this.activeSinkWriters = new HashSet<>(); - this.flushedSinkWriters = new HashSet<>(); - this.pendingSchemaChanges = new LinkedList<>(); - this.finishedSchemaChanges = new LinkedList<>(); - this.ignoredSchemaChanges = new LinkedList<>(); this.schemaManager = schemaManager; this.schemaDerivation = schemaDerivation; + this.schemaChangeBehavior = schemaChangeBehavior; + + this.activeSinkWriters = new HashSet<>(); + this.flushedSinkWriters = new HashSet<>(); this.schemaChangeThreadPool = Executors.newSingleThreadExecutor(); + this.isSchemaChangeApplying = false; - this.schemaChangeBehavior = schemaChangeBehavior; + this.currentDerivedSchemaChangeEvents = new ArrayList<>(); + this.currentFinishedSchemaChanges = new ArrayList<>(); + this.currentIgnoredSchemaChanges = new ArrayList<>(); + this.isSchemaChangeRequested = new AtomicBoolean(false); + } + + /** + * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing. 
+ * + * @param request the received SchemaChangeRequest + */ + public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest( + SchemaChangeRequest request) { + if (isSchemaChangeRequested.compareAndSet(false, true)) { + isSchemaChangeApplying = true; + LOG.info( + "Received schema change event request {} from table {}. Start to buffer requests for others.", + request.getSchemaChangeEvent(), + request.getTableId().toString()); + SchemaChangeEvent event = request.getSchemaChangeEvent(); + if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) { + LOG.info("Event {} has been addressed before, ignoring it.", event); + finishCurrentSchemaChangeRequest(); + return CompletableFuture.completedFuture( + wrap(new SchemaChangeResponse(Collections.emptyList()))); + } + schemaManager.applyOriginalSchemaChange(event); + List<SchemaChangeEvent> derivedSchemaChangeEvents = + calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); + if (derivedSchemaChangeEvents.isEmpty()) { + finishCurrentSchemaChangeRequest(); + return CompletableFuture.completedFuture( + wrap(new SchemaChangeResponse(Collections.emptyList()))); + } else { + derivedSchemaChangeEvents.forEach( + e -> { + if (e instanceof SchemaChangeEventWithPreSchema) { + SchemaChangeEventWithPreSchema pe = + (SchemaChangeEventWithPreSchema) e; + if (!pe.hasPreSchema()) { + schemaManager + .getLatestEvolvedSchema(pe.tableId()) + .ifPresent(pe::fillPreSchema); + } + } + }); + currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents); + return CompletableFuture.completedFuture( + wrap(new SchemaChangeResponse(derivedSchemaChangeEvents))); + } + } else { + LOG.info("There is already a schema change request in progress. Rejected {}.", request); Review Comment: How about just sending SchemaOperator a BUSY signal instead of a REJECT signal? 
########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java: ########## @@ -107,16 +101,69 @@ public SchemaRegistryRequestHandler( SchemaDerivation schemaDerivation, SchemaChangeBehavior schemaChangeBehavior) { this.metadataApplier = metadataApplier; - this.activeSinkWriters = new HashSet<>(); - this.flushedSinkWriters = new HashSet<>(); - this.pendingSchemaChanges = new LinkedList<>(); - this.finishedSchemaChanges = new LinkedList<>(); - this.ignoredSchemaChanges = new LinkedList<>(); this.schemaManager = schemaManager; this.schemaDerivation = schemaDerivation; + this.schemaChangeBehavior = schemaChangeBehavior; + + this.activeSinkWriters = new HashSet<>(); + this.flushedSinkWriters = new HashSet<>(); this.schemaChangeThreadPool = Executors.newSingleThreadExecutor(); + this.isSchemaChangeApplying = false; - this.schemaChangeBehavior = schemaChangeBehavior; + this.currentDerivedSchemaChangeEvents = new ArrayList<>(); + this.currentFinishedSchemaChanges = new ArrayList<>(); + this.currentIgnoredSchemaChanges = new ArrayList<>(); + this.isSchemaChangeRequested = new AtomicBoolean(false); + } + + /** + * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing. + * + * @param request the received SchemaChangeRequest + */ + public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest( + SchemaChangeRequest request) { + if (isSchemaChangeRequested.compareAndSet(false, true)) { + isSchemaChangeApplying = true; + LOG.info( + "Received schema change event request {} from table {}. 
Start to buffer requests for others.", + request.getSchemaChangeEvent(), + request.getTableId().toString()); + SchemaChangeEvent event = request.getSchemaChangeEvent(); + if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) { + LOG.info("Event {} has been addressed before, ignoring it.", event); + finishCurrentSchemaChangeRequest(); + return CompletableFuture.completedFuture( + wrap(new SchemaChangeResponse(Collections.emptyList()))); Review Comment: Could we add a message to the SchemaChangeResponse body to let clients know more details? It would be helpful for troubleshooting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org