cshuo commented on code in PR #18446:
URL: https://github.com/apache/hudi/pull/18446#discussion_r3026752551
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java:
##########
@@ -140,40 +140,48 @@ public void processElement(T value, Context ctx, Collector<RowData> out) throws
@Override
public void snapshotState() {
try {
- flushDisruptor();
- reinitDisruptorAfterCheckpoint();
+ drainDisruptor();
} catch (Exception e) {
throw new HoodieException("Fail to flush data during snapshot state.", e);
}
+ // Flush Parquet data to the underlying filesystem. The disruptor thread is
+ // still alive, so filesystem connectors that use PipedInputStream (e.g. GCS
+ // Hadoop connector) won't see a dead writeSide and throw "Pipe broken".
super.snapshotState();
}
@Override
public void endInput() {
try {
- flushDisruptor();
+ drainDisruptor();
} catch (Exception e) {
throw new HoodieException("Fail to flush data during endInput.", e);
}
super.endInput();
}
- private void flushDisruptor() {
- disruptorQueue.close();
- // Check if any errors occurred during event processing
+ private static final long DRAIN_TIMEOUT_MS = 30_000;
+
+ /**
+ * Waits for the disruptor thread to finish consuming all records in the ring
+ * buffer, then flushes the remaining sort buffer into the Parquet writer.
+ *
+ * <p>Unlike the previous approach that called {@code disruptorQueue.close()}
+ * (which kills the disruptor thread), this keeps the thread alive. This is
+ * critical because some filesystem connectors (e.g. GCS Hadoop connector)
+ * use a {@code PipedInputStream} that checks {@code writeSide.isAlive()} —
+ * if the thread that last wrote to the pipe is dead, the upload thread
+ * throws "Pipe broken".
+ */
+ private void drainDisruptor() {
+ disruptorQueue.waitUntilDrained(DRAIN_TIMEOUT_MS);
Review Comment:
We don't need a hardcoded timeout here; we can rely on Flink's own timeout
mechanisms, such as the checkpoint timeout.
##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java:
##########
@@ -110,6 +110,31 @@ public boolean isEmpty() {
public void seal() {
}
+ /**
+ * Waits until all published events have been consumed by the handler thread,
+ * without shutting down the disruptor. The handler thread remains alive after
+ * this method returns — this is critical when downstream components (e.g. GCS
+ * pipe) require the writing thread to stay alive during a subsequent flush.
+ *
+ * <p>Must only be called when no new events will be published (e.g. during
+ * Flink's {@code snapshotState} where {@code processElement} is guaranteed
+ * not to run concurrently).
+ */
+ public void waitUntilDrained(long timeoutMs) {
+ long start = System.currentTimeMillis();
+ while (!isEmpty()) {
+ if (throwable.get() != null) {
+ return;
+ }
+ if (System.currentTimeMillis() - start > timeoutMs) {
+ throw new HoodieException(
+ "Timed out waiting for disruptor queue to drain after "
+ + timeoutMs + "ms, remaining: " + size());
+ }
+ Thread.yield();
Review Comment:
`waitUntilDrained()` is still a busy-spin loop. `yield()` does not make the
thread block or sleep; it only hints to the scheduler that another runnable
thread may run, and the current thread can be scheduled again immediately, so
this is still effectively spinning. Consider adding a small
`LockSupport.parkNanos(100_000)` backoff instead of pure spin.
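
A minimal sketch of what that backoff could look like, not the PR's actual code. The class name `DrainBackoffSketch`, the `isEmpty` supplier, and the `failure` reference are hypothetical stand-ins for the queue's `isEmpty()` and `throwable`; the returned park count exists only to make the behaviour observable. It also checks the interrupt flag, since `parkNanos` returns immediately on an interrupted thread and the loop would otherwise degrade back into a spin.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not part of Hudi.
final class DrainBackoffSketch {
  // 100 microseconds, the value suggested in the review comment.
  private static final long PARK_NANOS = 100_000L;

  private DrainBackoffSketch() {
  }

  /**
   * Waits until isEmpty reports true, parking briefly between checks.
   * Returns early if the consumer recorded a failure.
   *
   * @return how many times the calling thread parked (illustration only)
   */
  static long waitUntilDrained(BooleanSupplier isEmpty,
                               AtomicReference<Throwable> failure) {
    long parks = 0;
    while (!isEmpty.getAsBoolean()) {
      if (failure.get() != null) {
        // The consumer failed; the caller is expected to surface the error.
        return parks;
      }
      if (Thread.currentThread().isInterrupted()) {
        // parkNanos would return immediately from now on, so stop waiting.
        // The interrupt flag is left set for the caller.
        throw new IllegalStateException(
            "Interrupted while waiting for the queue to drain");
      }
      // Unlike Thread.yield(), this takes the thread off the CPU for a while.
      LockSupport.parkNanos(PARK_NANOS);
      parks++;
    }
    return parks;
  }
}
```

With no timeout of its own, a loop like this relies on the surrounding framework (for example the checkpoint timeout mentioned in the other comment) to bound the wait.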