skywalker0618 commented on code in PR #18446:
URL: https://github.com/apache/hudi/pull/18446#discussion_r3030877663


##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java:
##########
@@ -110,6 +110,31 @@ public boolean isEmpty() {
   public void seal() {
   }
 
+  /**
+   * Waits until all published events have been consumed by the handler thread,
+   * without shutting down the disruptor. The handler thread remains alive 
after
+   * this method returns — this is critical when downstream components (e.g. 
GCS
+   * pipe) require the writing thread to stay alive during a subsequent flush.
+   *
+   * <p>Must only be called when no new events will be published (e.g. during
+   * Flink's {@code snapshotState} where {@code processElement} is guaranteed
+   * not to run concurrently).
+   */
+  public void waitUntilDrained(long timeoutMs) {
+    long start = System.currentTimeMillis();
+    while (!isEmpty()) {
+      if (throwable.get() != null) {
+        return;
+      }
+      if (System.currentTimeMillis() - start > timeoutMs) {
+        throw new HoodieException(
+            "Timed out waiting for disruptor queue to drain after "
+                + timeoutMs + "ms, remaining: " + size());
+      }
+      Thread.yield();

Review Comment:
   Make sense, changed the code, thanks!



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java:
##########
@@ -140,40 +140,48 @@ public void processElement(T value, Context ctx, 
Collector<RowData> out) throws
   @Override
   public void snapshotState() {
     try {
-      flushDisruptor();
-      reinitDisruptorAfterCheckpoint();
+      drainDisruptor();
     } catch (Exception e) {
       throw new HoodieException("Fail to flush data during snapshot state.", 
e);
     }
+    // Flush Parquet data to the underlying filesystem. The disruptor thread is
+    // still alive, so filesystem connectors that use PipedInputStream (e.g. 
GCS
+    // Hadoop connector) won't see a dead writeSide and throw "Pipe broken".
     super.snapshotState();
   }
 
   @Override
   public void endInput() {
     try {
-      flushDisruptor();
+      drainDisruptor();
     } catch (Exception e) {
       throw new HoodieException("Fail to flush data during endInput.", e);
     }
     super.endInput();
   }
 
-  private void flushDisruptor() {
-    disruptorQueue.close();
-    // Check if any errors occurred during event processing
+  private static final long DRAIN_TIMEOUT_MS = 30_000;
+
+  /**
+   * Waits for the disruptor thread to finish consuming all records in the ring
+   * buffer, then flushes the remaining sort buffer into the Parquet writer.
+   *
+   * <p>Unlike the previous approach that called {@code disruptorQueue.close()}
+   * (which kills the disruptor thread), this keeps the thread alive. This is
+   * critical because some filesystem connectors (e.g. GCS Hadoop connector)
+   * use a {@code PipedInputStream} that checks {@code writeSide.isAlive()} —
+   * if the thread that last wrote to the pipe is dead, the upload thread
+   * throws "Pipe broken".
+   */
+  private void drainDisruptor() {
+    disruptorQueue.waitUntilDrained(DRAIN_TIMEOUT_MS);

Review Comment:
   Sounds good, changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to