This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c69933b92d1 fix: Avoid early cleanup of Dart workers. (#19341)
c69933b92d1 is described below

commit c69933b92d171d658ddc9869664ce1aa22a8a680
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Apr 17 15:14:18 2026 -0700

    fix: Avoid early cleanup of Dart workers. (#19341)
    
    In #19233 controllers were updated to be canceled using a combination
    of stop + direct thread interrupt. This patch makes worker cancellation
    work the same way.
    
    The change is necessary to fix a problem where the worker context could
    be closed before the worker finishes running, since the worker context
    is closed using a listener on the run future. To ensure proper
    sequencing, the run future must not be directly cancelable, since that
    would cause the listener to fire too early.
---
 .../org/apache/druid/msq/exec/WorkerRunRef.java    | 56 +++++++++++++++++-----
 .../apache/druid/msq/exec/WorkerRunRefTest.java    |  2 +-
 2 files changed, 45 insertions(+), 13 deletions(-)

diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java
index 233bff5a68f..adf851bb595 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java
@@ -41,9 +41,22 @@ public class WorkerRunRef
   @GuardedBy("this")
   private Worker worker;
 
+  /**
+   * Future returned by {@link #run(Worker, ListeningExecutorService)}.
+   */
   @GuardedBy("this")
   private ListenableFuture<?> workerRunFuture;
 
+  /**
+   * Thread running the worker. Set inside the {@link #run} runnable, used by 
{@link #cancel()} to interrupt
+   * the worker thread directly.
+   */
+  @GuardedBy("this")
+  private Thread workerThread;
+
+  /**
+   * Flag that is set by {@link #cancel()}.
+   */
   @GuardedBy("this")
   private boolean canceled;
 
@@ -64,8 +77,13 @@ public class WorkerRunRef
 
     this.worker = worker;
 
-    return workerRunFuture = exec.submit(() -> {
+    this.workerRunFuture = exec.submit(() -> {
       final String originalThreadName = Thread.currentThread().getName();
+
+      synchronized (this) {
+        workerThread = Thread.currentThread();
+      }
+
       try {
         Thread.currentThread().setName(StringUtils.format("%s[%s]", 
originalThreadName, worker.id()));
         worker.run();
@@ -78,9 +96,20 @@ public class WorkerRunRef
         }
       }
       finally {
+        synchronized (this) {
+          workerThread = null;
+          // Clear any interrupt delivered during or after worker.run().
+          //noinspection ResultOfMethodCallIgnored
+          Thread.interrupted();
+        }
+
         Thread.currentThread().setName(originalThreadName);
       }
     });
+
+    // Must not cancel the above future, otherwise listeners with cleanup 
actions may fire before the worker
+    // has fully stopped. Cancellation is done by calling cancel() on the 
WorkerRunRef itself.
+    return Futures.nonCancellationPropagating(workerRunFuture);
   }
 
   public synchronized Worker worker()
@@ -109,29 +138,32 @@ public class WorkerRunRef
       return;
     }
 
-    // Interrupt the worker's run future, so the run() thread stops executing.
-    if (workerRunFuture != null) {
-      workerRunFuture.cancel(true);
-    }
-
-    // Also directly signal the run() thread to stop. Ideally this shouldn't 
be necessary, since the
-    // interrupt should be enough. But, in case there are any code paths that 
erroneously swallow
-    // InterruptedException, this provides a failsafe cancellation mechanism.
+    // Directly signal the worker to stop.
     worker.stop();
+
+    // Interrupt the worker as a failsafe, in case the worker is blocked on 
something.
+    if (workerThread != null) {
+      workerThread.interrupt();
+    }
   }
 
   /**
    * Wait for the worker run to finish. Does not throw exceptions from the 
future, even if the worker
    * ended exceptionally.
    */
-  public synchronized void awaitStop()
+  public void awaitStop()
   {
-    if (workerRunFuture == null) {
+    final ListenableFuture<?> future;
+    synchronized (this) {
+      future = workerRunFuture;
+    }
+
+    if (future == null) {
       throw DruidException.defensive("Not running");
     }
 
     try {
-      workerRunFuture.get();
+      future.get();
     }
     catch (InterruptedException e) {
       Thread.currentThread().interrupt();
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java
index 59ae71f18d8..6f12c6bed19 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java
@@ -112,7 +112,7 @@ public class WorkerRunRefTest
     workerFinished.await();
 
     Assert.assertTrue("Future should be done", future.isDone());
-    Assert.assertTrue("Future should be canceled", future.isCancelled());
+    Assert.assertFalse("Future should not be canceled", future.isCancelled());
     Assert.assertTrue("Worker should have been interrupted", 
wasInterrupted.get());
 
     // awaitStop should return immediately since the worker is done


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to