This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 219ea7a50cd fix: Dart missing default timeout in prePlanned mode. 
(#19222)
219ea7a50cd is described below

commit 219ea7a50cd393da3bbc36f902c3213fe419539a
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Mar 30 12:42:18 2026 -0700

    fix: Dart missing default timeout in prePlanned mode. (#19222)
    
    Previously, the pre-planned mode for Dart was missing the logic to
    add the server default timeout, meaning those queries would run with
    no timeout at all unless the user specified one. This patch fixes it
    by moving the timeout setting to initQueryContext, which runs in any
    prePlanned mode.
    
    This patch also reworks common context handling by separating truly
    common context from task-specific context. The truly common context
    now lives in MultiStageQueryContext#withCommonContext. One of the
    things this function does is set the queryDeadline key, to represent
    the time that the query should time out.
---
 .../msq/dart/controller/sql/DartQueryMaker.java    |  22 +---
 .../msq/dart/controller/sql/DartSqlEngine.java     |  37 ++++---
 .../controller/sql/PrePlannedDartQueryMaker.java   |  14 +--
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  22 ++--
 .../apache/druid/msq/sql/MSQTaskQueryMaker.java    | 117 ++++++++++-----------
 .../org/apache/druid/msq/sql/MSQTaskSqlEngine.java |   8 +-
 .../druid/msq/util/MultiStageQueryContext.java     |  74 +++++++++++--
 .../druid/msq/sql/MSQTaskQueryMakerTest.java       |   5 +-
 .../org/apache/druid/msq/test/MSQTestBase.java     |   5 +-
 .../druid/msq/util/MultiStageQueryContextTest.java |  39 +++++++
 10 files changed, 209 insertions(+), 134 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
index d9e3ad30001..053e5fef592 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
@@ -52,7 +52,6 @@ import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.server.QueryResponse;
-import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.sql.calcite.planner.ColumnMappings;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.planner.QueryUtils;
@@ -63,7 +62,6 @@ import org.apache.druid.sql.calcite.run.SqlResults;
 import java.io.ByteArrayOutputStream;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
 
@@ -96,7 +94,6 @@ public class DartQueryMaker implements QueryMaker
    * Executor for running controllers.
    */
   private final ControllerThreadPool controllerThreadPool;
-  private final ServerConfig serverConfig;
 
   final QueryKitSpecFactory queryKitSpecFactory;
   final MultiQueryKit queryKit;
@@ -109,8 +106,7 @@ public class DartQueryMaker implements QueryMaker
       DartControllerConfig controllerConfig,
       ControllerThreadPool controllerThreadPool,
       QueryKitSpecFactory queryKitSpecFactory,
-      MultiQueryKit queryKit,
-      ServerConfig serverConfig
+      MultiQueryKit queryKit
   )
   {
     this.fieldMapping = fieldMapping;
@@ -121,17 +117,15 @@ public class DartQueryMaker implements QueryMaker
     this.controllerThreadPool = controllerThreadPool;
     this.queryKitSpecFactory = queryKitSpecFactory;
     this.queryKit = queryKit;
-    this.serverConfig = serverConfig;
   }
 
   @Override
   public QueryResponse<Object[]> runQuery(DruidQuery druidQuery)
   {
     ColumnMappings columnMappings = 
QueryUtils.buildColumnMappings(fieldMapping, 
druidQuery.getOutputRowSignature());
-    final LegacyMSQSpec querySpec = MSQTaskQueryMaker.makeLegacyMSQSpec(
+    final LegacyMSQSpec querySpec = MSQTaskQueryMaker.buildLegacyMSQSpec(
         null,
         druidQuery,
-        finalizeTimeout(druidQuery.getQuery().context()),
         columnMappings,
         plannerContext,
         null
@@ -207,18 +201,6 @@ public class DartQueryMaker implements QueryMaker
     );
   }
 
-  /**
-   * Adds the timeout parameter to the query context, considering the default 
and maximum values from
-   * {@link ServerConfig}.
-   */
-  private QueryContext finalizeTimeout(QueryContext queryContext)
-  {
-    final long timeout = 
queryContext.getTimeout(serverConfig.getDefaultQueryTimeout());
-    QueryContext timeoutContext = 
queryContext.override(Map.of(QueryContexts.TIMEOUT_KEY, timeout));
-    timeoutContext.verifyMaxQueryTimeout(serverConfig.getMaxQueryTimeout());
-    return timeoutContext;
-  }
-
   /**
    * Runs a controller in {@link #controllerThreadPool} and returns a {@link 
QueryResponse} object.
    *
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
index 82a2479a392..e3f44ae59f2 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
@@ -41,11 +41,13 @@ import 
org.apache.druid.msq.dart.controller.DartControllerRegistry;
 import org.apache.druid.msq.dart.controller.QueryInfoAndReport;
 import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
 import org.apache.druid.msq.dart.guice.DartControllerConfig;
+import org.apache.druid.msq.exec.Controller;
 import org.apache.druid.msq.exec.QueryKitSpecFactory;
 import org.apache.druid.msq.indexing.error.CancellationReason;
 import org.apache.druid.msq.querykit.MultiQueryKit;
 import org.apache.druid.msq.sql.DartQueryKitSpecFactory;
 import org.apache.druid.msq.sql.MSQTaskSqlEngine;
+import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.QueryConfigProvider;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryContexts;
@@ -53,6 +55,7 @@ import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.AuthorizationResult;
+import org.apache.druid.sql.SqlLifecycleManager;
 import org.apache.druid.sql.SqlStatementFactory;
 import org.apache.druid.sql.SqlToolbox;
 import org.apache.druid.sql.calcite.planner.Calcites;
@@ -117,6 +120,18 @@ public class DartSqlEngine implements SqlEngine
     this.sqlClients = sqlClients;
   }
 
+  /**
+   * Dart queryId must be globally unique, so we cannot use the user-provided 
{@link QueryContexts#CTX_SQL_QUERY_ID}
+   * or {@link BaseQuery#QUERY_ID}. Instead we generate a UUID that becomes 
the {@link Controller#queryId()}.
+   *
+   * The user-provided {@link QueryContexts#CTX_SQL_QUERY_ID} is still 
registered with the {@link SqlLifecycleManager}
+   * for purposes of query cancellation.
+   */
+  public static String generateExecutionId()
+  {
+    return UUID.randomUUID().toString();
+  }
+
   @Override
   public String name()
   {
@@ -156,6 +171,7 @@ public class DartSqlEngine implements SqlEngine
   public void validateContext(Map<String, Object> queryContext)
   {
     SqlEngines.validateNoSpecialContextKeys(queryContext, 
MSQTaskSqlEngine.SYSTEM_CONTEXT_PARAMETERS);
+    
QueryContext.of(queryContext).verifyMaxQueryTimeout(serverConfig.getMaxQueryTimeout());
   }
 
   @Override
@@ -199,8 +215,7 @@ public class DartSqlEngine implements SqlEngine
         controllerConfig,
         controllerThreadPool,
         queryKitSpecFactory,
-        queryKit,
-        serverConfig
+        queryKit
     );
     if (plannerContext.queryContext().isPrePlanned()) {
       return new PrePlannedDartQueryMaker(plannerContext, dartQueryMaker);
@@ -226,18 +241,12 @@ public class DartSqlEngine implements SqlEngine
     for (Map.Entry<String, Object> entry : 
queryConfigProvider.getContext().entrySet()) {
       contextMap.putIfAbsent(entry.getKey(), entry.getValue());
     }
-    /**
-     * Dart queryId must be globally unique, so we cannot use the 
user-provided {@link QueryContexts#CTX_SQL_QUERY_ID}
-     * or {@link BaseQuery#QUERY_ID}. Instead we generate a UUID in {@link 
DartSqlResource#doPost}, overriding whatever
-     * the user may have provided. This becomes the {@link 
Controller#queryId()}.
-     *
-     * The user-provided {@link QueryContexts#CTX_SQL_QUERY_ID} is still 
registered with the {@link SqlLifecycleManager}
-     * for purposes of query cancellation.
-     *
-     * The user-provided {@link BaseQuery#QUERY_ID} is ignored.
-     */
-    final String dartQueryId = UUID.randomUUID().toString();
-    contextMap.put(QueryContexts.CTX_DART_QUERY_ID, dartQueryId);
+
+    // Set default query timeout if not already specified.
+    contextMap.putIfAbsent(QueryContexts.TIMEOUT_KEY, 
serverConfig.getDefaultQueryTimeout());
+
+    // Add execution ID.
+    contextMap.put(QueryContexts.CTX_DART_QUERY_ID, generateExecutionId());
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
index e6cdc9e0bb2..5740142dd63 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
@@ -75,9 +75,7 @@ class PrePlannedDartQueryMaker implements QueryMaker, 
QueryMaker.FromDruidLogica
 
     QueryContext context = plannerContext.queryContext();
     ColumnMappings columnMappings = 
QueryUtils.buildColumnMappings(dartQueryMaker.fieldMapping, 
logicalStage.getLogicalRowSignature());
-    QueryDefMSQSpec querySpec = MSQTaskQueryMaker.makeQueryDefMSQSpec(
-        null,
-        context,
+    QueryDefMSQSpec querySpec = MSQTaskQueryMaker.buildQueryDefMSQSpec(
         columnMappings,
         plannerContext,
         null,
@@ -97,8 +95,7 @@ class PrePlannedDartQueryMaker implements QueryMaker, 
QueryMaker.FromDruidLogica
     QueryContext queryContext = druidQuery.getQuery().context();
     ResultsContext resultsContext = 
DartQueryMaker.makeResultsContext(druidQuery, dartQueryMaker.fieldMapping, 
plannerContext);
     QueryDefMSQSpec msqSpec = buildMSQSpec(druidQuery, 
dartQueryMaker.fieldMapping, queryContext, resultsContext);
-    QueryResponse<Object[]> response = 
dartQueryMaker.runQueryDefMSQSpec(msqSpec, queryContext, resultsContext);
-    return response;
+    return dartQueryMaker.runQueryDefMSQSpec(msqSpec, queryContext, 
resultsContext);
   }
 
   private QueryDefMSQSpec buildMSQSpec(
@@ -108,10 +105,9 @@ class PrePlannedDartQueryMaker implements QueryMaker, 
QueryMaker.FromDruidLogica
       ResultsContext resultsContext)
   {
     ColumnMappings columnMappings = 
QueryUtils.buildColumnMappings(fieldMapping, 
druidQuery.getOutputRowSignature());
-    LegacyMSQSpec querySpec = MSQTaskQueryMaker.makeLegacyMSQSpec(
+    LegacyMSQSpec querySpec = MSQTaskQueryMaker.buildLegacyMSQSpec(
         null,
         druidQuery,
-        druidQuery.getQuery().context(),
         columnMappings,
         plannerContext,
         null
@@ -132,9 +128,7 @@ class PrePlannedDartQueryMaker implements QueryMaker, 
QueryMaker.FromDruidLogica
         )
     ).makeQueryDefinition();
 
-    return MSQTaskQueryMaker.makeQueryDefMSQSpec(
-        null,
-        druidQuery.getQuery().context(),
+    return MSQTaskQueryMaker.buildQueryDefMSQSpec(
         columnMappings,
         plannerContext,
         null,
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 83db73bd754..958589598e3 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -2326,7 +2326,7 @@ public class ControllerImpl implements Controller
       startTaskLauncher();
 
       boolean runAgain;
-      final DateTime queryFailDeadline = 
getQueryDeadline(querySpec.getContext());
+      final DateTime queryFailDeadline = getQueryDeadline();
 
       // The timeout could have already elapsed while waiting for the 
controller to start, check it now.
       checkTimeout(queryFailDeadline);
@@ -2358,17 +2358,21 @@ public class ControllerImpl implements Controller
     }
 
     /**
-     * Retrieves the timeout and start time from the query context and 
calculates the timeout deadline.
+     * Retrieves the timeout and start time from the query context and reads 
or calculates the deadline.
      */
-    private DateTime getQueryDeadline(QueryContext queryContext)
+    private DateTime getQueryDeadline()
     {
-      // Fetch the timeout, but don't use default server configured timeout if 
the user has not specified one.
-      final long timeout = queryContext.getTimeout(QueryContexts.NO_TIMEOUT);
-      // Not using QueryContexts.hasTimeout(), as this considers the default 
timeout as timeout being set.
-      if (timeout == QueryContexts.NO_TIMEOUT) {
-        return DateTimes.MAX;
+      DateTime deadline = 
MultiStageQueryContext.getQueryDeadline(querySpec.getContext());
+
+      if (deadline == null) {
+        // Newer Brokers set the deadline, but older ones might not. Fall back 
to startTime and timeout in this case.
+        final long timeout = 
querySpec.getContext().getTimeout(QueryContexts.NO_TIMEOUT);
+        if (timeout != QueryContexts.NO_TIMEOUT) {
+          deadline = 
MultiStageQueryContext.getStartTime(querySpec.getContext()).plus(timeout);
+        }
       }
-      return MultiStageQueryContext.getStartTime(queryContext).plus(timeout);
+
+      return deadline != null ? deadline : DateTimes.MAX;
     }
 
     /**
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index c8b9546f017..517731a2089 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -20,7 +20,6 @@
 package org.apache.druid.msq.sql;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import org.apache.calcite.rel.type.RelDataType;
@@ -35,8 +34,6 @@ import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.data.input.impl.AggregateProjectionSpec;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InvalidInput;
-import org.apache.druid.frame.FrameType;
-import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
@@ -44,7 +41,6 @@ import 
org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.msq.exec.MSQTasks;
-import org.apache.druid.msq.exec.QueryKitSpecFactory;
 import org.apache.druid.msq.exec.ResultsContext;
 import org.apache.druid.msq.indexing.LegacyMSQSpec;
 import org.apache.druid.msq.indexing.MSQControllerTask;
@@ -62,7 +58,6 @@ import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContext;
-import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.IndexSpec;
@@ -108,28 +103,22 @@ public class MSQTaskQueryMaker implements QueryMaker
   private final IngestDestination targetDataSource;
   private final OverlordClient overlordClient;
   private final PlannerContext plannerContext;
-  private final ObjectMapper jsonMapper;
   private final List<Entry<Integer, String>> fieldMapping;
   private final MSQTerminalStageSpecFactory terminalStageSpecFactory;
-  private final QueryKitSpecFactory queryKitSpecFactory;
 
   MSQTaskQueryMaker(
       @Nullable final IngestDestination targetDataSource,
       final OverlordClient overlordClient,
       final PlannerContext plannerContext,
-      final ObjectMapper jsonMapper,
       final List<Entry<Integer, String>> fieldMapping,
-      final MSQTerminalStageSpecFactory terminalStageSpecFactory,
-      final QueryKitSpecFactory queryKitSpecFactory
+      final MSQTerminalStageSpecFactory terminalStageSpecFactory
   )
   {
     this.targetDataSource = targetDataSource;
     this.overlordClient = Preconditions.checkNotNull(overlordClient, 
"indexingServiceClient");
     this.plannerContext = Preconditions.checkNotNull(plannerContext, 
"plannerContext");
-    this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
     this.fieldMapping = Preconditions.checkNotNull(fieldMapping, 
"fieldMapping");
     this.terminalStageSpecFactory = terminalStageSpecFactory;
-    this.queryKitSpecFactory = queryKitSpecFactory;
   }
 
   @Override
@@ -154,15 +143,23 @@ public class MSQTaskQueryMaker implements QueryMaker
     );
     ColumnMappings columnMappings = 
QueryUtils.buildColumnMappings(fieldMapping, 
druidQuery.getOutputRowSignature());
 
-    final LegacyMSQSpec querySpec = makeLegacyMSQSpec(
+    final LegacyMSQSpec baseSpec = buildLegacyMSQSpec(
         targetDataSource,
         druidQuery,
-        druidQuery.getQuery().context(),
         columnMappings,
         plannerContext,
         terminalStageSpecFactory
     );
 
+    final LegacyMSQSpec querySpec = baseSpec.withOverriddenContext(
+        withTaskOverrides(
+            druidQuery.getQuery(),
+            baseSpec.getContext(),
+            plannerContext,
+            baseSpec.getDestination()
+        )
+    );
+
     final MSQControllerTask controllerTask = new MSQControllerTask(
         taskId,
         querySpec,
@@ -178,11 +175,14 @@ public class MSQTaskQueryMaker implements QueryMaker
     return 
QueryResponse.withEmptyContext(Sequences.simple(Collections.singletonList(new 
Object[]{taskId})));
   }
 
-  public static LegacyMSQSpec makeLegacyMSQSpec(
+  /**
+   * Creates a {@link LegacyMSQSpec} based on a {@link Query}. Common context 
from
+   * {@link MultiStageQueryContext#withCommonContext(QueryContext)} is added 
to the {@link Query}.
+   */
+  public static LegacyMSQSpec buildLegacyMSQSpec(
       @Nullable final IngestDestination targetDataSource,
       final DruidQuery druidQuery,
-      final QueryContext queryContext,
-      ColumnMappings columnMappings,
+      final ColumnMappings columnMappings,
       final PlannerContext plannerContext,
       final MSQTerminalStageSpecFactory terminalStageSpecFactory
   )
@@ -194,19 +194,23 @@ public class MSQTaskQueryMaker implements QueryMaker
         terminalStageSpecFactory
     );
 
-    final Map<String, Object> nativeQueryContextOverrides = 
buildOverrideContext(druidQuery.getQuery(), plannerContext, destination);
+    final QueryContext finalContext = 
MultiStageQueryContext.withCommonContext(plannerContext.queryContext());
 
     final LegacyMSQSpec querySpec =
         LegacyMSQSpec.builder()
-               .query(druidQuery.getQuery())
-               
.queryContext(queryContext.override(nativeQueryContextOverrides))
+               
.query(druidQuery.getQuery().withOverriddenContext(finalContext.asMap()))
+               .queryContext(finalContext)
                .columnMappings(columnMappings)
                .destination(destination)
-               
.assignmentStrategy(MultiStageQueryContext.getAssignmentStrategy(plannerContext.queryContext()))
+               
.assignmentStrategy(MultiStageQueryContext.getAssignmentStrategy(finalContext))
                .tuningConfig(makeMSQTuningConfig(plannerContext))
                .build();
 
-    MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec.getContext(), 
querySpec.getDestination(), druidQuery.getQuery());
+    MSQTaskQueryMakerUtils.validateRealtimeReindex(
+        querySpec.getContext(),
+        querySpec.getDestination(),
+        druidQuery.getQuery()
+    );
 
     return querySpec;
   }
@@ -255,50 +259,42 @@ public class MSQTaskQueryMaker implements QueryMaker
     return destination;
   }
 
-  private static Map<String, Object> buildOverrideContext(
+  /**
+   * Returns a combined context map: starts with {@code baseContext}, adds 
mode defaults (which do not override
+   * existing keys), then adds task-specific overrides that are not shared 
with other MSQ engines like Dart.
+   */
+  private static Map<String, Object> withTaskOverrides(
       final Query<?> query,
+      final QueryContext baseContext,
       final PlannerContext plannerContext,
-      final MSQDestination destination)
+      final MSQDestination destination
+  )
   {
-    final QueryContext sqlQueryContext = plannerContext.queryContext();
-    final Map<String, Object> nativeQueryContextOverrides = new HashMap<>();
+    final Map<String, Object> context = new HashMap<>(baseContext.asMap());
 
-    // Add appropriate finalization to native query context.
-    final boolean finalizeAggregations = 
MultiStageQueryContext.isFinalizeAggregations(sqlQueryContext);
-    nativeQueryContextOverrides.put(QueryContexts.FINALIZE_KEY, 
finalizeAggregations);
-
-    // This flag is to ensure backward compatibility, as brokers are upgraded 
after indexers/middlemanagers.
-    
nativeQueryContextOverrides.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION,
 true);
-    boolean isReindex = MSQControllerTask.isReplaceInputDataSourceTask(query, 
destination);
-    if (isReindex) {
-      nativeQueryContextOverrides.put(MultiStageQueryContext.CTX_IS_REINDEX, 
isReindex);
-    }
-    nativeQueryContextOverrides.putAll(sqlQueryContext.asMap());
-
-    // adding user
-    nativeQueryContextOverrides.put(USER_KEY, 
plannerContext.getAuthenticationResult().getIdentity());
-
-    final String msqMode = MultiStageQueryContext.getMSQMode(sqlQueryContext);
+    // Add mode defaults (putIfAbsent, so user-provided values like 
maxParseExceptions take precedence).
+    final String msqMode = MultiStageQueryContext.getMSQMode(baseContext);
     if (msqMode != null) {
-      MSQMode.populateDefaultQueryContext(msqMode, 
nativeQueryContextOverrides);
+      MSQMode.populateDefaultQueryContext(msqMode, context);
     }
 
-    // Use the latest row-based frame type. The default is an older type, to 
ensure compatibility during rolling
-    // updates. Since the Broker is updated last, it's safe to set this 
property on the Broker.
-    nativeQueryContextOverrides.putIfAbsent(
-        MultiStageQueryContext.CTX_ROW_BASED_FRAME_TYPE,
-        (int) FrameType.latestRowBased().version()
-    );
+    // Add task-specific overrides.
+    if (MSQControllerTask.isReplaceInputDataSourceTask(query, destination)) {
+      context.put(MultiStageQueryContext.CTX_IS_REINDEX, true);
+    }
 
-    // Add the start time.
-    nativeQueryContextOverrides.put(MultiStageQueryContext.CTX_START_TIME, 
DateTimes.nowUtc().toString());
+    context.put(USER_KEY, 
plannerContext.getAuthenticationResult().getIdentity());
 
-    return nativeQueryContextOverrides;
+    return context;
   }
 
-  public static QueryDefMSQSpec makeQueryDefMSQSpec(
-      @Nullable final IngestDestination targetDataSource,
-      final QueryContext queryContext,
+  /**
+   * Creates a {@link QueryDefMSQSpec} based on already-planned {@link 
QueryDefinition}. Common context
+   * from {@link MultiStageQueryContext#withCommonContext(QueryContext)} is 
added to the {@link QueryDefinition}.
+   *
+   * Currently only supports SELECT queries, no DML (INSERT/REPLACE).
+   */
+  public static QueryDefMSQSpec buildQueryDefMSQSpec(
       final ColumnMappings columnMappings,
       final PlannerContext plannerContext,
       final MSQTerminalStageSpecFactory terminalStageSpecFactory,
@@ -306,21 +302,20 @@ public class MSQTaskQueryMaker implements QueryMaker
   )
   {
     final MSQDestination destination = buildMSQDestination(
-        targetDataSource,
+        null, // targetDataSource
         columnMappings,
         plannerContext,
         terminalStageSpecFactory
     );
 
-    final QueryDefMSQSpec querySpec = new QueryDefMSQSpec.Builder()
+    final QueryContext finalContext = 
MultiStageQueryContext.withCommonContext(plannerContext.queryContext());
+    return new QueryDefMSQSpec.Builder()
         .columnMappings(columnMappings)
         .destination(destination)
-        
.assignmentStrategy(MultiStageQueryContext.getAssignmentStrategy(plannerContext.queryContext()))
+        
.assignmentStrategy(MultiStageQueryContext.getAssignmentStrategy(finalContext))
         .tuningConfig(makeMSQTuningConfig(plannerContext))
-        .queryDef(queryDef.withOverriddenContext(buildOverrideContext(null, 
plannerContext, destination)))
+        .queryDef(queryDef.withOverriddenContext(finalContext.asMap()))
         .build();
-
-    return querySpec;
   }
 
   /**
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index 9e96eb7dd7d..2c05fdd606c 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -186,10 +186,8 @@ public class MSQTaskSqlEngine implements SqlEngine
         null,
         overlordClient,
         plannerContext,
-        jsonMapper,
         relRoot.fields,
-        terminalStageSpecFactory,
-        queryKitSpecFactory
+        terminalStageSpecFactory
     );
   }
 
@@ -221,10 +219,8 @@ public class MSQTaskSqlEngine implements SqlEngine
         destination,
         overlordClient,
         plannerContext,
-        jsonMapper,
         relRoot.fields,
-        terminalStageSpecFactory,
-        queryKitSpecFactory
+        terminalStageSpecFactory
     );
   }
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index df8fd3da310..79b1c5ca341 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -48,7 +48,6 @@ import org.apache.druid.msq.querykit.ReadableInputQueue;
 import org.apache.druid.msq.rpc.ControllerResource;
 import org.apache.druid.msq.rpc.SketchEncoding;
 import org.apache.druid.msq.sql.MSQMode;
-import org.apache.druid.msq.sql.MSQTaskQueryMaker;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.segment.IndexSpec;
@@ -58,8 +57,10 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -200,6 +201,11 @@ public class MultiStageQueryContext
 
   public static final String CTX_START_TIME = "startTime";
 
+  /**
+   * The time that a query should time out. Value is an ISO8601 timestamp.
+   */
+  public static final String CTX_QUERY_DEADLINE = "queryDeadline";
+
   /**
    * Controls sort order within segments. Normally, this is the same as the 
overall order of the query (from the
    * CLUSTERED BY clause) but it can be overridden.
@@ -239,7 +245,7 @@ public class MultiStageQueryContext
   /**
    * The {@link FrameType} to use for row-based frames. This context parameter 
exists to support rolling updates from
    * older Druid versions. The latest type is given by {@link 
FrameType#latestRowBased()}, which is set in
-   * {@link MSQTaskQueryMaker#buildOverrideContext} starting in Druid 34. Once 
all servers are on Druid 34 or newer,
+   * {@link MultiStageQueryContext#withCommonContext} starting in Druid 34. 
Once all servers are on Druid 34 or newer,
    * the current-latest type {@link FrameType#ROW_BASED_V2} is used.
    */
   public static final String CTX_ROW_BASED_FRAME_TYPE = "rowBasedFrameType";
@@ -590,14 +596,64 @@ public class MultiStageQueryContext
   public static DateTime getStartTime(final QueryContext queryContext)
   {
     // Get the start time from the query context set by the broker.
-    if (!queryContext.containsKey(CTX_START_TIME)) {
-      // If it is missing, as could be the case for an older version of the 
broker, use the current time instead, to
-      // have something to timeout against.
-      DateTime startTime = DateTimes.nowUtc();
-      log.warn("Query context does not contain start time. Defaulting to the 
current time[%s] instead.", startTime);
-      return startTime;
+    final String startTime = queryContext.getString(CTX_START_TIME);
+    if (startTime != null) {
+      return DateTimes.of(startTime);
+    } else {
+      // If it is missing, as could be the case for an older version of the 
broker, use the current time instead.
+      final DateTime now = DateTimes.nowUtc();
+      log.warn("Query context does not contain start time. Defaulting to the 
current time[%s] instead.", now);
+      return now;
+    }
+  }
+
+  @Nullable
+  public static DateTime getQueryDeadline(final QueryContext queryContext)
+  {
+    final String queryDeadline = queryContext.getString(CTX_QUERY_DEADLINE);
+    if (queryDeadline != null) {
+      return DateTimes.of(queryDeadline);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Returns a final query context including common context keys shared across 
all MSQ engines (task, Dart, etc.).
+   */
+  public static QueryContext withCommonContext(final QueryContext 
originalContext)
+  {
+    final Map<String, Object> overrides = new HashMap<>();
+
+    // Add appropriate finalization to native query context.
+    if (!originalContext.containsKey(QueryContexts.FINALIZE_KEY)) {
+      overrides.put(QueryContexts.FINALIZE_KEY, 
isFinalizeAggregations(originalContext));
     }
-    return DateTimes.of(queryContext.getString(CTX_START_TIME));
+
+    // This flag is to ensure backward compatibility, as brokers are upgraded 
after indexers/middlemanagers.
+    if (!originalContext.containsKey(WINDOW_FUNCTION_OPERATOR_TRANSFORMATION)) 
{
+      overrides.put(WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true);
+    }
+
+    if (!originalContext.containsKey(CTX_ROW_BASED_FRAME_TYPE)) {
+      // Use the latest row-based frame type. The default is an older type, to 
ensure compatibility during rolling
+      // updates. Since the Broker is updated last, it's safe to set this 
property on the Broker.
+      overrides.put(CTX_ROW_BASED_FRAME_TYPE, (int) 
FrameType.latestRowBased().version());
+    }
+
+    // Add start time.
+    final DateTime now = DateTimes.nowUtc();
+    overrides.put(CTX_START_TIME, now.toString());
+
+    // Add query deadline if not already present (and if timeout is set).
+    if (!originalContext.containsKey(CTX_QUERY_DEADLINE)) {
+      final long timeout = 
originalContext.getTimeout(QueryContexts.NO_TIMEOUT);
+      if (timeout != QueryContexts.NO_TIMEOUT) {
+        overrides.put(CTX_QUERY_DEADLINE, now.plus(timeout).toString());
+      }
+    }
+
+    return originalContext.override(overrides);
   }
 
   public static Set<String> getColumnsExcludedFromTypeVerification(final 
QueryContext queryContext)
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
index c604590851e..41956d55695 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
@@ -56,7 +56,6 @@ import 
org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.test.MSQTestOverlordServiceClient;
 import org.apache.druid.msq.test.MSQTestTaskActionClient;
-import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.ForwardingQueryProcessingPool;
 import org.apache.druid.query.InlineDataSource;
@@ -680,10 +679,8 @@ public class MSQTaskQueryMakerTest
         ingestDestination,
         fakeOverlordClient,
         plannerContextMock,
-        objectMapper,
         fieldMapping,
-        terminalStageSpecFactory,
-        new MSQTaskQueryKitSpecFactory(new DruidProcessingConfig())
+        terminalStageSpecFactory
     );
   }
 }
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 5e50f1dcab6..5a2ecbcf243 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -363,7 +363,10 @@ public class MSQTestBase extends BaseCalciteQueryTest
   protected final WorkerMemoryParameters workerMemoryParameters = 
Mockito.spy(makeTestWorkerMemoryParameters());
   protected static final String TEST_CONTROLLER_TASK_ID = "query-test-query";
   // Fields in the query context to ignore during assertion.
-  protected Set<String> ignoreFields = 
Set.of(MultiStageQueryContext.CTX_START_TIME);
+  protected Set<String> ignoreFields = Set.of(
+      MultiStageQueryContext.CTX_START_TIME,
+      MultiStageQueryContext.CTX_QUERY_DEADLINE
+  );
 
   protected static class MSQBaseComponentSupplier extends 
StandardComponentSupplier
   {
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
index 953d7200500..6db3597d632 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.exec.WorkerMemoryParameters;
 import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
@@ -36,6 +37,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.column.StringEncodingStrategy;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
+import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
@@ -468,6 +470,43 @@ public class MultiStageQueryContextTest
     Assert.assertEquals(Integer.valueOf(4), 
MultiStageQueryContext.getMaxThreads(QueryContext.of(propertyMap)));
   }
 
+  @Test
+  public void withCommonContext_noTimeout_setsStartTimeOnly()
+  {
+    final QueryContext context = 
MultiStageQueryContext.withCommonContext(QueryContext.empty());
+    
Assert.assertTrue(context.containsKey(MultiStageQueryContext.CTX_START_TIME));
+    
Assert.assertFalse(context.containsKey(MultiStageQueryContext.CTX_QUERY_DEADLINE));
+    Assert.assertEquals(true, context.get(QueryContexts.FINALIZE_KEY));
+    Assert.assertEquals(true, 
context.get(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION));
+    
Assert.assertTrue(context.containsKey(MultiStageQueryContext.CTX_ROW_BASED_FRAME_TYPE));
+  }
+
+  @Test
+  public void withCommonContext_withTimeout_setsDeadline()
+  {
+    final long timeoutMs = 60_000;
+    final QueryContext context = MultiStageQueryContext.withCommonContext(
+        QueryContext.of(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, timeoutMs))
+    );
+
+    
Assert.assertTrue(context.containsKey(MultiStageQueryContext.CTX_START_TIME));
+    
Assert.assertTrue(context.containsKey(MultiStageQueryContext.CTX_QUERY_DEADLINE));
+
+    final DateTime startTime = DateTimes.of((String) 
context.get(MultiStageQueryContext.CTX_START_TIME));
+    final DateTime deadline = DateTimes.of((String) 
context.get(MultiStageQueryContext.CTX_QUERY_DEADLINE));
+    Assert.assertEquals(timeoutMs, deadline.getMillis() - 
startTime.getMillis());
+  }
+
+  @Test
+  public void withCommonContext_mergesUserContext()
+  {
+    final QueryContext context = MultiStageQueryContext.withCommonContext(
+        QueryContext.of(ImmutableMap.of("customKey", "customValue"))
+    );
+
+    Assert.assertEquals("customValue", context.get("customKey"));
+  }
+
   private static List<String> decodeSortOrder(@Nullable final String input)
   {
     return 
MultiStageQueryContext.decodeList(MultiStageQueryContext.CTX_SORT_ORDER, input);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to