yihua commented on code in PR #18411:
URL: https://github.com/apache/hudi/pull/18411#discussion_r3034562056
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -385,16 +407,23 @@ public void executionAttemptReady(int i, int attemptNumber, SubtaskGateway gatew
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
- if (request instanceof Correspondent.InstantTimeRequest) {
- return handleInstantRequest((Correspondent.InstantTimeRequest) request);
- }
- if (request instanceof Correspondent.InflightInstantsRequest) {
- return handleInFlightInstantsRequest((Correspondent.InflightInstantsRequest) request);
- }
- if (request instanceof Correspondent.AwaitPendingInstantsRequest) {
- return handleAwaitPendingInstantsRequest((Correspondent.AwaitPendingInstantsRequest) request);
- }
- throw new HoodieException("Unexpected coordination request type: " + request.getClass().getSimpleName());
+ // Gate coordination requests on initialization completion. The upgrade,
+ // metadata table init, and event restoration run asynchronously on the
+ // executor thread (see start()). Coordination requests like startInstant()
+ // run on a separate instantRequestExecutor and must not race ahead of
+ // initialization — e.g., RecordIndexPartitioner depends on MDT init.
+ return initFuture.thenCompose(ignored -> {
Review Comment:
🤖 Good question. A separate pre-upgrade step would keep the coordinator's
`start()` simple, but it does change the UX from a transparent on-first-write
upgrade to an explicit manual step. One middle ground: keep the in-place
upgrade path but add a standalone CLI tool (or Spark action) that users with
very large archived timelines can run ahead of time. That way the common case
stays zero-touch, and only the pathological case (thousands of archived
actions) needs the separate job.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -244,15 +245,36 @@ public void start() throws Exception {
.threadFactory(getThreadFactory("instant-request"))
.exceptionHook((errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
.build();
- // start the executor if required
- if (tableState.syncHive) {
- initHiveSync();
- }
- // start client id heartbeats for optimistic concurrency control
- if (OptionsResolver.isMultiWriter(conf)) {
- initClientIds(conf);
- }
- restoreEvents();
+
+ // Run upgrade, metadata table init, and event restoration on the executor
+ // thread instead of the Pekko dispatcher thread. Running these synchronously
+ // on the dispatcher thread blocks heartbeat responses when the operations
+ // involve heavy I/O (e.g., LSM timeline migration with hundreds of archived
+ // actions), causing the ResourceManager to disconnect the JobManager.
+ //
+ // Safety guarantees:
+ // - Events via handleEventFromOperator() are submitted to the same
+ // single-threaded executor, so FIFO ordering ensures initialization
Review Comment:
🤖 The outer `catch (Throwable throwable)` block at the end of `start()` also
doesn't complete `initFuture`. If an exception is thrown after the executors
are created but before the lambda is submitted (or if `executor.execute()`
itself fails to enqueue), `initFuture` will remain incomplete and coordination
requests will hang. It might be worth adding
`initFuture.completeExceptionally(throwable)` in that catch block as well.
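A minimal sketch of that guard, assuming a hypothetical `OuterCatchSketch` class standing in for the coordinator (the class, method, and parameter names are illustrative and not taken from the PR):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical stand-in for the coordinator; not the actual Hudi class.
public class OuterCatchSketch {
  // Plays the role of the coordinator's initFuture.
  public final CompletableFuture<Void> initFuture = new CompletableFuture<>();

  public void start(Executor executor, Runnable initSteps) {
    try {
      // If execute() fails to enqueue the task, the lambda never runs.
      executor.execute(() -> {
        initSteps.run();
        initFuture.complete(null);
      });
    } catch (Throwable throwable) {
      // Without this line, anything chained on initFuture would wait forever.
      initFuture.completeExceptionally(throwable);
    }
  }
}
```

Because `completeExceptionally` is a no-op on an already completed future, calling it here should be safe even if the lambda has finished by the time the catch block runs.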
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -244,15 +245,36 @@ public void start() throws Exception {
.threadFactory(getThreadFactory("instant-request"))
.exceptionHook((errMsg, t) -> this.context.failJob(new HoodieException(errMsg, t)))
.build();
- // start the executor if required
- if (tableState.syncHive) {
- initHiveSync();
- }
- // start client id heartbeats for optimistic concurrency control
- if (OptionsResolver.isMultiWriter(conf)) {
- initClientIds(conf);
- }
- restoreEvents();
+
+ // Run upgrade, metadata table init, and event restoration on the executor
+ // thread instead of the Pekko dispatcher thread. Running these synchronously
+ // on the dispatcher thread blocks heartbeat responses when the operations
+ // involve heavy I/O (e.g., LSM timeline migration with hundreds of archived
+ // actions), causing the ResourceManager to disconnect the JobManager.
+ //
+ // Safety guarantees:
+ // - Events via handleEventFromOperator() are submitted to the same
+ // single-threaded executor, so FIFO ordering ensures initialization
+ // completes before any event processing.
+ // - Coordination requests via handleCoordinationRequest() run on the
+ // separate instantRequestExecutor and are gated on initFuture to
+ // prevent startInstant() from racing ahead of the upgrade.
+ this.executor.execute(() -> {
+ this.writeClient.tryUpgrade(instant, this.metaClient);
+ initMetadataTable(this.writeClient);
+ if (tableState.scheduleMdtCompaction) {
+ this.metadataWriteClient = StreamerUtil.createMetadataWriteClient(writeClient);
+ }
+ if (tableState.syncHive) {
+ initHiveSync();
+ }
Review Comment:
🤖 If any of the init steps (`tryUpgrade`, `initMetadataTable`, etc.) throws an
exception, `NonThrownExecutor` swallows it and calls the exception hook, but
`initFuture.complete(null)` is never reached. This leaves `initFuture`
permanently incomplete, so every `handleCoordinationRequest` caller chained on
`initFuture.thenCompose(...)`, whether already waiting or arriving later, will
hang indefinitely. Could you wrap the lambda body in a try-catch so that
`initFuture.completeExceptionally(t)` is called on failure?
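Roughly what I have in mind, as a sketch only: `InitLambdaSketch`, `runInit`, and `handleRequest` are hypothetical names, and a plain `Executor` stands in for `NonThrownExecutor`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical stand-in for the coordinator; not the actual Hudi class.
public class InitLambdaSketch {
  // Plays the role of the coordinator's initFuture.
  public final CompletableFuture<Void> initFuture = new CompletableFuture<>();

  public void runInit(Executor executor, Runnable initSteps) {
    executor.execute(() -> {
      try {
        initSteps.run();            // tryUpgrade, initMetadataTable, ... in the PR
        initFuture.complete(null);  // success: release gated requests
      } catch (Throwable t) {
        // Failure: release gated requests with the error instead of hanging.
        // Real code would likely also report t through the exception hook.
        initFuture.completeExceptionally(t);
      }
    });
  }

  // Gated the same way as handleCoordinationRequest in the PR.
  public CompletableFuture<String> handleRequest(String request) {
    return initFuture.thenCompose(
        ignored -> CompletableFuture.completedFuture("handled " + request));
  }
}
```

With this shape, a request that arrives during a failed init completes exceptionally with the init error rather than waiting forever.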