cshuo commented on code in PR #18446:
URL: https://github.com/apache/hudi/pull/18446#discussion_r3031062406
##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java:
##########
@@ -110,6 +111,31 @@ public boolean isEmpty() {
public void seal() {
}
+ /**
+ * Waits until all published events have been consumed by the handler thread,
+ * without shutting down the disruptor. The handler thread remains alive after
+ * this method returns — this is critical when downstream components (e.g.
+ * GCS pipe) require the writing thread to stay alive during a subsequent flush.
+ *
+ * <p>Must only be called when no new events will be published (e.g. during
+ * Flink's {@code snapshotState} where {@code processElement} is guaranteed
+ * not to run concurrently).
+ *
+ * <p>The method returns early if the consumer has failed ({@link #getThrowable()})
+ * or the calling thread has been interrupted (e.g. by Flink's checkpoint timeout).
+ */
+ public void waitUntilDrained() {
+ while (!isEmpty()) {
+ if (throwable.get() != null) {
+ return;
+ }
+ if (Thread.currentThread().isInterrupted()) {
Review Comment:
Mark the queue as failed before returning:
`markAsFailed(new HoodieException("Interrupted while waiting for disruptor queue to drain"));`
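For illustration, here is a minimal standalone sketch of what the suggested change might look like. It is not the Hudi class: `DrainSketch`, its `publish`/`consume` counter, and the `compareAndSet`-based `markAsFailed` are hypothetical stand-ins, and `IllegalStateException` replaces `HoodieException` so the snippet compiles without Hudi on the classpath. The point is only that an interrupted wait records a failure instead of returning silently, so callers checking `getThrowable()` can tell the drain did not complete.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the queue under review; not the actual Hudi implementation.
class DrainSketch {
  // Number of published but not yet consumed events (assumed bookkeeping).
  private final AtomicInteger pending = new AtomicInteger();
  private final AtomicReference<Throwable> throwable = new AtomicReference<>();

  void publish() {
    pending.incrementAndGet();
  }

  void consume() {
    pending.decrementAndGet();
  }

  boolean isEmpty() {
    return pending.get() == 0;
  }

  Throwable getThrowable() {
    return throwable.get();
  }

  // Assumed behavior: keep only the first recorded failure.
  void markAsFailed(Throwable t) {
    throwable.compareAndSet(null, t);
  }

  void waitUntilDrained() {
    while (!isEmpty()) {
      if (throwable.get() != null) {
        return; // the consumer already failed
      }
      if (Thread.currentThread().isInterrupted()) {
        // Record the failure before returning, as the review comment suggests.
        markAsFailed(new IllegalStateException(
            "Interrupted while waiting for disruptor queue to drain"));
        return; // isInterrupted() leaves the interrupt flag set for the caller
      }
      Thread.yield();
    }
  }

  public static void main(String[] args) {
    DrainSketch queue = new DrainSketch();
    queue.publish();                    // one event that is never consumed
    Thread.currentThread().interrupt(); // simulate a checkpoint timeout
    queue.waitUntilDrained();
    System.out.println("failure recorded: " + queue.getThrowable().getMessage());
    Thread.interrupted();               // clear the flag before exiting
  }
}
```

With this shape, a caller such as a checkpoint path could check `getThrowable()` after `waitUntilDrained()` returns and fail instead of flushing a partially drained queue.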