This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c69933b92d1 fix: Avoid early cleanup of Dart workers. (#19341)
c69933b92d1 is described below
commit c69933b92d171d658ddc9869664ce1aa22a8a680
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Apr 17 15:14:18 2026 -0700
fix: Avoid early cleanup of Dart workers. (#19341)
In #19233 controllers were updated to be canceled using a combination
of stop + direct thread interrupt. This patch makes worker cancellation
work the same way.
The change is necessary to fix a problem where the worker context could
be closed before the worker finishes running, since the worker context
is closed using a listener on the run future. To ensure proper
sequencing, the run future must not be directly cancelable, since that
would cause the listener to fire too early.
---
.../org/apache/druid/msq/exec/WorkerRunRef.java | 56 +++++++++++++++++-----
.../apache/druid/msq/exec/WorkerRunRefTest.java | 2 +-
2 files changed, 45 insertions(+), 13 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java
index 233bff5a68f..adf851bb595 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerRunRef.java
@@ -41,9 +41,22 @@ public class WorkerRunRef
@GuardedBy("this")
private Worker worker;
+ /**
+ * Future returned by {@link #run(Worker, ListeningExecutorService)}.
+ */
@GuardedBy("this")
private ListenableFuture<?> workerRunFuture;
+ /**
+ * Thread running the worker. Set inside the {@link #run} runnable, used by {@link #cancel()} to interrupt
+ * the worker thread directly.
+ */
+ @GuardedBy("this")
+ private Thread workerThread;
+
+ /**
+ * Flag that is set by {@link #cancel()}.
+ */
@GuardedBy("this")
private boolean canceled;
@@ -64,8 +77,13 @@ public class WorkerRunRef
this.worker = worker;
- return workerRunFuture = exec.submit(() -> {
+ this.workerRunFuture = exec.submit(() -> {
final String originalThreadName = Thread.currentThread().getName();
+
+ synchronized (this) {
+ workerThread = Thread.currentThread();
+ }
+
try {
Thread.currentThread().setName(StringUtils.format("%s[%s]",
originalThreadName, worker.id()));
worker.run();
@@ -78,9 +96,20 @@ public class WorkerRunRef
}
}
finally {
+ synchronized (this) {
+ workerThread = null;
+ // Clear any interrupt delivered during or after worker.run().
+ //noinspection ResultOfMethodCallIgnored
+ Thread.interrupted();
+ }
+
Thread.currentThread().setName(originalThreadName);
}
});
+
+ // Must not cancel the above future, otherwise listeners with cleanup actions may fire before the worker
+ // has fully stopped. Cancellation is done by calling cancel() on the WorkerRunRef itself.
+ return Futures.nonCancellationPropagating(workerRunFuture);
}
public synchronized Worker worker()
@@ -109,29 +138,32 @@ public class WorkerRunRef
return;
}
- // Interrupt the worker's run future, so the run() thread stops executing.
- if (workerRunFuture != null) {
- workerRunFuture.cancel(true);
- }
-
- // Also directly signal the run() thread to stop. Ideally this shouldn't be necessary, since the
- // interrupt should be enough. But, in case there are any code paths that erroneously swallow
- // InterruptedException, this provides a failsafe cancellation mechanism.
+ // Directly signal the worker to stop.
worker.stop();
+
+ // Interrupt the worker as a failsafe, in case the worker is blocked on something.
+ if (workerThread != null) {
+ workerThread.interrupt();
+ }
}
/**
* Wait for the worker run to finish. Does not throw exceptions from the future, even if the worker
* ended exceptionally.
*/
- public synchronized void awaitStop()
+ public void awaitStop()
{
- if (workerRunFuture == null) {
+ final ListenableFuture<?> future;
+ synchronized (this) {
+ future = workerRunFuture;
+ }
+
+ if (future == null) {
throw DruidException.defensive("Not running");
}
try {
- workerRunFuture.get();
+ future.get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java
index 59ae71f18d8..6f12c6bed19 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerRunRefTest.java
@@ -112,7 +112,7 @@ public class WorkerRunRefTest
workerFinished.await();
Assert.assertTrue("Future should be done", future.isDone());
- Assert.assertTrue("Future should be canceled", future.isCancelled());
+ Assert.assertFalse("Future should not be canceled", future.isCancelled());
Assert.assertTrue("Worker should have been interrupted",
wasInterrupted.get());
// awaitStop should return immediately since the worker is done
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]