danny0405 commented on code in PR #18411:
URL: https://github.com/apache/hudi/pull/18411#discussion_r3013409245
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -385,16 +407,23 @@ public void executionAttemptReady(int i, int attemptNumber, SubtaskGateway gatew
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
- if (request instanceof Correspondent.InstantTimeRequest) {
- return handleInstantRequest((Correspondent.InstantTimeRequest) request);
- }
- if (request instanceof Correspondent.InflightInstantsRequest) {
- return handleInFlightInstantsRequest((Correspondent.InflightInstantsRequest) request);
- }
- if (request instanceof Correspondent.AwaitPendingInstantsRequest) {
- return handleAwaitPendingInstantsRequest((Correspondent.AwaitPendingInstantsRequest) request);
- }
- throw new HoodieException("Unexpected coordination request type: " + request.getClass().getSimpleName());
+ // Gate coordination requests on initialization completion. The upgrade,
+ // metadata table init, and event restoration run asynchronously on the
+ // executor thread (see start()). Coordination requests like startInstant()
+ // run on a separate instantRequestExecutor and must not race ahead of
+ // initialization — e.g., RecordIndexPartitioner depends on MDT init.
+ return initFuture.thenCompose(ignored -> {
Review Comment:
Some other code paths also rely on the table upgrade/downgrade having
completed (that is, on the table state being ready for writes), so gating
the requests here makes the dependency more complicated. Would it make sense
to move these heavy upgrades into a separate Spark job?
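
For context, the gating pattern in the hunk above can be sketched roughly as
follows. This is a minimal standalone illustration, not the PR's code: the
class `InitGateSketch`, its methods, and the string-based request and response
are all hypothetical stand-ins, and only the `initFuture.thenCompose(...)`
shape is taken from the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: every name here is an assumption made for illustration.
public class InitGateSketch {
  // Completed once the asynchronous initialization has finished
  // (a stand-in for upgrade, metadata table init, and event restoration).
  private final CompletableFuture<Void> initFuture = new CompletableFuture<>();

  /** Signals that initialization is done and releases any waiting requests. */
  public void completeInit() {
    initFuture.complete(null);
  }

  /** Defers request handling until initFuture has completed. */
  public CompletableFuture<String> handleRequest(String request) {
    return initFuture.thenCompose(ignored -> dispatch(request));
  }

  // Stand-in for the per-request-type handlers in the real coordinator.
  private CompletableFuture<String> dispatch(String request) {
    return CompletableFuture.completedFuture("handled:" + request);
  }

  public static void main(String[] args) throws Exception {
    InitGateSketch gate = new InitGateSketch();
    // The request arrives before initialization has finished, so it stays pending.
    CompletableFuture<String> response = gate.handleRequest("instantTime");
    System.out.println("done before init: " + response.isDone());
    gate.completeInit();
    System.out.println(response.get(1, TimeUnit.SECONDS));
  }
}
```

One consequence of this shape is that if `initFuture` completes exceptionally,
every gated request fails with the same cause, so each additional code path
that waits on the future becomes coupled to the outcome of the upgrade.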