This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 04137070df7 perf: Interrupt controllers when canceling them. (#19233)
04137070df7 is described below
commit 04137070df7b2a157ff7e49abff7a4c4aa3c58ea
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Apr 2 21:42:59 2026 -0700
perf: Interrupt controllers when canceling them. (#19233)
In PRs #18095 and #18931, worker cancellation was switched to use
interrupts with a lightweight non-interrupt-based failsafe. This patch
implements a similar idea for controllers, to aid in more prompt
cancellation in cases where the controller is blocking on something.
The main change is to track the controller thread in ControllerHolder
and interrupt it on cancel(), in addition to calling controller.stop().
ControllerHolder is also moved from dart.controller to msq.exec, since
it is now a shared class, no longer Dart-specific. In addition, the
"workerOffline" logic is moved to Dart's ControllerMessageListener,
since that really is Dart-specific.
---
.../msq/dart/controller/ControllerHolder.java | 304 -------------
.../dart/controller/ControllerMessageListener.java | 23 +-
.../msq/dart/controller/ControllerThreadPool.java | 25 +-
.../dart/controller/DartControllerRegistry.java | 6 +-
.../msq/dart/controller/http/DartQueryInfo.java | 2 +-
.../msq/dart/controller/sql/DartQueryMaker.java | 28 +-
.../msq/dart/controller/sql/DartSqlEngine.java | 2 +-
.../druid/msq/dart/guice/DartControllerModule.java | 4 +-
.../apache/druid/msq/exec/ControllerHolder.java | 428 +++++++++++++++++++
.../org/apache/druid/msq/exec/ControllerImpl.java | 76 ++--
.../apache/druid/msq/exec/ControllerRegistry.java | 40 ++
.../druid/msq/indexing/MSQControllerTask.java | 43 +-
.../controller/DartControllerRegistryTest.java | 1 +
.../dart/controller/http/DartSqlResourceTest.java | 14 +-
.../dart/controller/sql/DartSqlClientImplTest.java | 2 +-
.../druid/msq/exec/ControllerHolderTest.java | 470 +++++++++++++++++++++
.../msq/test/MSQTestOverlordServiceClient.java | 22 +-
17 files changed, 1102 insertions(+), 388 deletions(-)
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
deleted file mode 100644
index b6595a89ba4..00000000000
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.dart.controller;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.report.TaskReport;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.msq.dart.worker.WorkerId;
-import org.apache.druid.msq.exec.CaptureReportQueryListener;
-import org.apache.druid.msq.exec.Controller;
-import org.apache.druid.msq.exec.ControllerContext;
-import org.apache.druid.msq.exec.QueryListener;
-import org.apache.druid.msq.indexing.error.CanceledFault;
-import org.apache.druid.msq.indexing.error.CancellationReason;
-import org.apache.druid.msq.indexing.error.MSQErrorReport;
-import org.apache.druid.msq.indexing.error.WorkerFailedFault;
-import org.apache.druid.msq.indexing.report.MSQStatusReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
-import org.apache.druid.query.BaseQuery;
-import org.apache.druid.server.security.AuthenticationResult;
-import org.apache.druid.sql.http.StandardQueryState;
-import org.joda.time.DateTime;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Holder for {@link Controller}, stored in {@link DartControllerRegistry}.
- */
-public class ControllerHolder
-{
- private static final Logger log = new Logger(ControllerHolder.class);
-
- private final Controller controller;
- private final String sqlQueryId;
- private final String sql;
- private final AuthenticationResult authenticationResult;
- private final DateTime startTime;
- private final AtomicReference<State> state = new AtomicReference<>(State.ACCEPTED);
-
- public ControllerHolder(
- final Controller controller,
- final String sqlQueryId,
- final String sql,
- final AuthenticationResult authenticationResult,
- final DateTime startTime
- )
- {
- this.controller = Preconditions.checkNotNull(controller, "controller");
- this.sqlQueryId = Preconditions.checkNotNull(sqlQueryId, "sqlQueryId");
- this.sql = sql;
- this.authenticationResult = authenticationResult;
- this.startTime = Preconditions.checkNotNull(startTime, "startTime");
- }
-
- public Controller getController()
- {
- return controller;
- }
-
- public String getSqlQueryId()
- {
- return sqlQueryId;
- }
-
- public String getSql()
- {
- return sql;
- }
-
- public String getControllerHost()
- {
- return getControllerContext().selfNode().getHostAndPortToUse();
- }
-
- private ControllerContext getControllerContext()
- {
- return controller.getControllerContext();
- }
-
- public AuthenticationResult getAuthenticationResult()
- {
- return authenticationResult;
- }
-
- public DateTime getStartTime()
- {
- return startTime;
- }
-
- public State getState()
- {
- return state.get();
- }
-
- /**
- * Call when a worker has gone offline. Closes its client and sends a {@link Controller#workerError}
- * to the controller.
- */
- public void workerOffline(final WorkerId workerId)
- {
- final String workerIdString = workerId.toString();
-
- ControllerContext controllerContext = getControllerContext();
- if (controllerContext instanceof DartControllerContext) {
- DartControllerContext dartControllerContext = (DartControllerContext) controllerContext;
- // For DartControllerContext, newWorkerClient() returns the same instance every time.
- // This will always be DartControllerContext in production; the instanceof check is here because certain
- // tests use a different context class.
- dartControllerContext.newWorkerClient().closeClient(workerId.getHostAndPort());
- }
-
- if (controller.hasWorker(workerIdString)) {
- controller.workerError(
- MSQErrorReport.fromFault(
- workerIdString,
- workerId.getHostAndPort(),
- null,
- new WorkerFailedFault(workerIdString, "Worker went offline")
- )
- );
- }
- }
-
- /**
- * Places this holder into {@link State#CANCELED}. Calls {@link Controller#stop(CancellationReason)} if it was
- * previously in state {@link State#RUNNING}.
- */
- public void cancel(CancellationReason reason)
- {
- if (state.compareAndSet(State.ACCEPTED, State.CANCELED)) {
- // No need to call stop() since run() wasn't called.
- return;
- }
-
- if (state.compareAndSet(State.RUNNING, State.CANCELED)) {
- controller.stop(reason);
- }
- }
-
- /**
- * Runs {@link Controller#run(QueryListener)} in the provided executor. Registers the controller with the provided
- * registry while it is running.
- *
- * @return future that resolves when the controller is done or canceled.
- */
- public ListenableFuture<?> runAsync(
- final QueryListener listener,
- final DartControllerRegistry controllerRegistry,
- final ControllerThreadPool threadPool
- )
- {
- // Register controller before submitting anything to controllerExeuctor, so it shows up in
- // "active controllers" lists.
- controllerRegistry.register(this);
-
- final ListenableFuture<?> future = threadPool.getExecutorService().submit(() -> {
- final String threadName = Thread.currentThread().getName();
- Thread.currentThread().setName(makeThreadName());
-
- try {
- final CaptureReportQueryListener reportListener = new CaptureReportQueryListener(listener);
-
- try {
- if (state.compareAndSet(State.ACCEPTED, State.RUNNING)) {
- controller.run(reportListener);
- updateStateOnQueryComplete(reportListener.getReport());
- } else {
- // Canceled before running.
- reportListener.onQueryComplete(makeCanceledReport());
- }
- }
- catch (Throwable e) {
- log.warn(
- e,
- "Controller[%s] failed, queryId[%s], sqlQueryId[%s]",
- controller.queryId(),
- controller.getQueryContext().getString(BaseQuery.QUERY_ID),
- sqlQueryId
- );
- }
- finally {
- // Build report and then call "deregister".
- final MSQTaskReport taskReport;
-
- if (reportListener.hasReport()) {
- taskReport = new MSQTaskReport(controller.queryId(), reportListener.getReport());
- } else {
- taskReport = null;
- }
-
- final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
- reportMap.put(MSQTaskReport.REPORT_KEY, taskReport);
- controllerRegistry.deregister(this, reportMap);
- }
- }
- finally {
- Thread.currentThread().setName(threadName);
- }
- });
-
- // Must not cancel the above future, otherwise "deregister" may never get called. If a controller is canceled
- // before it runs, the runnable above stays in the queue until it gets a thread, then it exits without running
- // the controller.
- return Futures.nonCancellationPropagating(future);
- }
-
- private void updateStateOnQueryComplete(final MSQTaskReportPayload report)
- {
- switch (report.getStatus().getStatus()) {
- case SUCCESS:
- state.compareAndSet(State.RUNNING, State.SUCCESS);
- break;
-
- case FAILED:
- state.compareAndSet(State.RUNNING, State.FAILED);
- break;
- }
- }
-
- /**
- * Generate a name for the thread that {@link #runAsync} uses.
- */
- private String makeThreadName()
- {
- return StringUtils.format(
- "%s[%s]-sqlQueryId[%s]",
- Thread.currentThread().getName(),
- controller.queryId(),
- sqlQueryId
- );
- }
-
- private MSQTaskReportPayload makeCanceledReport()
- {
- final MSQErrorReport errorReport =
- MSQErrorReport.fromFault(controller.queryId(), null, null, CanceledFault.userRequest());
- final MSQStatusReport statusReport =
- new MSQStatusReport(TaskState.FAILED, errorReport, null, null, 0, Map.of(), 0, 0, null, null);
- return new MSQTaskReportPayload(statusReport, null, null, null);
- }
-
- public enum State
- {
- /**
- * Query has been accepted, but not yet {@link Controller#run(QueryListener)}.
- */
- ACCEPTED(StandardQueryState.ACCEPTED),
-
- /**
- * Query has had {@link Controller#run(QueryListener)} called.
- */
- RUNNING(StandardQueryState.RUNNING),
-
- /**
- * Query has been canceled.
- */
- CANCELED(StandardQueryState.CANCELED),
-
- /**
- * Query has exited successfully.
- */
- SUCCESS(StandardQueryState.SUCCESS),
-
- /**
- * Query has failed.
- */
- FAILED(StandardQueryState.FAILED);
-
- private final String statusString;
-
- State(String statusString)
- {
- this.statusString = statusString;
- }
-
- public String getStatusString()
- {
- return statusString;
- }
- }
-}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
index e9d6fbb3010..0fe85f42c75 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
@@ -24,7 +24,10 @@ import org.apache.druid.messages.client.MessageListener;
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
import org.apache.druid.msq.dart.worker.WorkerId;
import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.exec.ControllerHolder;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
+import org.apache.druid.msq.indexing.error.WorkerFailedFault;
import org.apache.druid.server.DruidNode;
/**
@@ -62,7 +65,25 @@ public class ControllerMessageListener implements MessageListener<ControllerMess
for (final ControllerHolder holder : controllerRegistry.getAllControllers()) {
final Controller controller = holder.getController();
final WorkerId workerId = WorkerId.fromDruidNode(node, controller.queryId());
- holder.workerOffline(workerId);
+ final String workerIdString = workerId.toString();
+
+ // Close the worker client for this server.
+ final ControllerContext controllerContext = controller.getControllerContext();
+ if (controllerContext instanceof DartControllerContext) {
+ ((DartControllerContext) controllerContext).newWorkerClient().closeClient(workerId.getHostAndPort());
+ }
+
+ // Notify the controller that the worker has gone offline.
+ if (controller.hasWorker(workerIdString)) {
+ controller.workerError(
+ MSQErrorReport.fromFault(
+ workerIdString,
+ workerId.getHostAndPort(),
+ null,
+ new WorkerFailedFault(workerIdString, "Worker went offline")
+ )
+ );
+ }
}
}
}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java
index 8e369d6fa81..352b0a86678 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java
@@ -25,6 +25,8 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.msq.dart.guice.DartControllerConfig;
import org.apache.druid.msq.exec.Controller;
+import java.util.concurrent.ScheduledExecutorService;
+
/**
* Thread pool for running {@link Controller}. Number of threads is equal to
* {@link DartControllerConfig#getConcurrentQueries()}, which limits the number of concurrent controllers.
@@ -32,21 +34,32 @@ import org.apache.druid.msq.exec.Controller;
@ManageLifecycle
public class ControllerThreadPool
{
- private final ListeningExecutorService executorService;
+ private final ListeningExecutorService runExec;
+ private final ScheduledExecutorService timeoutExec;
+
+ public ControllerThreadPool(
+ final ListeningExecutorService runExec,
+ final ScheduledExecutorService timeoutExec
+ )
+ {
+ this.runExec = runExec;
+ this.timeoutExec = timeoutExec;
+ }
- public ControllerThreadPool(final ListeningExecutorService executorService)
+ public ListeningExecutorService getRunExecutorService()
{
- this.executorService = executorService;
+ return runExec;
}
- public ListeningExecutorService getExecutorService()
+ public ScheduledExecutorService getTimeoutExecutorService()
{
- return executorService;
+ return timeoutExec;
}
@LifecycleStop
public void stop()
{
- executorService.shutdown();
+ runExec.shutdown();
+ timeoutExec.shutdown();
}
}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
index f1d1e4e1b76..09176bcfe77 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
@@ -32,6 +32,8 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.guice.DartControllerConfig;
import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.exec.ControllerHolder;
+import org.apache.druid.msq.exec.ControllerRegistry;
import org.apache.druid.msq.indexing.report.MSQStatusReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
@@ -54,7 +56,7 @@ import java.util.concurrent.TimeUnit;
* Registry for actively-running {@link Controller} and recently-completed {@link TaskReport}.
*/
@ManageLifecycle
-public class DartControllerRegistry
+public class DartControllerRegistry implements ControllerRegistry
{
/**
* Minimum frequency for checking if {@link #cleanupExpiredReports()} needs to be run.
@@ -127,6 +129,7 @@ public class DartControllerRegistry
* Add a controller. Throws {@link DruidException} if a controller with the same {@link Controller#queryId()} is
* already registered.
*/
+ @Override
public void register(ControllerHolder holder)
{
final String dartQueryId = holder.getController().queryId();
@@ -141,6 +144,7 @@ public class DartControllerRegistry
* time afterwards, based on {@link DartControllerConfig#getMaxRetainedReportCount()} and
* {@link DartControllerConfig#getMaxRetainedReportDuration()}.
*/
+ @Override
public void deregister(ControllerHolder holder, @Nullable TaskReport.ReportMap completeReport)
{
final String dartQueryId = holder.getController().queryId();
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
index 9902b86239f..ef36d5146ba 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
@@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
+import org.apache.druid.msq.exec.ControllerHolder;
import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.server.DruidNode;
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
index 053e5fef592..d6f62f859f7 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
@@ -19,7 +19,6 @@
package org.apache.druid.msq.dart.controller.sql;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.frame.Frame;
import org.apache.druid.indexer.report.TaskReport;
@@ -30,13 +29,13 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.dart.controller.ControllerThreadPool;
import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
-import org.apache.druid.msq.dart.controller.DartControllerRegistry;
import org.apache.druid.msq.dart.guice.DartControllerConfig;
import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.exec.ControllerHolder;
import org.apache.druid.msq.exec.ControllerImpl;
+import org.apache.druid.msq.exec.ControllerRegistry;
import org.apache.druid.msq.exec.QueryKitSpecFactory;
import org.apache.druid.msq.exec.ResultsContext;
import org.apache.druid.msq.exec.SequenceQueryListener;
@@ -83,7 +82,7 @@ public class DartQueryMaker implements QueryMaker
/**
* Controller registry, used to register and remove controllers as they start and finish.
*/
- private final DartControllerRegistry controllerRegistry;
+ private final ControllerRegistry controllerRegistry;
/**
* Controller config.
@@ -102,7 +101,7 @@ public class DartQueryMaker implements QueryMaker
List<Entry<Integer, String>> fieldMapping,
DartControllerContextFactory controllerContextFactory,
PlannerContext plannerContext,
- DartControllerRegistry controllerRegistry,
+ ControllerRegistry controllerRegistry,
DartControllerConfig controllerConfig,
ControllerThreadPool controllerThreadPool,
QueryKitSpecFactory queryKitSpecFactory,
@@ -238,7 +237,7 @@ public class DartQueryMaker implements QueryMaker
* Run a query and return the full report, buffered in memory up to
* {@link DartControllerConfig#getMaxQueryReportSize()}.
*
- * Arranges for {@link DartControllerRegistry#deregister} to be called upon completion (either success or failure).
+ * Arranges for {@link ControllerRegistry#deregister} to be called upon completion (either success or failure).
*/
private Sequence<Object[]> runWithReport(
final ControllerHolder controllerHolder,
@@ -268,7 +267,11 @@ public class DartQueryMaker implements QueryMaker
try {
// Submit controller and wait for it to finish.
- controllerHolder.runAsync(listener, controllerRegistry, controllerThreadPool).get();
+ controllerHolder.runAsync(
+ listener,
+ controllerRegistry,
+ controllerThreadPool
+ ).get();
// Return a sequence with just one row (the report).
final TaskReport.ReportMap reportMap =
@@ -288,7 +291,7 @@ public class DartQueryMaker implements QueryMaker
/**
* Run a query and return the results only, streamed back using {@link SequenceQueryListener}.
*
- * Arranges for {@link DartControllerRegistry#deregister} to be called upon completion (either success or failure).
+ * Arranges for {@link ControllerRegistry#deregister} to be called upon completion (either success or failure).
*/
private Sequence<Object[]> runWithSequence(
final ControllerHolder controllerHolder,
@@ -297,8 +300,11 @@ public class DartQueryMaker implements QueryMaker
)
{
final SequenceQueryListener listener = new SequenceQueryListener();
- final ListenableFuture<?> runFuture =
- controllerHolder.runAsync(listener, controllerRegistry, controllerThreadPool);
+ controllerHolder.runAsync(
+ listener,
+ controllerRegistry,
+ controllerThreadPool
+ );
return Sequences.wrap(
listener.getSequence().flatMap(
@@ -316,7 +322,7 @@ public class DartQueryMaker implements QueryMaker
public void after(final boolean isDone, final Throwable thrown)
{
if (!isDone || thrown != null) {
- runFuture.cancel(true); // Cancel on early stop or failure
+ controllerHolder.cancel(CancellationReason.UNKNOWN);
}
}
}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
index 667a34a29f4..a66a72ec715 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
@@ -34,7 +34,6 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.dart.Dart;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.dart.controller.ControllerThreadPool;
import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
@@ -42,6 +41,7 @@ import org.apache.druid.msq.dart.controller.QueryInfoAndReport;
import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.guice.DartControllerConfig;
import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.exec.ControllerHolder;
import org.apache.druid.msq.exec.QueryKitSpecFactory;
import org.apache.druid.msq.indexing.error.CancellationReason;
import org.apache.druid.msq.querykit.MultiQueryKit;
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
index 3b86b5e8b70..8a9b1cf3607 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
@@ -37,6 +37,7 @@ import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.msq.dart.Dart;
import org.apache.druid.msq.dart.DartResourcePermissionMapper;
import org.apache.druid.msq.dart.controller.ControllerMessageListener;
@@ -141,7 +142,8 @@ public class DartControllerModule implements DruidModule
dartControllerConfig.getConcurrentQueries(),
"dart-controller-%s"
)
- )
+ ),
+ ScheduledExecutors.fixed(1, "dart-controller-timeout-%s")
);
}
}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerHolder.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerHolder.java
new file mode 100644
index 00000000000..4caf79d2866
--- /dev/null
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerHolder.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
+import org.apache.druid.msq.indexing.error.CanceledFault;
+import org.apache.druid.msq.indexing.error.CancellationReason;
+import org.apache.druid.msq.indexing.error.MSQErrorReport;
+import org.apache.druid.msq.indexing.report.MSQStatusReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.sql.http.StandardQueryState;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Holder for a {@link Controller} that manages its lifecycle, including cancellation and timeouts.
+ */
+public class ControllerHolder
+{
+ private static final Logger log = new Logger(ControllerHolder.class);
+
+ private final Controller controller;
+ private final String sqlQueryId;
+
+ @Nullable
+ private final String sql;
+
+ @Nullable
+ private final AuthenticationResult authenticationResult;
+
+ private final DateTime startTime;
+
+ @GuardedBy("this")
+ private State state = State.ACCEPTED;
+
+ /**
+ * Thread running the controller. Set inside the {@link #runAsync} runnable, cleared in its finally block.
+ */
+ @GuardedBy("this")
+ private Thread controllerThread;
+
+ /**
+ * If cancel was called, the reason for cancellation. Used to deliver the cancellation to the controller thread
+ * if {@link #cancel} is called before the controller starts running.
+ */
+ @GuardedBy("this")
+ private CancellationReason cancelReason;
+
+ public ControllerHolder(
+ final Controller controller,
+ final String sqlQueryId,
+ @Nullable final String sql,
+ @Nullable final AuthenticationResult authenticationResult,
+ final DateTime startTime
+ )
+ {
+ this.controller = Preconditions.checkNotNull(controller, "controller");
+ this.sqlQueryId = Preconditions.checkNotNull(sqlQueryId, "sqlQueryId");
+ this.sql = sql;
+ this.authenticationResult = authenticationResult;
+ this.startTime = Preconditions.checkNotNull(startTime, "startTime");
+ }
+
+ public Controller getController()
+ {
+ return controller;
+ }
+
+ @Nullable
+ public String getSqlQueryId()
+ {
+ return sqlQueryId;
+ }
+
+ @Nullable
+ public String getSql()
+ {
+ return sql;
+ }
+
+ public String getControllerHost()
+ {
+ return getControllerContext().selfNode().getHostAndPortToUse();
+ }
+
+ private ControllerContext getControllerContext()
+ {
+ return controller.getControllerContext();
+ }
+
+ @Nullable
+ public AuthenticationResult getAuthenticationResult()
+ {
+ return authenticationResult;
+ }
+
+ public DateTime getStartTime()
+ {
+ return startTime;
+ }
+
+ public synchronized State getState()
+ {
+ return state;
+ }
+
+ /**
+ * Runs {@link Controller#run(QueryListener)} in {@link ControllerThreadPool#getRunExecutorService()}. Optionally
+ * registers the controller with the provided registry while it is running. Schedules a timeout on the provided
+ * {@link ControllerThreadPool#getTimeoutExecutorService()} .
+ *
+ * @return future that resolves when the controller is done or canceled
+ */
+ public ListenableFuture<?> runAsync(
+ final QueryListener listener,
+ @Nullable final ControllerRegistry controllerRegistry,
+ final ControllerThreadPool controllerThreadPool
+ )
+ {
+ if (controllerRegistry != null) {
+ // Register controller before submitting anything to the executor, so it shows up in
+ // "active controllers" lists.
+ controllerRegistry.register(this);
+ }
+
+ // Schedule timeout based on the query deadline. The scheduled task calls cancel(), which is
+ // safe even if the controller has already finished (cancel is a no-op for terminal states).
+ final ScheduledFuture<?> timeoutFuture = scheduleTimeout(controllerThreadPool.getTimeoutExecutorService());
+
+ final ListenableFuture<?> runFuture = controllerThreadPool.getRunExecutorService().submit(() -> {
+ final String threadName = Thread.currentThread().getName();
+ Thread.currentThread().setName(makeThreadName());
+
+ try {
+ final CaptureReportQueryListener reportListener = new CaptureReportQueryListener(listener);
+
+ try {
+ if (transitionToRunning()) {
+ try {
+ controller.run(reportListener);
+ }
+ finally {
+ synchronized (this) {
+ controllerThread = null;
+ // Clear any interrupt delivered during or after controller.run().
+ //noinspection ResultOfMethodCallIgnored
+ Thread.interrupted();
+ }
+ }
+
+ updateStateOnQueryComplete(reportListener.getReport());
+ } else {
+ // Canceled before running.
+ synchronized (this) {
+ reportListener.onQueryComplete(makeCanceledReport(cancelReason));
+ }
+ }
+ }
+ catch (Throwable e) {
+ log.warn(
+ e,
+ "Controller[%s] failed, queryId[%s], sqlQueryId[%s]",
+ controller.queryId(),
+ controller.getQueryContext().getString(BaseQuery.QUERY_ID),
+ sqlQueryId
+ );
+ }
+ finally {
+ // Build report and then call "deregister".
+ final MSQTaskReport taskReport;
+
+ if (reportListener.hasReport()) {
+ taskReport = new MSQTaskReport(controller.queryId(),
reportListener.getReport());
+ } else {
+ taskReport = null;
+ }
+
+ final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+ reportMap.put(MSQTaskReport.REPORT_KEY, taskReport);
+
+ if (controllerRegistry != null) {
+ controllerRegistry.deregister(this, reportMap);
+ }
+ }
+ }
+ finally {
+ Thread.currentThread().setName(threadName);
+
+ if (timeoutFuture != null) {
+ timeoutFuture.cancel(false);
+ }
+ }
+ });
+
+ // Must not cancel the above future, otherwise "deregister" may never get
called. If a controller is canceled
+ // before it runs, the runnable above stays in the queue until it gets a
thread, then it exits without running
+ // the controller.
+ return Futures.nonCancellationPropagating(runFuture);
+ }
+
+  /**
+   * Places this holder into {@link State#CANCELED} and stops the controller.
+   * <p>
+   * Only has an effect when the holder is {@link State#ACCEPTED} or {@link State#RUNNING}; in any other state
+   * this is a no-op. If the controller was running, {@link Controller#stop(CancellationReason)} is called and
+   * then the controller thread is interrupted. The interrupt is delivered even if {@code stop} throws; the
+   * exception from {@code stop} still propagates to the caller.
+   *
+   * @param reason cancellation reason, recorded in the holder and passed to the controller
+   */
+  public void cancel(final CancellationReason reason)
+  {
+    final State prevState;
+    synchronized (this) {
+      prevState = state;
+
+      if (state == State.ACCEPTED || state == State.RUNNING) {
+        state = State.CANCELED;
+        cancelReason = reason;
+      }
+    }
+
+    if (prevState == State.RUNNING) {
+      try {
+        controller.stop(reason);
+      }
+      finally {
+        // Interrupt the controller thread as a failsafe, in case the controller is blocked on something. Done in
+        // "finally" so a failure in stop() cannot prevent the interrupt.
+        synchronized (this) {
+          if (controllerThread != null) {
+            controllerThread.interrupt();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Attempts to move from {@link State#ACCEPTED} to {@link State#RUNNING}. On success, the calling thread is
+   * recorded as the controller thread; both updates happen under the holder's monitor. If the holder is in any
+   * other state, nothing is changed. This method does not emit any report itself.
+   *
+   * @return true if the controller should proceed to run, false otherwise (for example, if it was canceled)
+   */
+  private synchronized boolean transitionToRunning()
+  {
+    final boolean shouldRun = (state == State.ACCEPTED);
+
+    if (shouldRun) {
+      state = State.RUNNING;
+      controllerThread = Thread.currentThread();
+    }
+
+    return shouldRun;
+  }
+
+  /**
+   * Arranges for {@link #cancel} to be called with {@link CancellationReason#QUERY_TIMEOUT} once the query
+   * deadline passes. When the deadline is already in the past, the cancellation is performed right away on the
+   * calling thread instead of being scheduled.
+   *
+   * @param scheduledExec executor on which the delayed cancellation is scheduled
+   *
+   * @return the scheduled task, or null if there is no deadline or the cancellation was performed immediately
+   */
+  @Nullable
+  private ScheduledFuture<?> scheduleTimeout(final ScheduledExecutorService scheduledExec)
+  {
+    final DateTime deadline = getQueryDeadline();
+
+    if (deadline == null) {
+      return null;
+    }
+
+    final Runnable timeoutTask = () -> cancel(CancellationReason.QUERY_TIMEOUT);
+    final long delayMs = deadline.getMillis() - DateTimes.nowUtc().getMillis();
+
+    if (delayMs > 0) {
+      return scheduledExec.schedule(timeoutTask, delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    // Deadline has already passed. Cancel immediately rather than scheduling, so the cancellation
+    // takes effect even when using a direct executor for the controller thread.
+    timeoutTask.run();
+    return null;
+  }
+
+  /**
+   * Determines the query deadline from the controller's query context. Prefers the deadline stored in the
+   * context; if there is none, derives one from the context's start time plus its timeout.
+   *
+   * @return the deadline, or null if no timeout is configured
+   */
+  @Nullable
+  private DateTime getQueryDeadline()
+  {
+    final QueryContext context = controller.getQueryContext();
+    final DateTime explicitDeadline = MultiStageQueryContext.getQueryDeadline(context);
+
+    if (explicitDeadline != null) {
+      return explicitDeadline;
+    }
+
+    // Newer Brokers set the deadline, but older ones might not. Fall back to startTime and timeout in this case.
+    final long timeout = context.getTimeout(QueryContexts.NO_TIMEOUT);
+
+    if (timeout == QueryContexts.NO_TIMEOUT) {
+      return null;
+    }
+
+    return MultiStageQueryContext.getStartTime(context).plus(timeout);
+  }
+
+  /**
+   * Records the outcome of a finished query. Only acts while {@link #state} is {@link State#RUNNING}; a holder
+   * that was canceled in the meantime keeps its state. Statuses other than SUCCESS and FAILED leave the state
+   * untouched.
+   *
+   * @param report final report payload whose status determines the new state
+   */
+  private synchronized void updateStateOnQueryComplete(final MSQTaskReportPayload report)
+  {
+    if (state == State.RUNNING) {
+      switch (report.getStatus().getStatus()) {
+        case SUCCESS:
+          state = State.SUCCESS;
+          break;
+
+        case FAILED:
+          state = State.FAILED;
+          break;
+
+        default:
+          // Any other status: keep the current state.
+          break;
+      }
+    }
+  }
+
+  /**
+   * Builds the thread name used while {@link #runAsync} executes the controller: the current thread name
+   * followed by the controller's query ID, plus the SQL query ID when one is set.
+   */
+  private String makeThreadName()
+  {
+    final String baseName = Thread.currentThread().getName();
+    final String queryId = controller.queryId();
+
+    if (sqlQueryId == null) {
+      return StringUtils.format("%s[%s]", baseName, queryId);
+    }
+
+    return StringUtils.format("%s[%s]-sqlQueryId[%s]", baseName, queryId, sqlQueryId);
+  }
+
+  /**
+   * Builds a report payload with status {@link TaskState#FAILED} whose error is a {@link CanceledFault}.
+   *
+   * @param reason cancellation reason; {@link CancellationReason#UNKNOWN} is used when null
+   */
+  private MSQTaskReportPayload makeCanceledReport(@Nullable final CancellationReason reason)
+  {
+    final CancellationReason effectiveReason = reason == null ? CancellationReason.UNKNOWN : reason;
+
+    final MSQErrorReport errorReport = MSQErrorReport.fromFault(
+        controller.queryId(),
+        null,
+        null,
+        new CanceledFault(effectiveReason)
+    );
+
+    final MSQStatusReport statusReport = new MSQStatusReport(
+        TaskState.FAILED,
+        errorReport,
+        null,
+        null,
+        0,
+        Map.of(),
+        0,
+        0,
+        null,
+        null
+    );
+
+    return new MSQTaskReportPayload(statusReport, null, null, null);
+  }
+
+  /**
+   * Lifecycle states of a holder. Each state carries the corresponding {@link StandardQueryState} string.
+   */
+  public enum State
+  {
+    /**
+     * The query was accepted; {@link Controller#run(QueryListener)} has not been called yet.
+     */
+    ACCEPTED(StandardQueryState.ACCEPTED),
+
+    /**
+     * {@link Controller#run(QueryListener)} has been called for the query.
+     */
+    RUNNING(StandardQueryState.RUNNING),
+
+    /**
+     * The query was canceled.
+     */
+    CANCELED(StandardQueryState.CANCELED),
+
+    /**
+     * The query finished successfully.
+     */
+    SUCCESS(StandardQueryState.SUCCESS),
+
+    /**
+     * The query failed.
+     */
+    FAILED(StandardQueryState.FAILED);
+
+    private final String statusString;
+
+    State(final String status)
+    {
+      this.statusString = status;
+    }
+
+    /**
+     * Returns the status string associated with this state.
+     */
+    public String getStatusString()
+    {
+      return this.statusString;
+    }
+  }
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 082ab512d42..e02d2909547 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -172,7 +172,6 @@ import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
-import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
@@ -219,7 +218,6 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -252,6 +250,13 @@ public class ControllerImpl implements Controller
// For system error reporting. This is the very first error we got from a
worker. (We only report that one.)
private final AtomicReference<MSQErrorReport> workerErrorRef = new
AtomicReference<>();
+ /**
+ * Set by {@link #stop(CancellationReason)}. If non-null, this reason takes
priority over any exception
+ * encountered during execution when building the error report. If we didn't
do this, interrupts arising
+ * from cancellation could produce errors that are less informative than the
actual cancellation reason.
+ */
+ private volatile CancellationReason cancelReason;
+
// For system warning reporting
private final ConcurrentLinkedQueue<MSQErrorReport> workerWarnings = new
ConcurrentLinkedQueue<>();
@@ -376,7 +381,9 @@ public class ControllerImpl implements Controller
// stopGracefully() is called when the containing process is terminated,
or when the task is canceled.
log.info("Query [%s] canceled.", queryDef != null ? queryDef.getQueryId()
: "<no id yet>");
+ cancelReason = reason;
stopExternalFetchers();
+ kernelManipulationQueue.clear(); // No point processing any
possibly-queued commands.
addToKernelManipulationQueue(
kernel -> {
throw new MSQException(new CanceledFault(reason));
@@ -471,7 +478,12 @@ public class ControllerImpl implements Controller
MSQErrorReport workerError = workerErrorRef.get();
taskStateForReport = TaskState.FAILED;
- errorForReport = MSQTasks.makeErrorReport(queryId(), selfHost,
controllerError, workerError);
+
+ if (cancelReason != null) {
+ errorForReport = MSQErrorReport.fromFault(queryId(), selfHost, null,
new CanceledFault(cancelReason));
+ } else {
+ errorForReport = MSQTasks.makeErrorReport(queryId(), selfHost,
controllerError, workerError);
+ }
// Log the errors we encountered.
if (controllerError != null) {
@@ -667,6 +679,9 @@ public class ControllerImpl implements Controller
* controller loop in {@link RunQueryUntilDone#run()}.
* <p>
* If the consumer throws an exception, the query fails.
+ * <p>
+ * Consumers must not perform blocking operations (network calls, waiting on
futures, sleeping, etc.), because
+ * the main controller loop executes them in sequence and blocking would
delay controller operations.
*/
public void addToKernelManipulationQueue(Consumer<ControllerQueryKernel>
kernelConsumer)
{
@@ -2328,10 +2343,6 @@ public class ControllerImpl implements Controller
startTaskLauncher();
boolean runAgain;
- final DateTime queryFailDeadline = getQueryDeadline();
-
- // The timeout could have already elapsed while waiting for the
controller to start, check it now.
- checkTimeout(queryFailDeadline);
while (!queryKernel.isDone()) {
startStages();
@@ -2344,10 +2355,8 @@ public class ControllerImpl implements Controller
checkForErrorsInSketchFetcher();
if (!runAgain) {
- runKernelCommands(queryFailDeadline);
+ runKernelCommands();
}
-
- checkTimeout(queryFailDeadline);
}
if (!queryKernel.isSuccess()) {
@@ -2359,34 +2368,6 @@ public class ControllerImpl implements Controller
return Pair.of(queryKernel, workerTaskLauncherFuture);
}
- /**
- * Retrieves the timeout and start time from the query context and reads
or calculates the deadline.
- */
- private DateTime getQueryDeadline()
- {
- DateTime deadline =
MultiStageQueryContext.getQueryDeadline(querySpec.getContext());
-
- if (deadline == null) {
- // Newer Brokers set the deadline, but older ones might not. Fall back
to startTime and timeout in this case.
- final long timeout =
querySpec.getContext().getTimeout(QueryContexts.NO_TIMEOUT);
- if (timeout != QueryContexts.NO_TIMEOUT) {
- deadline =
MultiStageQueryContext.getStartTime(querySpec.getContext()).plus(timeout);
- }
- }
-
- return deadline != null ? deadline : DateTimes.MAX;
- }
-
- /**
- * Checks the queryFailDeadline and fails the query with a {@link
CanceledFault} if it has passed.
- */
- private void checkTimeout(DateTime queryFailDeadline)
- {
- if (queryFailDeadline.isBeforeNow()) {
- throw new MSQException(CanceledFault.timeout());
- }
- }
-
private void checkForErrorsInSketchFetcher()
{
Throwable throwable = workerSketchFetcher.getError();
@@ -2472,24 +2453,21 @@ public class ControllerImpl implements Controller
/**
* Run at least one command from {@link #kernelManipulationQueue}, waiting
for it if necessary.
+ * Timeouts are handled externally by {@link ControllerHolder}, which
calls {@link Controller#stop}
+ * to enqueue a {@link CanceledFault} and then interrupts this thread when
the query deadline elapses.
*/
- private void runKernelCommands(DateTime queryFailDeadline) throws
InterruptedException
+ private void runKernelCommands() throws InterruptedException
{
if (!queryKernel.isDone()) {
- // Run the next command, waiting till timeout for it if necessary.
- Consumer<ControllerQueryKernel> command = kernelManipulationQueue.poll(
- queryFailDeadline.getMillis() - DateTimes.nowUtc().getMillis(),
- TimeUnit.MILLISECONDS
- );
- if (command == null) {
- return;
- }
+ // Run the next command, waiting for it if necessary.
+ final Consumer<ControllerQueryKernel> command =
kernelManipulationQueue.take();
command.accept(queryKernel);
// Run all pending commands after that one. Helps avoid deep queues.
// After draining the command queue, move on to the next iteration of
the controller loop.
- while ((command = kernelManipulationQueue.poll()) != null) {
- command.accept(queryKernel);
+ Consumer<ControllerQueryKernel> next;
+ while ((next = kernelManipulationQueue.poll()) != null) {
+ next.accept(queryKernel);
}
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerRegistry.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerRegistry.java
new file mode 100644
index 00000000000..30a1a3379fc
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerRegistry.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.indexer.report.TaskReport;
+
+import javax.annotation.Nullable;
+
+/**
+ * Registry for actively-running {@link Controller} instances, held in {@link
ControllerHolder}.
+ * {@link ControllerHolder#runAsync} registers the holder before submitting the
+ * controller for execution and deregisters it once the run has ended.
+ */
+public interface ControllerRegistry
+{
+ /**
+ * Register a controller holder. Throws if a controller with the same query
ID is already registered.
+ *
+ * @param holder holder to register
+ */
+ void register(ControllerHolder holder);
+
+ /**
+ * Deregister a controller holder. Optionally stores a final report.
+ *
+ * @param holder holder to deregister
+ * @param report final report map for the query, or null if there is none
+ */
+ void deregister(ControllerHolder holder, @Nullable TaskReport.ReportMap
report);
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index c43d855ad50..1950aeace5f 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -40,9 +40,14 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.exec.ControllerHolder;
import org.apache.druid.msq.exec.ControllerImpl;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.exec.ResultsContext;
@@ -72,6 +77,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
+
@JsonTypeName(MSQControllerTask.TYPE)
public class MSQControllerTask extends AbstractTask implements
ClientTaskQuery, PendingSegmentAllocatingTask
{
@@ -113,7 +119,7 @@ public class MSQControllerTask extends AbstractTask
implements ClientTaskQuery,
@JacksonInject
private Injector injector;
- private volatile Controller controller;
+ private volatile ControllerHolder controllerHolder;
@JsonCreator
public MSQControllerTask(
@@ -262,13 +268,22 @@ public class MSQControllerTask extends AbstractTask
implements ClientTaskQuery,
{
final ControllerContext context =
injector.getInstance(IndexerControllerContextFactory.class)
.buildWithTask(this, toolbox);
- controller = new ControllerImpl(
+
+ final ControllerImpl controller = new ControllerImpl(
querySpec,
new ResultsContext(getSqlTypeNames(), getSqlResultsContext()),
context,
injector.getInstance(MSQTaskQueryKitSpecFactory.class)
);
+ controllerHolder = new ControllerHolder(
+ controller,
+ controller.getQueryContext().getString(QueryContexts.CTX_SQL_QUERY_ID,
controller.queryId()),
+ getSqlQuery(),
+ null,
+ DateTimes.nowUtc()
+ );
+
final ResultsContext resultsContext = new
ResultsContext(getSqlTypeNames(), getSqlResultsContext());
final TaskReportQueryListener queryListener = new TaskReportQueryListener(
() ->
toolbox.getTaskReportFileWriter().openReportOutputStream(getId()),
@@ -280,15 +295,29 @@ public class MSQControllerTask extends AbstractTask
implements ClientTaskQuery,
resultsContext
);
- controller.run(queryListener);
- return queryListener.getStatusReport().toTaskStatus(getId());
+ final ControllerThreadPool controllerThreadPool = new ControllerThreadPool(
+ Execs.directExecutor(),
+ ScheduledExecutors.fixed(
+ 1,
+ "controller-timeout[" +
StringUtils.encodeForFormat(controller.queryId()) + "]-%s"
+ )
+ );
+
+ try {
+ controllerHolder.runAsync(queryListener, null,
controllerThreadPool).get();
+ return queryListener.getStatusReport().toTaskStatus(getId());
+ }
+ finally {
+ controllerThreadPool.stop();
+ }
}
@Override
public void stopGracefully(final TaskConfig taskConfig)
{
- if (controller != null) {
- controller.stop(CancellationReason.TASK_SHUTDOWN);
+ final ControllerHolder holder = controllerHolder;
+ if (holder != null) {
+ holder.cancel(CancellationReason.TASK_SHUTDOWN);
}
}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
index 94d5be39150..74eea1f0d2b 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.msq.dart.guice.DartControllerConfig;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.exec.ControllerHolder;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.server.DruidNode;
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index 97014d91714..d4116fed863 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -36,7 +36,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.dart.controller.ControllerThreadPool;
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
import org.apache.druid.msq.dart.controller.sql.DartQueryMaker;
@@ -46,6 +45,7 @@ import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
import org.apache.druid.msq.dart.guice.DartControllerConfig;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.exec.ControllerHolder;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.InvalidNullByteFault;
@@ -114,6 +114,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -263,6 +264,11 @@ public class DartSqlResourceTest extends MSQTestBase
MAX_CONTROLLERS,
StringUtils.encodeForFormat(getClass().getSimpleName() +
"-controller-exec")
)
+ ),
+ Executors.newSingleThreadScheduledExecutor(
+ Execs.makeThreadFactory(
+ StringUtils.encodeForFormat(getClass().getSimpleName() +
"-controller-timeout")
+ )
)
),
new DartQueryKitSpecFactory(new
TestTimelineServerView(Collections.emptyList())),
@@ -297,9 +303,9 @@ public class DartSqlResourceTest extends MSQTestBase
mockCloser.close();
// shutdown(), not shutdownNow(), to ensure controllers stop timely on
their own.
- controllerThreadPool.getExecutorService().shutdown();
+ controllerThreadPool.getRunExecutorService().shutdown();
- if (!controllerThreadPool.getExecutorService().awaitTermination(1,
TimeUnit.MINUTES)) {
+ if (!controllerThreadPool.getRunExecutorService().awaitTermination(1,
TimeUnit.MINUTES)) {
throw new IAE("controllerExecutor.awaitTermination() timed out");
}
@@ -753,7 +759,7 @@ public class DartSqlResourceTest extends MSQTestBase
.thenReturn(makeAuthenticationResult(REGULAR_USER_NAME));
// Block up the controllerExecutor so the controller runs long enough to
cancel it.
- final Future<?> sleepFuture =
controllerThreadPool.getExecutorService().submit(() -> {
+ final Future<?> sleepFuture =
controllerThreadPool.getRunExecutorService().submit(() -> {
try {
Thread.sleep(3_600_000);
}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
index 5fbda3623c4..107d48c371a 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
@@ -25,9 +25,9 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.guice.DartWorkerModule;
+import org.apache.druid.msq.exec.ControllerHolder;
import org.apache.druid.rpc.MockServiceClient;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.sql.http.GetQueriesResponse;
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerHolderTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerHolderTest.java
new file mode 100644
index 00000000000..cea10aa8827
--- /dev/null
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerHolderTest.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
+import org.apache.druid.msq.indexing.error.CancellationReason;
+import org.apache.druid.msq.indexing.error.MSQErrorReport;
+import org.apache.druid.msq.indexing.report.MSQStatusReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.test.NoopQueryListener;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryContexts;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ControllerHolderTest
+{
+ private static final Logger log = new Logger(ControllerHolderTest.class);
+ private ControllerThreadPool controllerThreadPool;
+
+  /**
+   * Creates a fresh thread pool for each test: a two-thread run executor and a single-thread timeout executor.
+   */
+  @Before
+  public void setUp()
+  {
+    final ListeningExecutorService runExec =
+        MoreExecutors.listeningDecorator(Execs.multiThreaded(2, "controller-holder-test-%s"));
+    final ScheduledExecutorService timeoutExec =
+        Executors.newSingleThreadScheduledExecutor(Execs.makeThreadFactory("controller-holder-test-timeout-%s"));
+
+    controllerThreadPool = new ControllerThreadPool(runExec, timeoutExec);
+  }
+
+  /**
+   * Stops the thread pool and waits for both executors to terminate, logging a warning if either does not.
+   */
+  @After
+  public void tearDown() throws InterruptedException
+  {
+    // Grab both executors before stopping the pool, then wait on each in turn.
+    final ListeningExecutorService runExec = controllerThreadPool.getRunExecutorService();
+    final ScheduledExecutorService timeoutExec = controllerThreadPool.getTimeoutExecutorService();
+    controllerThreadPool.stop();
+
+    final boolean runTerminated = runExec.awaitTermination(5, TimeUnit.MINUTES);
+    if (!runTerminated) {
+      log.warn("Could not terminate run executor within 5 minutes");
+    }
+
+    final boolean timeoutTerminated = timeoutExec.awaitTermination(5, TimeUnit.MINUTES);
+    if (!timeoutTerminated) {
+      log.warn("Could not terminate timeout executor within 5 minutes");
+    }
+  }
+
+  /**
+   * A controller blocked in {@code sleep} must be interrupted by {@link ControllerHolder#cancel}, and the holder
+   * must end up in {@link ControllerHolder.State#CANCELED}.
+   */
+  @Test
+  public void testInterruptionOfLongRunningController() throws Exception
+  {
+    final CountDownLatch runEntered = new CountDownLatch(1);
+    final CountDownLatch runExited = new CountDownLatch(1);
+    final AtomicBoolean sawInterrupt = new AtomicBoolean(false);
+
+    // Controller that blocks until interrupted, records the interrupt, then reports success.
+    final Controller blockingController = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        try {
+          runEntered.countDown();
+          Thread.sleep(300_000);
+        }
+        catch (InterruptedException e) {
+          sawInterrupt.set(true);
+        }
+        finally {
+          listener.onQueryComplete(makeSuccessReport());
+          runExited.countDown();
+        }
+      }
+    };
+
+    final ControllerHolder holder =
+        new ControllerHolder(blockingController, "sql-1", null, null, DateTimes.nowUtc());
+    final ListenableFuture<?> runFuture = holder.runAsync(new NoopQueryListener(), null, controllerThreadPool);
+
+    runEntered.await();
+    holder.cancel(CancellationReason.USER_REQUEST);
+    runExited.await();
+
+    try {
+      runFuture.get(5, TimeUnit.SECONDS);
+    }
+    catch (Exception ignored) {
+      // Best-effort wait for the runnable to finish; the assertions below do not depend on the future's outcome.
+    }
+
+    Assert.assertTrue("Controller should have been interrupted", sawInterrupt.get());
+    Assert.assertEquals(ControllerHolder.State.CANCELED, holder.getState());
+  }
+
+  /**
+   * Canceling a running controller must invoke {@link Controller#stop(CancellationReason)}.
+   */
+  @Test
+  public void testCancelCallsStop() throws Exception
+  {
+    final CountDownLatch runEntered = new CountDownLatch(1);
+    final CountDownLatch runExited = new CountDownLatch(1);
+    final AtomicBoolean stopInvoked = new AtomicBoolean(false);
+
+    // Controller that blocks until interrupted and records whether stop() was called.
+    final Controller blockingController = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        try {
+          runEntered.countDown();
+          Thread.sleep(300_000);
+        }
+        catch (InterruptedException e) {
+          // Expected: cancel() interrupts the controller thread.
+        }
+        finally {
+          listener.onQueryComplete(makeSuccessReport());
+          runExited.countDown();
+        }
+      }
+
+      @Override
+      public void stop(final CancellationReason reason)
+      {
+        stopInvoked.set(true);
+      }
+    };
+
+    final ControllerHolder holder =
+        new ControllerHolder(blockingController, "sql-1", null, null, DateTimes.nowUtc());
+    holder.runAsync(new NoopQueryListener(), null, controllerThreadPool);
+
+    runEntered.await();
+    holder.cancel(CancellationReason.USER_REQUEST);
+    runExited.await();
+
+    Assert.assertTrue("stop() should have been called as failsafe", stopInvoked.get());
+  }
+
+  /**
+   * A controller that immediately reports success leaves the holder in {@link ControllerHolder.State#SUCCESS}.
+   */
+  @Test
+  public void testSuccessfulCompletion() throws Exception
+  {
+    final Controller succeedingController = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        listener.onQueryComplete(makeSuccessReport());
+      }
+    };
+
+    final ControllerHolder holder =
+        new ControllerHolder(succeedingController, "sql-1", null, null, DateTimes.nowUtc());
+
+    final ListenableFuture<?> runFuture = holder.runAsync(new NoopQueryListener(), null, controllerThreadPool);
+    runFuture.get(5, TimeUnit.SECONDS);
+
+    Assert.assertEquals(ControllerHolder.State.SUCCESS, holder.getState());
+  }
+
+  /**
+   * A holder canceled before {@link ControllerHolder#runAsync} must never call the controller's {@code run}.
+   */
+  @Test
+  public void testCancelBeforeRun() throws Exception
+  {
+    final AtomicBoolean runInvoked = new AtomicBoolean(false);
+
+    final Controller trackingController = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        runInvoked.set(true);
+        listener.onQueryComplete(makeSuccessReport());
+      }
+    };
+
+    final ControllerHolder holder =
+        new ControllerHolder(trackingController, "sql-1", null, null, DateTimes.nowUtc());
+
+    // Cancel while the holder has not started running yet.
+    holder.cancel(CancellationReason.USER_REQUEST);
+    Assert.assertEquals(ControllerHolder.State.CANCELED, holder.getState());
+
+    // The submitted task should finish promptly without invoking the controller.
+    final ListenableFuture<?> runFuture = holder.runAsync(new NoopQueryListener(), null, controllerThreadPool);
+    runFuture.get(5, TimeUnit.SECONDS);
+
+    Assert.assertFalse("Controller should not have run", runInvoked.get());
+  }
+
+  /**
+   * Calling {@link ControllerHolder#cancel} twice on a running controller must not throw, and the holder must
+   * stay in {@link ControllerHolder.State#CANCELED}.
+   */
+  @Test
+  public void testDoubleCancelIsIdempotent() throws Exception
+  {
+    final CountDownLatch runEntered = new CountDownLatch(1);
+    final CountDownLatch runExited = new CountDownLatch(1);
+
+    // Controller that blocks until interrupted, then reports success.
+    final Controller blockingController = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        try {
+          runEntered.countDown();
+          Thread.sleep(300_000);
+        }
+        catch (InterruptedException e) {
+          // Expected: cancel() interrupts the controller thread.
+        }
+        finally {
+          listener.onQueryComplete(makeSuccessReport());
+          runExited.countDown();
+        }
+      }
+    };
+
+    final ControllerHolder holder =
+        new ControllerHolder(blockingController, "sql-1", null, null, DateTimes.nowUtc());
+    holder.runAsync(new NoopQueryListener(), null, controllerThreadPool);
+
+    runEntered.await();
+
+    // The second call must be harmless.
+    holder.cancel(CancellationReason.USER_REQUEST);
+    holder.cancel(CancellationReason.USER_REQUEST);
+    runExited.await();
+
+    Assert.assertEquals(ControllerHolder.State.CANCELED, holder.getState());
+  }
+
+  /**
+   * Canceling a holder that already finished successfully must leave it in
+   * {@link ControllerHolder.State#SUCCESS}.
+   */
+  @Test
+  public void testCancelAfterCompletion() throws Exception
+  {
+    final Controller succeedingController = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        listener.onQueryComplete(makeSuccessReport());
+      }
+    };
+
+    final ControllerHolder holder =
+        new ControllerHolder(succeedingController, "sql-1", null, null, DateTimes.nowUtc());
+
+    final ListenableFuture<?> runFuture = holder.runAsync(new NoopQueryListener(), null, controllerThreadPool);
+    runFuture.get(5, TimeUnit.SECONDS);
+    Assert.assertEquals(ControllerHolder.State.SUCCESS, holder.getState());
+
+    // A late cancel is expected to change nothing.
+    holder.cancel(CancellationReason.USER_REQUEST);
+    Assert.assertEquals(ControllerHolder.State.SUCCESS, holder.getState());
+  }
+
+  /**
+   * Verifies that a holder can be run with a {@code null} registry argument, here on a thread pool backed
+   * by a direct executor, and ends in the {@code SUCCESS} state.
+   */
+  @Test
+  public void testWithNullRegistry() throws Exception
+  {
+    // Pool that runs the controller on a direct executor, reusing the shared timeout executor.
+    final ControllerThreadPool directExecutorPool = new ControllerThreadPool(
+        Execs.directExecutor(),
+        controllerThreadPool.getTimeoutExecutorService()
+    );
+
+    // Controller that reports success immediately.
+    final Controller immediateController = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        listener.onQueryComplete(makeSuccessReport());
+      }
+    };
+    final ControllerHolder controllerHolder = new ControllerHolder(
+        immediateController,
+        "id",
+        null,
+        null,
+        DateTimes.nowUtc()
+    );
+
+    // No registry is supplied (second argument is null).
+    final ListenableFuture<?> runFuture =
+        controllerHolder.runAsync(new NoopQueryListener(), null, directExecutorPool);
+    runFuture.get(5, TimeUnit.SECONDS);
+
+    Assert.assertEquals(ControllerHolder.State.SUCCESS, controllerHolder.getState());
+  }
+
+  /**
+   * Verifies that when a registry is passed to {@code runAsync}, both its {@code register} and
+   * {@code deregister} callbacks have been invoked by the time the returned future completes.
+   */
+  @Test
+  public void testWithRegistry() throws Exception
+  {
+    // Flags flipped by the recording registry below.
+    final AtomicBoolean sawRegister = new AtomicBoolean(false);
+    final AtomicBoolean sawDeregister = new AtomicBoolean(false);
+
+    final ControllerRegistry recordingRegistry = new ControllerRegistry()
+    {
+      @Override
+      public void register(final ControllerHolder holder)
+      {
+        sawRegister.set(true);
+      }
+
+      @Override
+      public void deregister(final ControllerHolder holder, @Nullable final TaskReport.ReportMap report)
+      {
+        sawDeregister.set(true);
+      }
+    };
+
+    // Controller that reports success immediately.
+    final Controller immediateController = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        listener.onQueryComplete(makeSuccessReport());
+      }
+    };
+    final ControllerHolder controllerHolder = new ControllerHolder(
+        immediateController,
+        "sql-1",
+        null,
+        null,
+        DateTimes.nowUtc()
+    );
+
+    final ListenableFuture<?> runFuture =
+        controllerHolder.runAsync(new NoopQueryListener(), recordingRegistry, controllerThreadPool);
+    runFuture.get(5, TimeUnit.SECONDS);
+
+    Assert.assertTrue("Should have been registered", sawRegister.get());
+    Assert.assertTrue("Should have been deregistered", sawDeregister.get());
+  }
+
+  /**
+   * Verifies that a controller whose query context carries a timeout value of 1 under
+   * {@code QueryContexts.TIMEOUT_KEY} ends up in the {@code CANCELED} state, even though its body would
+   * otherwise sleep far longer than the test is willing to wait.
+   */
+  @Test
+  public void testTimeout() throws Exception
+  {
+    final Controller sleepingController = new TestController("test-query")
+    {
+      @Override
+      public void run(final QueryListener listener)
+      {
+        try {
+          Thread.sleep(300_000);
+        }
+        catch (InterruptedException interrupted) {
+          // expected — canceled due to timeout
+        }
+        finally {
+          // Report completion however the sleep ended, so the run future can finish.
+          listener.onQueryComplete(makeSuccessReport());
+        }
+      }
+
+      @Override
+      public QueryContext getQueryContext()
+      {
+        // Tiny timeout so the holder gives up on this controller almost immediately.
+        return QueryContext.of(Map.of(QueryContexts.TIMEOUT_KEY, 1));
+      }
+    };
+    final ControllerHolder controllerHolder = new ControllerHolder(
+        sleepingController,
+        "sql-1",
+        null,
+        null,
+        DateTimes.nowUtc()
+    );
+
+    final ListenableFuture<?> runFuture =
+        controllerHolder.runAsync(new NoopQueryListener(), null, controllerThreadPool);
+    runFuture.get(30, TimeUnit.SECONDS);
+
+    Assert.assertEquals(ControllerHolder.State.CANCELED, controllerHolder.getState());
+  }
+
+  /**
+   * Builds a minimal report payload whose status report has state {@link TaskState#SUCCESS}; every other
+   * field is null, zero, or an empty map.
+   */
+  private static MSQTaskReportPayload makeSuccessReport()
+  {
+    return new MSQTaskReportPayload(
+        new MSQStatusReport(TaskState.SUCCESS, null, null, null, 0, Map.of(), 0, 0, null, null),
+        null,
+        null,
+        null
+    );
+  }
+
+  /**
+   * Skeleton {@link Controller} for tests. Every method except {@code run} is implemented here, either as
+   * a no-op or by returning a fixed trivial value, so each test only needs to supply {@code run} (and may
+   * override anything else it cares about, such as {@code getQueryContext}).
+   */
+  private abstract static class TestController implements Controller
+  {
+    private final String queryId;
+
+    TestController(final String queryId)
+    {
+      this.queryId = queryId;
+    }
+
+    // ---- Accessors returning fixed values ----
+
+    @Override
+    public String queryId()
+    {
+      return queryId;
+    }
+
+    @Override
+    public QueryContext getQueryContext()
+    {
+      return QueryContext.empty();
+    }
+
+    @Override
+    public ControllerContext getControllerContext()
+    {
+      // No controller context is provided by this skeleton.
+      return null;
+    }
+
+    @Override
+    public TaskReport.ReportMap liveReports()
+    {
+      return new TaskReport.ReportMap();
+    }
+
+    @Override
+    public boolean hasWorker(final String workerId)
+    {
+      return false;
+    }
+
+    @Override
+    public java.util.List<String> getWorkerIds()
+    {
+      return java.util.List.of();
+    }
+
+    // ---- Callbacks implemented as no-ops ----
+
+    @Override
+    public void stop(final CancellationReason reason)
+    {
+      // no-op
+    }
+
+    @Override
+    public void resultsComplete(
+        final String queryId,
+        final int stageNumber,
+        final int workerNumber,
+        final Object resultObject
+    )
+    {
+      // no-op
+    }
+
+    @Override
+    public void updatePartialKeyStatisticsInformation(
+        final int stageNumber,
+        final int workerNumber,
+        final Object partialKeyStatisticsInformationObject
+    )
+    {
+      // no-op
+    }
+
+    @Override
+    public void doneReadingInput(final int stageNumber, final int workerNumber)
+    {
+      // no-op
+    }
+
+    @Override
+    public void workerError(final MSQErrorReport errorReport)
+    {
+      // no-op
+    }
+
+    @Override
+    public void workerWarning(final java.util.List<MSQErrorReport> errorReports)
+    {
+      // no-op
+    }
+
+    @Override
+    public void updateCounters(
+        final String taskId,
+        final org.apache.druid.msq.counters.CounterSnapshotsTree snapshotsTree
+    )
+    {
+      // no-op
+    }
+  }
+}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
index c93e813ba5e..27381ca6904 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
@@ -40,8 +40,11 @@ import org.apache.druid.indexer.report.TaskReport.ReportMap;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
+import org.apache.druid.msq.exec.ControllerHolder;
import org.apache.druid.msq.exec.ControllerImpl;
import org.apache.druid.msq.exec.QueryListener;
import org.apache.druid.msq.exec.ResultsContext;
@@ -56,6 +59,7 @@ import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.sql.MSQTaskQueryKitSpecFactory;
import org.apache.druid.msq.util.SqlStatementResourceHelper;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
@@ -189,13 +193,29 @@ public class MSQTestOverlordServiceClient extends
NoopOverlordClient
resultsContext
);
+ final ControllerHolder controllerHolder = new ControllerHolder(
+ controller,
+
controller.getQueryContext().getString(QueryContexts.CTX_SQL_QUERY_ID,
controller.queryId()),
+ null,
+ null,
+ DateTimes.nowUtc()
+ );
+
+ final ControllerThreadPool controllerThreadPool = new
ControllerThreadPool(
+ Execs.directExecutor(),
+ Execs.scheduledSingleThreaded("msq-test-controller-timeout-%s")
+ );
+
try {
- controller.run(queryListener);
+ controllerHolder.runAsync(queryListener, null,
controllerThreadPool).get();
testTaskDetails.taskStatus =
queryListener.getStatusReport().toTaskStatus(cTask.getId());
}
catch (Exception e) {
testTaskDetails.taskStatus = TaskStatus.failure(cTask.getId(),
e.toString());
}
+ finally {
+ controllerThreadPool.stop();
+ }
return Futures.immediateFuture(null);
}
catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]