This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 04137070df7 perf: Interrupt controllers when canceling them. (#19233)
04137070df7 is described below

commit 04137070df7b2a157ff7e49abff7a4c4aa3c58ea
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Apr 2 21:42:59 2026 -0700

    perf: Interrupt controllers when canceling them. (#19233)
    
    In PRs #18095 and #18931, worker cancellation was switched to use
    interrupts with a lightweight non-interrupt-based failsafe. This patch
    implements a similar idea for controllers, to aid in more prompt
    cancellation in cases where the controller is blocking on something.
    
    The main change is to track the controller thread in ControllerHolder
    and interrupt it on cancel(), in addition to calling controller.stop().
    ControllerHolder is also moved from dart.controller to msq.exec, since
    it is now a shared class, no longer Dart-specific. In addition, the
    "workerOffline" logic is moved to Dart's ControllerMessageListener,
    since that really is Dart-specific.
---
 .../msq/dart/controller/ControllerHolder.java      | 304 -------------
 .../dart/controller/ControllerMessageListener.java |  23 +-
 .../msq/dart/controller/ControllerThreadPool.java  |  25 +-
 .../dart/controller/DartControllerRegistry.java    |   6 +-
 .../msq/dart/controller/http/DartQueryInfo.java    |   2 +-
 .../msq/dart/controller/sql/DartQueryMaker.java    |  28 +-
 .../msq/dart/controller/sql/DartSqlEngine.java     |   2 +-
 .../druid/msq/dart/guice/DartControllerModule.java |   4 +-
 .../apache/druid/msq/exec/ControllerHolder.java    | 428 +++++++++++++++++++
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  76 ++--
 .../apache/druid/msq/exec/ControllerRegistry.java  |  40 ++
 .../druid/msq/indexing/MSQControllerTask.java      |  43 +-
 .../controller/DartControllerRegistryTest.java     |   1 +
 .../dart/controller/http/DartSqlResourceTest.java  |  14 +-
 .../dart/controller/sql/DartSqlClientImplTest.java |   2 +-
 .../druid/msq/exec/ControllerHolderTest.java       | 470 +++++++++++++++++++++
 .../msq/test/MSQTestOverlordServiceClient.java     |  22 +-
 17 files changed, 1102 insertions(+), 388 deletions(-)

diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
deleted file mode 100644
index b6595a89ba4..00000000000
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.dart.controller;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.report.TaskReport;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.msq.dart.worker.WorkerId;
-import org.apache.druid.msq.exec.CaptureReportQueryListener;
-import org.apache.druid.msq.exec.Controller;
-import org.apache.druid.msq.exec.ControllerContext;
-import org.apache.druid.msq.exec.QueryListener;
-import org.apache.druid.msq.indexing.error.CanceledFault;
-import org.apache.druid.msq.indexing.error.CancellationReason;
-import org.apache.druid.msq.indexing.error.MSQErrorReport;
-import org.apache.druid.msq.indexing.error.WorkerFailedFault;
-import org.apache.druid.msq.indexing.report.MSQStatusReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
-import org.apache.druid.query.BaseQuery;
-import org.apache.druid.server.security.AuthenticationResult;
-import org.apache.druid.sql.http.StandardQueryState;
-import org.joda.time.DateTime;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Holder for {@link Controller}, stored in {@link DartControllerRegistry}.
- */
-public class ControllerHolder
-{
-  private static final Logger log = new Logger(ControllerHolder.class);
-
-  private final Controller controller;
-  private final String sqlQueryId;
-  private final String sql;
-  private final AuthenticationResult authenticationResult;
-  private final DateTime startTime;
-  private final AtomicReference<State> state = new AtomicReference<>(State.ACCEPTED);
-
-  public ControllerHolder(
-      final Controller controller,
-      final String sqlQueryId,
-      final String sql,
-      final AuthenticationResult authenticationResult,
-      final DateTime startTime
-  )
-  {
-    this.controller = Preconditions.checkNotNull(controller, "controller");
-    this.sqlQueryId = Preconditions.checkNotNull(sqlQueryId, "sqlQueryId");
-    this.sql = sql;
-    this.authenticationResult = authenticationResult;
-    this.startTime = Preconditions.checkNotNull(startTime, "startTime");
-  }
-
-  public Controller getController()
-  {
-    return controller;
-  }
-
-  public String getSqlQueryId()
-  {
-    return sqlQueryId;
-  }
-
-  public String getSql()
-  {
-    return sql;
-  }
-
-  public String getControllerHost()
-  {
-    return getControllerContext().selfNode().getHostAndPortToUse();
-  }
-
-  private ControllerContext getControllerContext()
-  {
-    return controller.getControllerContext();
-  }
-
-  public AuthenticationResult getAuthenticationResult()
-  {
-    return authenticationResult;
-  }
-
-  public DateTime getStartTime()
-  {
-    return startTime;
-  }
-
-  public State getState()
-  {
-    return state.get();
-  }
-
-  /**
-   * Call when a worker has gone offline. Closes its client and sends a {@link Controller#workerError}
-   * to the controller.
-   */
-  public void workerOffline(final WorkerId workerId)
-  {
-    final String workerIdString = workerId.toString();
-
-    ControllerContext controllerContext = getControllerContext();
-    if (controllerContext instanceof DartControllerContext) {
-      DartControllerContext dartControllerContext = (DartControllerContext) controllerContext;
-      // For DartControllerContext, newWorkerClient() returns the same instance every time.
-      // This will always be DartControllerContext in production; the instanceof check is here because certain
-      // tests use a different context class.
-      dartControllerContext.newWorkerClient().closeClient(workerId.getHostAndPort());
-    }
-
-    if (controller.hasWorker(workerIdString)) {
-      controller.workerError(
-          MSQErrorReport.fromFault(
-              workerIdString,
-              workerId.getHostAndPort(),
-              null,
-              new WorkerFailedFault(workerIdString, "Worker went offline")
-          )
-      );
-    }
-  }
-
-  /**
-   * Places this holder into {@link State#CANCELED}. Calls {@link Controller#stop(CancellationReason)} if it was
-   * previously in state {@link State#RUNNING}.
-   */
-  public void cancel(CancellationReason reason)
-  {
-    if (state.compareAndSet(State.ACCEPTED, State.CANCELED)) {
-      // No need to call stop() since run() wasn't called.
-      return;
-    }
-
-    if (state.compareAndSet(State.RUNNING, State.CANCELED)) {
-      controller.stop(reason);
-    }
-  }
-
-  /**
-   * Runs {@link Controller#run(QueryListener)} in the provided executor. Registers the controller with the provided
-   * registry while it is running.
-   *
-   * @return future that resolves when the controller is done or canceled.
-   */
-  public ListenableFuture<?> runAsync(
-      final QueryListener listener,
-      final DartControllerRegistry controllerRegistry,
-      final ControllerThreadPool threadPool
-  )
-  {
-    // Register controller before submitting anything to controllerExeuctor, so it shows up in
-    // "active controllers" lists.
-    controllerRegistry.register(this);
-
-    final ListenableFuture<?> future = threadPool.getExecutorService().submit(() -> {
-      final String threadName = Thread.currentThread().getName();
-      Thread.currentThread().setName(makeThreadName());
-
-      try {
-        final CaptureReportQueryListener reportListener = new CaptureReportQueryListener(listener);
-
-        try {
-          if (state.compareAndSet(State.ACCEPTED, State.RUNNING)) {
-            controller.run(reportListener);
-            updateStateOnQueryComplete(reportListener.getReport());
-          } else {
-            // Canceled before running.
-            reportListener.onQueryComplete(makeCanceledReport());
-          }
-        }
-        catch (Throwable e) {
-          log.warn(
-              e,
-              "Controller[%s] failed, queryId[%s], sqlQueryId[%s]",
-              controller.queryId(),
-              controller.getQueryContext().getString(BaseQuery.QUERY_ID),
-              sqlQueryId
-          );
-        }
-        finally {
-          // Build report and then call "deregister".
-          final MSQTaskReport taskReport;
-
-          if (reportListener.hasReport()) {
-            taskReport = new MSQTaskReport(controller.queryId(), reportListener.getReport());
-          } else {
-            taskReport = null;
-          }
-
-          final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
-          reportMap.put(MSQTaskReport.REPORT_KEY, taskReport);
-          controllerRegistry.deregister(this, reportMap);
-        }
-      }
-      finally {
-        Thread.currentThread().setName(threadName);
-      }
-    });
-
-    // Must not cancel the above future, otherwise "deregister" may never get called. If a controller is canceled
-    // before it runs, the runnable above stays in the queue until it gets a thread, then it exits without running
-    // the controller.
-    return Futures.nonCancellationPropagating(future);
-  }
-
-  private void updateStateOnQueryComplete(final MSQTaskReportPayload report)
-  {
-    switch (report.getStatus().getStatus()) {
-      case SUCCESS:
-        state.compareAndSet(State.RUNNING, State.SUCCESS);
-        break;
-
-      case FAILED:
-        state.compareAndSet(State.RUNNING, State.FAILED);
-        break;
-    }
-  }
-
-  /**
-   * Generate a name for the thread that {@link #runAsync} uses.
-   */
-  private String makeThreadName()
-  {
-    return StringUtils.format(
-        "%s[%s]-sqlQueryId[%s]",
-        Thread.currentThread().getName(),
-        controller.queryId(),
-        sqlQueryId
-    );
-  }
-
-  private MSQTaskReportPayload makeCanceledReport()
-  {
-    final MSQErrorReport errorReport =
-        MSQErrorReport.fromFault(controller.queryId(), null, null, CanceledFault.userRequest());
-    final MSQStatusReport statusReport =
-        new MSQStatusReport(TaskState.FAILED, errorReport, null, null, 0, Map.of(), 0, 0, null, null);
-    return new MSQTaskReportPayload(statusReport, null, null, null);
-  }
-
-  public enum State
-  {
-    /**
-     * Query has been accepted, but not yet {@link Controller#run(QueryListener)}.
-     */
-    ACCEPTED(StandardQueryState.ACCEPTED),
-
-    /**
-     * Query has had {@link Controller#run(QueryListener)} called.
-     */
-    RUNNING(StandardQueryState.RUNNING),
-
-    /**
-     * Query has been canceled.
-     */
-    CANCELED(StandardQueryState.CANCELED),
-
-    /**
-     * Query has exited successfully.
-     */
-    SUCCESS(StandardQueryState.SUCCESS),
-
-    /**
-     * Query has failed.
-     */
-    FAILED(StandardQueryState.FAILED);
-
-    private final String statusString;
-
-    State(String statusString)
-    {
-      this.statusString = statusString;
-    }
-
-    public String getStatusString()
-    {
-      return statusString;
-    }
-  }
-}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
index e9d6fbb3010..0fe85f42c75 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
@@ -24,7 +24,10 @@ import org.apache.druid.messages.client.MessageListener;
 import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
 import org.apache.druid.msq.dart.worker.WorkerId;
 import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.exec.ControllerHolder;
 import org.apache.druid.msq.indexing.error.MSQErrorReport;
+import org.apache.druid.msq.indexing.error.WorkerFailedFault;
 import org.apache.druid.server.DruidNode;
 
 /**
@@ -62,7 +65,25 @@ public class ControllerMessageListener implements MessageListener<ControllerMess
     for (final ControllerHolder holder : controllerRegistry.getAllControllers()) {
       final Controller controller = holder.getController();
       final WorkerId workerId = WorkerId.fromDruidNode(node, controller.queryId());
-      holder.workerOffline(workerId);
+      final String workerIdString = workerId.toString();
+
+      // Close the worker client for this server.
+      final ControllerContext controllerContext = controller.getControllerContext();
+      if (controllerContext instanceof DartControllerContext) {
+        ((DartControllerContext) controllerContext).newWorkerClient().closeClient(workerId.getHostAndPort());
+      }
+
+      // Notify the controller that the worker has gone offline.
+      if (controller.hasWorker(workerIdString)) {
+        controller.workerError(
+            MSQErrorReport.fromFault(
+                workerIdString,
+                workerId.getHostAndPort(),
+                null,
+                new WorkerFailedFault(workerIdString, "Worker went offline")
+            )
+        );
+      }
     }
   }
 }
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java
index 8e369d6fa81..352b0a86678 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java
@@ -25,6 +25,8 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.msq.dart.guice.DartControllerConfig;
 import org.apache.druid.msq.exec.Controller;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 /**
  * Thread pool for running {@link Controller}. Number of threads is equal to
  * {@link DartControllerConfig#getConcurrentQueries()}, which limits the number of concurrent controllers.
@@ -32,21 +34,32 @@ import org.apache.druid.msq.exec.Controller;
 @ManageLifecycle
 public class ControllerThreadPool
 {
-  private final ListeningExecutorService executorService;
+  private final ListeningExecutorService runExec;
+  private final ScheduledExecutorService timeoutExec;
+
+  public ControllerThreadPool(
+      final ListeningExecutorService runExec,
+      final ScheduledExecutorService timeoutExec
+  )
+  {
+    this.runExec = runExec;
+    this.timeoutExec = timeoutExec;
+  }
 
-  public ControllerThreadPool(final ListeningExecutorService executorService)
+  public ListeningExecutorService getRunExecutorService()
   {
-    this.executorService = executorService;
+    return runExec;
   }
 
-  public ListeningExecutorService getExecutorService()
+  public ScheduledExecutorService getTimeoutExecutorService()
   {
-    return executorService;
+    return timeoutExec;
   }
 
   @LifecycleStop
   public void stop()
   {
-    executorService.shutdown();
+    runExec.shutdown();
+    timeoutExec.shutdown();
   }
 }
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
index f1d1e4e1b76..09176bcfe77 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
@@ -32,6 +32,8 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
 import org.apache.druid.msq.dart.guice.DartControllerConfig;
 import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.exec.ControllerHolder;
+import org.apache.druid.msq.exec.ControllerRegistry;
 import org.apache.druid.msq.indexing.report.MSQStatusReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
@@ -54,7 +56,7 @@ import java.util.concurrent.TimeUnit;
  * Registry for actively-running {@link Controller} and recently-completed {@link TaskReport}.
  */
 @ManageLifecycle
-public class DartControllerRegistry
+public class DartControllerRegistry implements ControllerRegistry
 {
   /**
    * Minimum frequency for checking if {@link #cleanupExpiredReports()} needs to be run.
@@ -127,6 +129,7 @@ public class DartControllerRegistry
    * Add a controller. Throws {@link DruidException} if a controller with the same {@link Controller#queryId()} is
    * already registered.
    */
+  @Override
   public void register(ControllerHolder holder)
   {
     final String dartQueryId = holder.getController().queryId();
@@ -141,6 +144,7 @@ public class DartControllerRegistry
    * time afterwards, based on {@link DartControllerConfig#getMaxRetainedReportCount()} and
    * {@link DartControllerConfig#getMaxRetainedReportDuration()}.
    */
+  @Override
   public void deregister(ControllerHolder holder, @Nullable TaskReport.ReportMap completeReport)
   {
     final String dartQueryId = holder.getController().queryId();
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
index 9902b86239f..ef36d5146ba 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
@@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
 import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
+import org.apache.druid.msq.exec.ControllerHolder;
 import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.server.DruidNode;
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
index 053e5fef592..d6f62f859f7 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.msq.dart.controller.sql;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.indexer.report.TaskReport;
@@ -30,13 +29,13 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.SequenceWrapper;
 import org.apache.druid.java.util.common.guava.Sequences;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
 import org.apache.druid.msq.dart.controller.ControllerThreadPool;
 import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
-import org.apache.druid.msq.dart.controller.DartControllerRegistry;
 import org.apache.druid.msq.dart.guice.DartControllerConfig;
 import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.exec.ControllerHolder;
 import org.apache.druid.msq.exec.ControllerImpl;
+import org.apache.druid.msq.exec.ControllerRegistry;
 import org.apache.druid.msq.exec.QueryKitSpecFactory;
 import org.apache.druid.msq.exec.ResultsContext;
 import org.apache.druid.msq.exec.SequenceQueryListener;
@@ -83,7 +82,7 @@ public class DartQueryMaker implements QueryMaker
   /**
    * Controller registry, used to register and remove controllers as they start and finish.
    */
-  private final DartControllerRegistry controllerRegistry;
+  private final ControllerRegistry controllerRegistry;
 
   /**
    * Controller config.
@@ -102,7 +101,7 @@ public class DartQueryMaker implements QueryMaker
       List<Entry<Integer, String>> fieldMapping,
       DartControllerContextFactory controllerContextFactory,
       PlannerContext plannerContext,
-      DartControllerRegistry controllerRegistry,
+      ControllerRegistry controllerRegistry,
       DartControllerConfig controllerConfig,
       ControllerThreadPool controllerThreadPool,
       QueryKitSpecFactory queryKitSpecFactory,
@@ -238,7 +237,7 @@ public class DartQueryMaker implements QueryMaker
    * Run a query and return the full report, buffered in memory up to
    * {@link DartControllerConfig#getMaxQueryReportSize()}.
    *
-   * Arranges for {@link DartControllerRegistry#deregister} to be called upon completion (either success or failure).
+   * Arranges for {@link ControllerRegistry#deregister} to be called upon completion (either success or failure).
    */
   private Sequence<Object[]> runWithReport(
       final ControllerHolder controllerHolder,
@@ -268,7 +267,11 @@ public class DartQueryMaker implements QueryMaker
 
     try {
       // Submit controller and wait for it to finish.
-      controllerHolder.runAsync(listener, controllerRegistry, controllerThreadPool).get();
+      controllerHolder.runAsync(
+          listener,
+          controllerRegistry,
+          controllerThreadPool
+      ).get();
 
       // Return a sequence with just one row (the report).
       final TaskReport.ReportMap reportMap =
@@ -288,7 +291,7 @@ public class DartQueryMaker implements QueryMaker
   /**
    * Run a query and return the results only, streamed back using {@link SequenceQueryListener}.
    *
-   * Arranges for {@link DartControllerRegistry#deregister} to be called upon completion (either success or failure).
+   * Arranges for {@link ControllerRegistry#deregister} to be called upon completion (either success or failure).
    */
   private Sequence<Object[]> runWithSequence(
       final ControllerHolder controllerHolder,
@@ -297,8 +300,11 @@ public class DartQueryMaker implements QueryMaker
   )
   {
     final SequenceQueryListener listener = new SequenceQueryListener();
-    final ListenableFuture<?> runFuture =
-        controllerHolder.runAsync(listener, controllerRegistry, controllerThreadPool);
+    controllerHolder.runAsync(
+        listener,
+        controllerRegistry,
+        controllerThreadPool
+    );
 
     return Sequences.wrap(
         listener.getSequence().flatMap(
@@ -316,7 +322,7 @@ public class DartQueryMaker implements QueryMaker
           public void after(final boolean isDone, final Throwable thrown)
           {
             if (!isDone || thrown != null) {
-              runFuture.cancel(true); // Cancel on early stop or failure
+              controllerHolder.cancel(CancellationReason.UNKNOWN);
             }
           }
         }
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
index 667a34a29f4..a66a72ec715 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
@@ -34,7 +34,6 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.dart.Dart;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
 import org.apache.druid.msq.dart.controller.ControllerThreadPool;
 import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
 import org.apache.druid.msq.dart.controller.DartControllerRegistry;
@@ -42,6 +41,7 @@ import org.apache.druid.msq.dart.controller.QueryInfoAndReport;
 import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
 import org.apache.druid.msq.dart.guice.DartControllerConfig;
 import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.exec.ControllerHolder;
 import org.apache.druid.msq.exec.QueryKitSpecFactory;
 import org.apache.druid.msq.indexing.error.CancellationReason;
 import org.apache.druid.msq.querykit.MultiQueryKit;
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
index 3b86b5e8b70..8a9b1cf3607 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
@@ -37,6 +37,7 @@ import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.LoadScope;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.msq.dart.Dart;
 import org.apache.druid.msq.dart.DartResourcePermissionMapper;
 import org.apache.druid.msq.dart.controller.ControllerMessageListener;
@@ -141,7 +142,8 @@ public class DartControllerModule implements DruidModule
                   dartControllerConfig.getConcurrentQueries(),
                   "dart-controller-%s"
               )
-          )
+          ),
+          ScheduledExecutors.fixed(1, "dart-controller-timeout-%s")
       );
     }
   }
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerHolder.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerHolder.java
new file mode 100644
index 00000000000..4caf79d2866
--- /dev/null
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerHolder.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
+import org.apache.druid.msq.indexing.error.CanceledFault;
+import org.apache.druid.msq.indexing.error.CancellationReason;
+import org.apache.druid.msq.indexing.error.MSQErrorReport;
+import org.apache.druid.msq.indexing.report.MSQStatusReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.sql.http.StandardQueryState;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Holder for a {@link Controller} that manages its lifecycle, including 
cancellation and timeouts.
+ */
+public class ControllerHolder
+{
+  private static final Logger log = new Logger(ControllerHolder.class);
+
+  private final Controller controller;
+  private final String sqlQueryId;
+
+  @Nullable
+  private final String sql;
+
+  @Nullable
+  private final AuthenticationResult authenticationResult;
+
+  private final DateTime startTime;
+
+  @GuardedBy("this")
+  private State state = State.ACCEPTED;
+
+  /**
+   * Thread running the controller. Set inside the {@link #runAsync} runnable, 
cleared in its finally block.
+   */
+  @GuardedBy("this")
+  private Thread controllerThread;
+
+  /**
+   * If cancel was called, the reason for cancellation. Used to deliver the 
cancellation to the controller thread
+   * if {@link #cancel} is called before the controller starts running.
+   */
+  @GuardedBy("this")
+  private CancellationReason cancelReason;
+
+  public ControllerHolder(
+      final Controller controller,
+      final String sqlQueryId,
+      @Nullable final String sql,
+      @Nullable final AuthenticationResult authenticationResult,
+      final DateTime startTime
+  )
+  {
+    this.controller = Preconditions.checkNotNull(controller, "controller");
+    this.sqlQueryId = Preconditions.checkNotNull(sqlQueryId, "sqlQueryId");
+    this.sql = sql;
+    this.authenticationResult = authenticationResult;
+    this.startTime = Preconditions.checkNotNull(startTime, "startTime");
+  }
+
+  public Controller getController()
+  {
+    return controller;
+  }
+
+  @Nullable
+  public String getSqlQueryId()
+  {
+    return sqlQueryId;
+  }
+
+  @Nullable
+  public String getSql()
+  {
+    return sql;
+  }
+
+  public String getControllerHost()
+  {
+    return getControllerContext().selfNode().getHostAndPortToUse();
+  }
+
+  private ControllerContext getControllerContext()
+  {
+    return controller.getControllerContext();
+  }
+
+  @Nullable
+  public AuthenticationResult getAuthenticationResult()
+  {
+    return authenticationResult;
+  }
+
+  public DateTime getStartTime()
+  {
+    return startTime;
+  }
+
+  public synchronized State getState()
+  {
+    return state;
+  }
+
+  /**
+   * Runs {@link Controller#run(QueryListener)} in {@link 
ControllerThreadPool#getRunExecutorService()}. Optionally
+   * registers the controller with the provided registry while it is running. 
Schedules a timeout on the provided
+   * {@link ControllerThreadPool#getTimeoutExecutorService()}.
+   *
+   * @return future that resolves when the controller is done or canceled
+   */
+  public ListenableFuture<?> runAsync(
+      final QueryListener listener,
+      @Nullable final ControllerRegistry controllerRegistry,
+      final ControllerThreadPool controllerThreadPool
+  )
+  {
+    if (controllerRegistry != null) {
+      // Register controller before submitting anything to the executor, so it 
shows up in
+      // "active controllers" lists.
+      controllerRegistry.register(this);
+    }
+
+    // Schedule timeout based on the query deadline. The scheduled task calls 
cancel(), which is
+    // safe even if the controller has already finished (cancel is a no-op for 
terminal states).
+    final ScheduledFuture<?> timeoutFuture = 
scheduleTimeout(controllerThreadPool.getTimeoutExecutorService());
+
+    final ListenableFuture<?> runFuture = 
controllerThreadPool.getRunExecutorService().submit(() -> {
+      final String threadName = Thread.currentThread().getName();
+      Thread.currentThread().setName(makeThreadName());
+
+      try {
+        final CaptureReportQueryListener reportListener = new 
CaptureReportQueryListener(listener);
+
+        try {
+          if (transitionToRunning()) {
+            try {
+              controller.run(reportListener);
+            }
+            finally {
+              synchronized (this) {
+                controllerThread = null;
+                // Clear any interrupt delivered during or after 
controller.run().
+                //noinspection ResultOfMethodCallIgnored
+                Thread.interrupted();
+              }
+            }
+
+            updateStateOnQueryComplete(reportListener.getReport());
+          } else {
+            // Canceled before running.
+            synchronized (this) {
+              reportListener.onQueryComplete(makeCanceledReport(cancelReason));
+            }
+          }
+        }
+        catch (Throwable e) {
+          log.warn(
+              e,
+              "Controller[%s] failed, queryId[%s], sqlQueryId[%s]",
+              controller.queryId(),
+              controller.getQueryContext().getString(BaseQuery.QUERY_ID),
+              sqlQueryId
+          );
+        }
+        finally {
+          // Build report and then call "deregister".
+          final MSQTaskReport taskReport;
+
+          if (reportListener.hasReport()) {
+            taskReport = new MSQTaskReport(controller.queryId(), 
reportListener.getReport());
+          } else {
+            taskReport = null;
+          }
+
+          final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+          reportMap.put(MSQTaskReport.REPORT_KEY, taskReport);
+
+          if (controllerRegistry != null) {
+            controllerRegistry.deregister(this, reportMap);
+          }
+        }
+      }
+      finally {
+        Thread.currentThread().setName(threadName);
+
+        if (timeoutFuture != null) {
+          timeoutFuture.cancel(false);
+        }
+      }
+    });
+
+    // Must not cancel the above future, otherwise "deregister" may never get 
called. If a controller is canceled
+    // before it runs, the runnable above stays in the queue until it gets a 
thread, then it exits without running
+    // the controller.
+    return Futures.nonCancellationPropagating(runFuture);
+  }
+
+  /**
+   * Moves an accepted or running holder to {@link State#CANCELED}; stops and interrupts a running controller.
+   */
+  public void cancel(final CancellationReason reason)
+  {
+    final State prevState;
+    synchronized (this) {
+      prevState = state;
+
+      if (state == State.ACCEPTED || state == State.RUNNING) {
+        state = State.CANCELED;
+        cancelReason = reason;
+      }
+    }
+
+    if (prevState == State.RUNNING) {
+      controller.stop(reason);
+
+      // Interrupt the controller thread as a failsafe, in case the controller 
is blocked on something.
+      synchronized (this) {
+        if (controllerThread != null) {
+          controllerThread.interrupt();
+        }
+      }
+    }
+  }
+
+  /**
+   * Attempts to transition from {@link State#ACCEPTED} to {@link State#RUNNING} and capture the controller thread.
+   * Both steps happen under this holder's lock. If the state is no longer {@link State#ACCEPTED}, for example
+   * because {@link #cancel} was called first, nothing is changed and false is returned.
+   *
+   * @return true if the controller should proceed to run, false if it was 
canceled
+   */
+  private synchronized boolean transitionToRunning()
+  {
+    if (state != State.ACCEPTED) {
+      return false;
+    }
+
+    state = State.RUNNING;
+    controllerThread = Thread.currentThread();
+    return true;
+  }
+
+  /**
+   * Schedules a timeout task that cancels the controller when the query 
deadline elapses. If the deadline has
+   * already passed, cancels immediately without scheduling. Returns null if 
no timeout is configured or if
+   * an immediate cancellation was performed.
+   */
+  @Nullable
+  private ScheduledFuture<?> scheduleTimeout(final ScheduledExecutorService 
scheduledExec)
+  {
+    final DateTime deadline = getQueryDeadline();
+
+    if (deadline == null) {
+      return null;
+    }
+
+    final long delayMs = deadline.getMillis() - DateTimes.nowUtc().getMillis();
+
+    if (delayMs <= 0) {
+      // Deadline has already passed. Cancel immediately rather than 
scheduling, so the cancellation
+      // takes effect even when using a direct executor for the controller 
thread.
+      cancel(CancellationReason.QUERY_TIMEOUT);
+      return null;
+    }
+
+    return scheduledExec.schedule(
+        () -> cancel(CancellationReason.QUERY_TIMEOUT),
+        delayMs,
+        TimeUnit.MILLISECONDS
+    );
+  }
+
+  /**
+   * Retrieves the query deadline from the controller's query context. Returns 
null if no timeout is configured.
+   */
+  @Nullable
+  private DateTime getQueryDeadline()
+  {
+    final QueryContext queryContext = controller.getQueryContext();
+    DateTime deadline = MultiStageQueryContext.getQueryDeadline(queryContext);
+
+    if (deadline == null) {
+      // Newer Brokers set the deadline, but older ones might not. Fall back 
to startTime and timeout in this case.
+      final long timeout = queryContext.getTimeout(QueryContexts.NO_TIMEOUT);
+      if (timeout != QueryContexts.NO_TIMEOUT) {
+        deadline = 
MultiStageQueryContext.getStartTime(queryContext).plus(timeout);
+      }
+    }
+
+    return deadline;
+  }
+
+  /**
+   * If {@link #state} is {@link State#RUNNING}, update it based on the 
outcome of a query.
+   * Otherwise do nothing.
+   */
+  private synchronized void updateStateOnQueryComplete(final 
MSQTaskReportPayload report)
+  {
+    if (state != State.RUNNING) {
+      return;
+    }
+
+    switch (report.getStatus().getStatus()) {
+      case SUCCESS:
+        state = State.SUCCESS;
+        break;
+
+      case FAILED:
+        state = State.FAILED;
+        break;
+    }
+  }
+
+  /**
+   * Generate a name for the thread that {@link #runAsync} uses.
+   */
+  private String makeThreadName()
+  {
+    if (sqlQueryId != null) {
+      return StringUtils.format(
+          "%s[%s]-sqlQueryId[%s]",
+          Thread.currentThread().getName(),
+          controller.queryId(),
+          sqlQueryId
+      );
+    } else {
+      return StringUtils.format(
+          "%s[%s]",
+          Thread.currentThread().getName(),
+          controller.queryId()
+      );
+    }
+  }
+
+  private MSQTaskReportPayload makeCanceledReport(@Nullable final 
CancellationReason reason)
+  {
+    final MSQErrorReport errorReport =
+        MSQErrorReport.fromFault(
+            controller.queryId(),
+            null,
+            null,
+            new CanceledFault(reason != null ? reason : 
CancellationReason.UNKNOWN)
+        );
+    final MSQStatusReport statusReport =
+        new MSQStatusReport(TaskState.FAILED, errorReport, null, null, 0, 
Map.of(), 0, 0, null, null);
+    return new MSQTaskReportPayload(statusReport, null, null, null);
+  }
+
+  public enum State
+  {
+    /**
+     * Query has been accepted, but {@link Controller#run(QueryListener)} has not yet been called.
+     */
+    ACCEPTED(StandardQueryState.ACCEPTED),
+
+    /**
+     * Query has had {@link Controller#run(QueryListener)} called.
+     */
+    RUNNING(StandardQueryState.RUNNING),
+
+    /**
+     * Query has been canceled.
+     */
+    CANCELED(StandardQueryState.CANCELED),
+
+    /**
+     * Query has exited successfully.
+     */
+    SUCCESS(StandardQueryState.SUCCESS),
+
+    /**
+     * Query has failed.
+     */
+    FAILED(StandardQueryState.FAILED);
+
+    private final String statusString;
+
+    State(final String statusString)
+    {
+      this.statusString = statusString;
+    }
+
+    public String getStatusString()
+    {
+      return statusString;
+    }
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 082ab512d42..e02d2909547 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -172,7 +172,6 @@ import org.apache.druid.query.DefaultQueryMetrics;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContext;
-import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
@@ -219,7 +218,6 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -252,6 +250,13 @@ public class ControllerImpl implements Controller
   // For system error reporting. This is the very first error we got from a 
worker. (We only report that one.)
   private final AtomicReference<MSQErrorReport> workerErrorRef = new 
AtomicReference<>();
 
+  /**
+   * Set by {@link #stop(CancellationReason)}. If non-null, this reason takes 
priority over any exception
+   * encountered during execution when building the error report. If we didn't 
do this, interrupts arising
+   * from cancellation could produce errors that are less informative than the 
actual cancellation reason.
+   */
+  private volatile CancellationReason cancelReason;
+
   // For system warning reporting
   private final ConcurrentLinkedQueue<MSQErrorReport> workerWarnings = new 
ConcurrentLinkedQueue<>();
 
@@ -376,7 +381,9 @@ public class ControllerImpl implements Controller
     // stopGracefully() is called when the containing process is terminated, 
or when the task is canceled.
     log.info("Query [%s] canceled.", queryDef != null ? queryDef.getQueryId() 
: "<no id yet>");
 
+    cancelReason = reason;
     stopExternalFetchers();
+    kernelManipulationQueue.clear(); // No point processing any 
possibly-queued commands.
     addToKernelManipulationQueue(
         kernel -> {
           throw new MSQException(new CanceledFault(reason));
@@ -471,7 +478,12 @@ public class ControllerImpl implements Controller
       MSQErrorReport workerError = workerErrorRef.get();
 
       taskStateForReport = TaskState.FAILED;
-      errorForReport = MSQTasks.makeErrorReport(queryId(), selfHost, 
controllerError, workerError);
+
+      if (cancelReason != null) {
+        errorForReport = MSQErrorReport.fromFault(queryId(), selfHost, null, 
new CanceledFault(cancelReason));
+      } else {
+        errorForReport = MSQTasks.makeErrorReport(queryId(), selfHost, 
controllerError, workerError);
+      }
 
       // Log the errors we encountered.
       if (controllerError != null) {
@@ -667,6 +679,9 @@ public class ControllerImpl implements Controller
    * controller loop in {@link RunQueryUntilDone#run()}.
    * <p>
    * If the consumer throws an exception, the query fails.
+   * <p>
+   * Consumers must not perform blocking operations (network calls, waiting on 
futures, sleeping, etc.), because
+   * the main controller loop executes them in sequence and blocking would 
delay controller operations.
    */
   public void addToKernelManipulationQueue(Consumer<ControllerQueryKernel> 
kernelConsumer)
   {
@@ -2328,10 +2343,6 @@ public class ControllerImpl implements Controller
       startTaskLauncher();
 
       boolean runAgain;
-      final DateTime queryFailDeadline = getQueryDeadline();
-
-      // The timeout could have already elapsed while waiting for the 
controller to start, check it now.
-      checkTimeout(queryFailDeadline);
 
       while (!queryKernel.isDone()) {
         startStages();
@@ -2344,10 +2355,8 @@ public class ControllerImpl implements Controller
         checkForErrorsInSketchFetcher();
 
         if (!runAgain) {
-          runKernelCommands(queryFailDeadline);
+          runKernelCommands();
         }
-
-        checkTimeout(queryFailDeadline);
       }
 
       if (!queryKernel.isSuccess()) {
@@ -2359,34 +2368,6 @@ public class ControllerImpl implements Controller
       return Pair.of(queryKernel, workerTaskLauncherFuture);
     }
 
-    /**
-     * Retrieves the timeout and start time from the query context and reads 
or calculates the deadline.
-     */
-    private DateTime getQueryDeadline()
-    {
-      DateTime deadline = 
MultiStageQueryContext.getQueryDeadline(querySpec.getContext());
-
-      if (deadline == null) {
-        // Newer Brokers set the deadline, but older ones might not. Fall back 
to startTime and timeout in this case.
-        final long timeout = 
querySpec.getContext().getTimeout(QueryContexts.NO_TIMEOUT);
-        if (timeout != QueryContexts.NO_TIMEOUT) {
-          deadline = 
MultiStageQueryContext.getStartTime(querySpec.getContext()).plus(timeout);
-        }
-      }
-
-      return deadline != null ? deadline : DateTimes.MAX;
-    }
-
-    /**
-     * Checks the queryFailDeadline and fails the query with a {@link 
CanceledFault} if it has passed.
-     */
-    private void checkTimeout(DateTime queryFailDeadline)
-    {
-      if (queryFailDeadline.isBeforeNow()) {
-        throw new MSQException(CanceledFault.timeout());
-      }
-    }
-
     private void checkForErrorsInSketchFetcher()
     {
       Throwable throwable = workerSketchFetcher.getError();
@@ -2472,24 +2453,21 @@ public class ControllerImpl implements Controller
 
     /**
      * Run at least one command from {@link #kernelManipulationQueue}, waiting 
for it if necessary.
+     * Timeouts are handled externally by {@link ControllerHolder}, which 
calls {@link Controller#stop}
+     * to enqueue a {@link CanceledFault} and then interrupts this thread when 
the query deadline elapses.
      */
-    private void runKernelCommands(DateTime queryFailDeadline) throws 
InterruptedException
+    private void runKernelCommands() throws InterruptedException
     {
       if (!queryKernel.isDone()) {
-        // Run the next command, waiting till timeout for it if necessary.
-        Consumer<ControllerQueryKernel> command = kernelManipulationQueue.poll(
-            queryFailDeadline.getMillis() - DateTimes.nowUtc().getMillis(),
-            TimeUnit.MILLISECONDS
-        );
-        if (command == null) {
-          return;
-        }
+        // Run the next command, waiting for it if necessary.
+        final Consumer<ControllerQueryKernel> command = 
kernelManipulationQueue.take();
         command.accept(queryKernel);
 
         // Run all pending commands after that one. Helps avoid deep queues.
         // After draining the command queue, move on to the next iteration of 
the controller loop.
-        while ((command = kernelManipulationQueue.poll()) != null) {
-          command.accept(queryKernel);
+        Consumer<ControllerQueryKernel> next;
+        while ((next = kernelManipulationQueue.poll()) != null) {
+          next.accept(queryKernel);
         }
       }
     }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerRegistry.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerRegistry.java
new file mode 100644
index 00000000000..30a1a3379fc
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerRegistry.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.indexer.report.TaskReport;
+
+import javax.annotation.Nullable;
+
+/**
+ * Registry for actively-running {@link Controller} instances, held in {@link 
ControllerHolder}.
+ */
+public interface ControllerRegistry
+{
+  /**
+   * Register a controller holder. Throws if a controller with the same query 
ID is already registered.
+   */
+  void register(ControllerHolder holder);
+
+  /**
+   * Deregister a controller holder. Optionally stores a final report.
+   */
+  void deregister(ControllerHolder holder, @Nullable TaskReport.ReportMap 
report);
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index c43d855ad50..1950aeace5f 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -40,9 +40,14 @@ import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
 import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
 import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.exec.ControllerHolder;
 import org.apache.druid.msq.exec.ControllerImpl;
 import org.apache.druid.msq.exec.MSQTasks;
 import org.apache.druid.msq.exec.ResultsContext;
@@ -72,6 +77,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+
 @JsonTypeName(MSQControllerTask.TYPE)
 public class MSQControllerTask extends AbstractTask implements 
ClientTaskQuery, PendingSegmentAllocatingTask
 {
@@ -113,7 +119,7 @@ public class MSQControllerTask extends AbstractTask 
implements ClientTaskQuery,
   @JacksonInject
   private Injector injector;
 
-  private volatile Controller controller;
+  private volatile ControllerHolder controllerHolder;
 
   @JsonCreator
   public MSQControllerTask(
@@ -262,13 +268,22 @@ public class MSQControllerTask extends AbstractTask 
implements ClientTaskQuery,
   {
     final ControllerContext context = 
injector.getInstance(IndexerControllerContextFactory.class)
                                               .buildWithTask(this, toolbox);
-    controller = new ControllerImpl(
+
+    final ControllerImpl controller = new ControllerImpl(
         querySpec,
         new ResultsContext(getSqlTypeNames(), getSqlResultsContext()),
         context,
         injector.getInstance(MSQTaskQueryKitSpecFactory.class)
     );
 
+    controllerHolder = new ControllerHolder(
+        controller,
+        controller.getQueryContext().getString(QueryContexts.CTX_SQL_QUERY_ID, 
controller.queryId()),
+        getSqlQuery(),
+        null,
+        DateTimes.nowUtc()
+    );
+
     final ResultsContext resultsContext = new 
ResultsContext(getSqlTypeNames(), getSqlResultsContext());
     final TaskReportQueryListener queryListener = new TaskReportQueryListener(
         () -> 
toolbox.getTaskReportFileWriter().openReportOutputStream(getId()),
@@ -280,15 +295,29 @@ public class MSQControllerTask extends AbstractTask 
implements ClientTaskQuery,
         resultsContext
     );
 
-    controller.run(queryListener);
-    return queryListener.getStatusReport().toTaskStatus(getId());
+    final ControllerThreadPool controllerThreadPool = new ControllerThreadPool(
+        Execs.directExecutor(),
+        ScheduledExecutors.fixed(
+            1,
+            "controller-timeout[" + 
StringUtils.encodeForFormat(controller.queryId()) + "]-%s"
+        )
+    );
+
+    try {
+      controllerHolder.runAsync(queryListener, null, 
controllerThreadPool).get();
+      return queryListener.getStatusReport().toTaskStatus(getId());
+    }
+    finally {
+      controllerThreadPool.stop();
+    }
   }
 
   @Override
   public void stopGracefully(final TaskConfig taskConfig)
   {
-    if (controller != null) {
-      controller.stop(CancellationReason.TASK_SHUTDOWN);
+    final ControllerHolder holder = controllerHolder;
+    if (holder != null) {
+      holder.cancel(CancellationReason.TASK_SHUTDOWN);
     }
   }
 
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
index 94d5be39150..74eea1f0d2b 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.msq.dart.guice.DartControllerConfig;
 import org.apache.druid.msq.exec.Controller;
 import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.exec.ControllerHolder;
 import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.server.DruidNode;
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index 97014d91714..d4116fed863 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -36,7 +36,6 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
 import org.apache.druid.msq.dart.controller.ControllerThreadPool;
 import org.apache.druid.msq.dart.controller.DartControllerRegistry;
 import org.apache.druid.msq.dart.controller.sql.DartQueryMaker;
@@ -46,6 +45,7 @@ import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
 import org.apache.druid.msq.dart.guice.DartControllerConfig;
 import org.apache.druid.msq.exec.Controller;
 import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.exec.ControllerHolder;
 import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.error.CanceledFault;
 import org.apache.druid.msq.indexing.error.InvalidNullByteFault;
@@ -114,6 +114,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -263,6 +264,11 @@ public class DartSqlResourceTest extends MSQTestBase
                     MAX_CONTROLLERS,
                     StringUtils.encodeForFormat(getClass().getSimpleName() + 
"-controller-exec")
                 )
+            ),
+            Executors.newSingleThreadScheduledExecutor(
+                Execs.makeThreadFactory(
+                    StringUtils.encodeForFormat(getClass().getSimpleName() + 
"-controller-timeout")
+                )
             )
         ),
         new DartQueryKitSpecFactory(new 
TestTimelineServerView(Collections.emptyList())),
@@ -297,9 +303,9 @@ public class DartSqlResourceTest extends MSQTestBase
     mockCloser.close();
 
     // shutdown(), not shutdownNow(), to ensure controllers stop timely on 
their own.
-    controllerThreadPool.getExecutorService().shutdown();
+    controllerThreadPool.getRunExecutorService().shutdown();
 
-    if (!controllerThreadPool.getExecutorService().awaitTermination(1, 
TimeUnit.MINUTES)) {
+    if (!controllerThreadPool.getRunExecutorService().awaitTermination(1, 
TimeUnit.MINUTES)) {
       throw new IAE("controllerExecutor.awaitTermination() timed out");
     }
 
@@ -753,7 +759,7 @@ public class DartSqlResourceTest extends MSQTestBase
            .thenReturn(makeAuthenticationResult(REGULAR_USER_NAME));
 
     // Block up the controllerExecutor so the controller runs long enough to 
cancel it.
-    final Future<?> sleepFuture = 
controllerThreadPool.getExecutorService().submit(() -> {
+    final Future<?> sleepFuture = 
controllerThreadPool.getRunExecutorService().submit(() -> {
       try {
         Thread.sleep(3_600_000);
       }
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
index 5fbda3623c4..107d48c371a 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
@@ -25,9 +25,9 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
 import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
 import org.apache.druid.msq.dart.guice.DartWorkerModule;
+import org.apache.druid.msq.exec.ControllerHolder;
 import org.apache.druid.rpc.MockServiceClient;
 import org.apache.druid.rpc.RequestBuilder;
 import org.apache.druid.sql.http.GetQueriesResponse;
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerHolderTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerHolderTest.java
new file mode 100644
index 00000000000..cea10aa8827
--- /dev/null
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerHolderTest.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
+import org.apache.druid.msq.indexing.error.CancellationReason;
+import org.apache.druid.msq.indexing.error.MSQErrorReport;
+import org.apache.druid.msq.indexing.report.MSQStatusReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.test.NoopQueryListener;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryContexts;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ControllerHolderTest
+{
+  private static final Logger log = new Logger(ControllerHolderTest.class);
+  private ControllerThreadPool controllerThreadPool;
+
+  /**
+   * Creates the pool used by each test: a listening wrapper around {@code Execs.multiThreaded(2, ...)} for running
+   * controllers, plus a single-thread scheduled executor handed to the pool as its second executor.
+   */
+  @Before
+  public void setUp()
+  {
+    final ListeningExecutorService runExec =
+        MoreExecutors.listeningDecorator(Execs.multiThreaded(2, "controller-holder-test-%s"));
+    final ScheduledExecutorService timeoutExec =
+        Executors.newSingleThreadScheduledExecutor(Execs.makeThreadFactory("controller-holder-test-timeout-%s"));
+
+    controllerThreadPool = new ControllerThreadPool(runExec, timeoutExec);
+  }
+
+  /**
+   * Stops the pool and then waits up to five minutes for each of its two executors to terminate. A slow shutdown is
+   * only logged as a warning; it does not fail the test.
+   */
+  @After
+  public void tearDown() throws InterruptedException
+  {
+    // Capture both executors before calling stop() so they can still be awaited afterwards.
+    final ListeningExecutorService runExec = controllerThreadPool.getRunExecutorService();
+    final ScheduledExecutorService timeoutExec = controllerThreadPool.getTimeoutExecutorService();
+    controllerThreadPool.stop();
+
+    final boolean runExecTerminated = runExec.awaitTermination(5, TimeUnit.MINUTES);
+    if (!runExecTerminated) {
+      log.warn("Could not terminate run executor within 5 minutes");
+    }
+
+    final boolean timeoutExecTerminated = timeoutExec.awaitTermination(5, TimeUnit.MINUTES);
+    if (!timeoutExecTerminated) {
+      log.warn("Could not terminate timeout executor within 5 minutes");
+    }
+  }
+
+  /**
+   * Cancels a controller that is blocked in {@link Thread#sleep} and checks that the sleep was interrupted and that
+   * the holder reports {@link ControllerHolder.State#CANCELED}.
+   */
+  @Test
+  public void testInterruptionOfLongRunningController() throws Exception
+  {
+    final CountDownLatch controllerStarted = new CountDownLatch(1);
+    final CountDownLatch controllerFinished = new CountDownLatch(1);
+    final AtomicBoolean wasInterrupted = new AtomicBoolean(false);
+    final Controller controller = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        try {
+          controllerStarted.countDown();
+          Thread.sleep(300_000);
+        }
+        catch (InterruptedException e) {
+          // Record the interrupt; the test asserts on this flag.
+          wasInterrupted.set(true);
+        }
+        finally {
+          listener.onQueryComplete(makeSuccessReport());
+          controllerFinished.countDown();
+        }
+      }
+    };
+
+    final ControllerHolder holder = new ControllerHolder(controller, "sql-1", null, null, DateTimes.nowUtc());
+    final ListenableFuture<?> future = holder.runAsync(new NoopQueryListener(), null, controllerThreadPool);
+
+    // Bounded waits: if the controller never starts, or cancel() fails to wake it, the test fails promptly
+    // instead of blocking indefinitely (or for the full 300-second sleep).
+    Assert.assertTrue("Controller should have started", controllerStarted.await(30, TimeUnit.SECONDS));
+    holder.cancel(CancellationReason.USER_REQUEST);
+    Assert.assertTrue(
+        "Controller should have finished after cancel",
+        controllerFinished.await(30, TimeUnit.SECONDS)
+    );
+
+    try {
+      future.get(5, TimeUnit.SECONDS);
+    }
+    catch (Exception ignored) {
+      // The future's outcome is not what this test checks; only the interrupt flag and holder state matter.
+    }
+
+    Assert.assertTrue("Controller should have been interrupted", wasInterrupted.get());
+    Assert.assertEquals(ControllerHolder.State.CANCELED, holder.getState());
+  }
+
+  /**
+   * Verifies that {@code ControllerHolder.cancel} invokes {@code Controller.stop} on a controller that is currently
+   * running.
+   */
+  @Test
+  public void testCancelCallsStop() throws Exception
+  {
+    final CountDownLatch controllerStarted = new CountDownLatch(1);
+    final CountDownLatch controllerFinished = new CountDownLatch(1);
+    final AtomicBoolean stopCalled = new AtomicBoolean(false);
+    final Controller controller = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        try {
+          controllerStarted.countDown();
+          Thread.sleep(300_000);
+        }
+        catch (InterruptedException e) {
+          // expected
+        }
+        finally {
+          listener.onQueryComplete(makeSuccessReport());
+          controllerFinished.countDown();
+        }
+      }
+
+      @Override
+      public void stop(final CancellationReason reason)
+      {
+        stopCalled.set(true);
+      }
+    };
+
+    final ControllerHolder holder = new ControllerHolder(controller, "sql-1", null, null, DateTimes.nowUtc());
+    holder.runAsync(new NoopQueryListener(), null, controllerThreadPool);
+
+    // Bounded waits so that a controller which never starts or never wakes up fails the test rather than hanging it.
+    Assert.assertTrue("Controller should have started", controllerStarted.await(30, TimeUnit.SECONDS));
+    holder.cancel(CancellationReason.USER_REQUEST);
+    Assert.assertTrue(
+        "Controller should have finished after cancel",
+        controllerFinished.await(30, TimeUnit.SECONDS)
+    );
+
+    Assert.assertTrue("stop() should have been called as failsafe", stopCalled.get());
+  }
+
+  @Test
+  public void testSuccessfulCompletion() throws Exception
+  {
+    final Controller controller = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        listener.onQueryComplete(makeSuccessReport());
+      }
+    };
+
+    final ControllerHolder holder = new ControllerHolder(controller, "sql-1", 
null, null, DateTimes.nowUtc());
+    final ListenableFuture<?> future = holder.runAsync(new 
NoopQueryListener(), null, controllerThreadPool);
+    future.get(5, TimeUnit.SECONDS);
+
+    Assert.assertEquals(ControllerHolder.State.SUCCESS, holder.getState());
+  }
+
+  @Test
+  public void testCancelBeforeRun() throws Exception
+  {
+    final AtomicBoolean controllerRan = new AtomicBoolean(false);
+    final Controller controller = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        controllerRan.set(true);
+        listener.onQueryComplete(makeSuccessReport());
+      }
+    };
+
+    final ControllerHolder holder = new ControllerHolder(controller, "sql-1", 
null, null, DateTimes.nowUtc());
+
+    // Cancel before run
+    holder.cancel(CancellationReason.USER_REQUEST);
+    Assert.assertEquals(ControllerHolder.State.CANCELED, holder.getState());
+
+    // Run should complete quickly without running the controller
+    final ListenableFuture<?> future = holder.runAsync(new 
NoopQueryListener(), null, controllerThreadPool);
+    future.get(5, TimeUnit.SECONDS);
+
+    Assert.assertFalse("Controller should not have run", controllerRan.get());
+  }
+
+  /**
+   * Calls {@code cancel} twice on a running controller and checks that the second call does not throw and that the
+   * holder ends in {@link ControllerHolder.State#CANCELED}.
+   */
+  @Test
+  public void testDoubleCancelIsIdempotent() throws Exception
+  {
+    final CountDownLatch controllerStarted = new CountDownLatch(1);
+    final CountDownLatch controllerFinished = new CountDownLatch(1);
+    final Controller controller = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        try {
+          controllerStarted.countDown();
+          Thread.sleep(300_000);
+        }
+        catch (InterruptedException e) {
+          // expected
+        }
+        finally {
+          listener.onQueryComplete(makeSuccessReport());
+          controllerFinished.countDown();
+        }
+      }
+    };
+
+    final ControllerHolder holder = new ControllerHolder(controller, "sql-1", null, null, DateTimes.nowUtc());
+    holder.runAsync(new NoopQueryListener(), null, controllerThreadPool);
+
+    // Bounded wait so that a controller which never starts fails the test rather than hanging it.
+    Assert.assertTrue("Controller should have started", controllerStarted.await(30, TimeUnit.SECONDS));
+
+    // Cancel twice — should not throw
+    holder.cancel(CancellationReason.USER_REQUEST);
+    holder.cancel(CancellationReason.USER_REQUEST);
+    Assert.assertTrue(
+        "Controller should have finished after cancel",
+        controllerFinished.await(30, TimeUnit.SECONDS)
+    );
+
+    Assert.assertEquals(ControllerHolder.State.CANCELED, holder.getState());
+  }
+
+  /**
+   * Lets a controller finish successfully, then calls {@code cancel} and checks that the holder still reports
+   * {@link ControllerHolder.State#SUCCESS}.
+   */
+  @Test
+  public void testCancelAfterCompletion() throws Exception
+  {
+    final Controller instantController = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        listener.onQueryComplete(makeSuccessReport());
+      }
+    };
+    final ControllerHolder finishedHolder =
+        new ControllerHolder(instantController, "sql-1", null, null, DateTimes.nowUtc());
+
+    finishedHolder.runAsync(new NoopQueryListener(), null, controllerThreadPool).get(5, TimeUnit.SECONDS);
+    final ControllerHolder.State stateBeforeCancel = finishedHolder.getState();
+    Assert.assertEquals(ControllerHolder.State.SUCCESS, stateBeforeCancel);
+
+    // A late cancel must leave the terminal state untouched.
+    finishedHolder.cancel(CancellationReason.USER_REQUEST);
+    final ControllerHolder.State stateAfterCancel = finishedHolder.getState();
+    Assert.assertEquals(ControllerHolder.State.SUCCESS, stateAfterCancel);
+  }
+
+  @Test
+  public void testWithNullRegistry() throws Exception
+  {
+    final Controller controller = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        listener.onQueryComplete(makeSuccessReport());
+      }
+    };
+
+    final ControllerHolder holder = new ControllerHolder(controller, "id", 
null, null, DateTimes.nowUtc());
+    final ControllerThreadPool directPool = new ControllerThreadPool(
+        Execs.directExecutor(),
+        controllerThreadPool.getTimeoutExecutorService()
+    );
+    final ListenableFuture<?> future = holder.runAsync(
+        new NoopQueryListener(),
+        null,
+        directPool
+    );
+    future.get(5, TimeUnit.SECONDS);
+
+    Assert.assertEquals(ControllerHolder.State.SUCCESS, holder.getState());
+  }
+
+  @Test
+  public void testWithRegistry() throws Exception
+  {
+    final AtomicBoolean registered = new AtomicBoolean(false);
+    final AtomicBoolean deregistered = new AtomicBoolean(false);
+    final ControllerRegistry registry = new ControllerRegistry()
+    {
+      @Override
+      public void register(final ControllerHolder holder)
+      {
+        registered.set(true);
+      }
+
+      @Override
+      public void deregister(final ControllerHolder holder, @Nullable final 
TaskReport.ReportMap report)
+      {
+        deregistered.set(true);
+      }
+    };
+
+    final Controller controller = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        listener.onQueryComplete(makeSuccessReport());
+      }
+    };
+
+    final ControllerHolder holder = new ControllerHolder(controller, "sql-1", 
null, null, DateTimes.nowUtc());
+    final ListenableFuture<?> future = holder.runAsync(new 
NoopQueryListener(), registry, controllerThreadPool);
+    future.get(5, TimeUnit.SECONDS);
+
+    Assert.assertTrue("Should have been registered", registered.get());
+    Assert.assertTrue("Should have been deregistered", deregistered.get());
+  }
+
+  /**
+   * Runs a sleeping controller whose query context sets {@link QueryContexts#TIMEOUT_KEY} to 1 and expects the
+   * holder to end in {@link ControllerHolder.State#CANCELED}, even though the test never calls {@code cancel}.
+   */
+  @Test
+  public void testTimeout() throws Exception
+  {
+    final Controller sleepyController = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        try {
+          Thread.sleep(300_000);
+        }
+        catch (InterruptedException ignored) {
+          // expected — canceled due to timeout
+        }
+        finally {
+          listener.onQueryComplete(makeSuccessReport());
+        }
+      }
+
+      @Override
+      public QueryContext getQueryContext()
+      {
+        final Map<String, Object> contextMap = Map.of(QueryContexts.TIMEOUT_KEY, 1);
+        return QueryContext.of(contextMap);
+      }
+    };
+
+    final ControllerHolder timedHolder =
+        new ControllerHolder(sleepyController, "sql-1", null, null, DateTimes.nowUtc());
+
+    // Allow up to thirty seconds for the run future to resolve.
+    timedHolder.runAsync(new NoopQueryListener(), null, controllerThreadPool).get(30, TimeUnit.SECONDS);
+
+    Assert.assertEquals(ControllerHolder.State.CANCELED, timedHolder.getState());
+  }
+
+  /**
+   * Builds the report payload that the test controllers pass to {@code onQueryComplete}: a status report in
+   * {@link TaskState#SUCCESS} with zero or null for every other field, wrapped in a payload whose remaining
+   * sections are null.
+   */
+  private static MSQTaskReportPayload makeSuccessReport()
+  {
+    return new MSQTaskReportPayload(
+        new MSQStatusReport(TaskState.SUCCESS, null, null, null, 0, Map.of(), 0, 0, null, null),
+        null,
+        null,
+        null
+    );
+  }
+
+  /**
+   * Skeleton {@link Controller} for tests. It remembers the query ID given at construction, stubs every other method
+   * implemented here with an empty body or a fixed return value, and leaves {@code run} for each test to supply.
+   */
+  private abstract static class TestController implements Controller
+  {
+    private final String queryId;
+
+    TestController(final String id)
+    {
+      this.queryId = id;
+    }
+
+    // ---- Accessors with fixed answers ----
+
+    @Override
+    public String queryId()
+    {
+      return queryId;
+    }
+
+    @Override
+    public QueryContext getQueryContext()
+    {
+      return QueryContext.empty();
+    }
+
+    @Override
+    public ControllerContext getControllerContext()
+    {
+      // No controller context is supplied by this stub.
+      return null;
+    }
+
+    @Override
+    public TaskReport.ReportMap liveReports()
+    {
+      return new TaskReport.ReportMap();
+    }
+
+    @Override
+    public java.util.List<String> getWorkerIds()
+    {
+      return java.util.List.of();
+    }
+
+    @Override
+    public boolean hasWorker(final String workerId)
+    {
+      return false;
+    }
+
+    // ---- Callbacks that intentionally do nothing ----
+
+    @Override
+    public void stop(final CancellationReason reason)
+    {
+      // no-op
+    }
+
+    @Override
+    public void resultsComplete(
+        final String queryId,
+        final int stageNumber,
+        final int workerNumber,
+        final Object resultObject
+    )
+    {
+      // no-op
+    }
+
+    @Override
+    public void updatePartialKeyStatisticsInformation(
+        final int stageNumber,
+        final int workerNumber,
+        final Object partialKeyStatisticsInformationObject
+    )
+    {
+      // no-op
+    }
+
+    @Override
+    public void doneReadingInput(final int stageNumber, final int workerNumber)
+    {
+      // no-op
+    }
+
+    @Override
+    public void workerError(final MSQErrorReport errorReport)
+    {
+      // no-op
+    }
+
+    @Override
+    public void workerWarning(final java.util.List<MSQErrorReport> errorReports)
+    {
+      // no-op
+    }
+
+    @Override
+    public void updateCounters(
+        final String taskId,
+        final org.apache.druid.msq.counters.CounterSnapshotsTree snapshotsTree
+    )
+    {
+      // no-op
+    }
+  }
+}
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
index c93e813ba5e..27381ca6904 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
@@ -40,8 +40,11 @@ import org.apache.druid.indexer.report.TaskReport.ReportMap;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
+import org.apache.druid.msq.exec.ControllerHolder;
 import org.apache.druid.msq.exec.ControllerImpl;
 import org.apache.druid.msq.exec.QueryListener;
 import org.apache.druid.msq.exec.ResultsContext;
@@ -56,6 +59,7 @@ import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.msq.sql.MSQTaskQueryKitSpecFactory;
 import org.apache.druid.msq.util.SqlStatementResourceHelper;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.rpc.indexing.NoopOverlordClient;
 import org.apache.druid.sql.calcite.planner.ColumnMappings;
@@ -189,13 +193,29 @@ public class MSQTestOverlordServiceClient extends 
NoopOverlordClient
               resultsContext
           );
 
+      final ControllerHolder controllerHolder = new ControllerHolder(
+          controller,
+          
controller.getQueryContext().getString(QueryContexts.CTX_SQL_QUERY_ID, 
controller.queryId()),
+          null,
+          null,
+          DateTimes.nowUtc()
+      );
+
+      final ControllerThreadPool controllerThreadPool = new 
ControllerThreadPool(
+          Execs.directExecutor(),
+          Execs.scheduledSingleThreaded("msq-test-controller-timeout-%s")
+      );
+
       try {
-        controller.run(queryListener);
+        controllerHolder.runAsync(queryListener, null, 
controllerThreadPool).get();
         testTaskDetails.taskStatus = 
queryListener.getStatusReport().toTaskStatus(cTask.getId());
       }
       catch (Exception e) {
         testTaskDetails.taskStatus = TaskStatus.failure(cTask.getId(), 
e.toString());
       }
+      finally {
+        controllerThreadPool.stop();
+      }
       return Futures.immediateFuture(null);
     }
     catch (Exception e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to