This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 219ea7a50cd fix: Dart missing default timeout in prePlanned mode.
(#19222)
219ea7a50cd is described below
commit 219ea7a50cd393da3bbc36f902c3213fe419539a
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Mar 30 12:42:18 2026 -0700
fix: Dart missing default timeout in prePlanned mode. (#19222)
Previously, the pre-planned mode for Dart was missing the logic to
add the server default timeout, meaning those queries would run with
no timeout at all unless the user specified one. This patch fixes it
by moving the timeout setting to initQueryContext, which runs
regardless of whether prePlanned mode is enabled.
This patch also reworks common context handling by separating truly
common context from task-specific context. The truly common context
now lives in MultiStageQueryContext#withCommonContext. One of the
things this function does is set the queryDeadline key, to represent
the time that the query should time out.
---
.../msq/dart/controller/sql/DartQueryMaker.java | 22 +---
.../msq/dart/controller/sql/DartSqlEngine.java | 37 ++++---
.../controller/sql/PrePlannedDartQueryMaker.java | 14 +--
.../org/apache/druid/msq/exec/ControllerImpl.java | 22 ++--
.../apache/druid/msq/sql/MSQTaskQueryMaker.java | 117 ++++++++++-----------
.../org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 8 +-
.../druid/msq/util/MultiStageQueryContext.java | 74 +++++++++++--
.../druid/msq/sql/MSQTaskQueryMakerTest.java | 5 +-
.../org/apache/druid/msq/test/MSQTestBase.java | 5 +-
.../druid/msq/util/MultiStageQueryContextTest.java | 39 +++++++
10 files changed, 209 insertions(+), 134 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
index d9e3ad30001..053e5fef592 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
@@ -52,7 +52,6 @@ import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
-import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.planner.QueryUtils;
@@ -63,7 +62,6 @@ import org.apache.druid.sql.calcite.run.SqlResults;
import java.io.ByteArrayOutputStream;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
@@ -96,7 +94,6 @@ public class DartQueryMaker implements QueryMaker
* Executor for running controllers.
*/
private final ControllerThreadPool controllerThreadPool;
- private final ServerConfig serverConfig;
final QueryKitSpecFactory queryKitSpecFactory;
final MultiQueryKit queryKit;
@@ -109,8 +106,7 @@ public class DartQueryMaker implements QueryMaker
DartControllerConfig controllerConfig,
ControllerThreadPool controllerThreadPool,
QueryKitSpecFactory queryKitSpecFactory,
- MultiQueryKit queryKit,
- ServerConfig serverConfig
+ MultiQueryKit queryKit
)
{
this.fieldMapping = fieldMapping;
@@ -121,17 +117,15 @@ public class DartQueryMaker implements QueryMaker
this.controllerThreadPool = controllerThreadPool;
this.queryKitSpecFactory = queryKitSpecFactory;
this.queryKit = queryKit;
- this.serverConfig = serverConfig;
}
@Override
public QueryResponse<Object[]> runQuery(DruidQuery druidQuery)
{
ColumnMappings columnMappings =
QueryUtils.buildColumnMappings(fieldMapping,
druidQuery.getOutputRowSignature());
- final LegacyMSQSpec querySpec = MSQTaskQueryMaker.makeLegacyMSQSpec(
+ final LegacyMSQSpec querySpec = MSQTaskQueryMaker.buildLegacyMSQSpec(
null,
druidQuery,
- finalizeTimeout(druidQuery.getQuery().context()),
columnMappings,
plannerContext,
null
@@ -207,18 +201,6 @@ public class DartQueryMaker implements QueryMaker
);
}
- /**
- * Adds the timeout parameter to the query context, considering the default
and maximum values from
- * {@link ServerConfig}.
- */
- private QueryContext finalizeTimeout(QueryContext queryContext)
- {
- final long timeout =
queryContext.getTimeout(serverConfig.getDefaultQueryTimeout());
- QueryContext timeoutContext =
queryContext.override(Map.of(QueryContexts.TIMEOUT_KEY, timeout));
- timeoutContext.verifyMaxQueryTimeout(serverConfig.getMaxQueryTimeout());
- return timeoutContext;
- }
-
/**
* Runs a controller in {@link #controllerThreadPool} and returns a {@link
QueryResponse} object.
*
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
index 82a2479a392..e3f44ae59f2 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
@@ -41,11 +41,13 @@ import
org.apache.druid.msq.dart.controller.DartControllerRegistry;
import org.apache.druid.msq.dart.controller.QueryInfoAndReport;
import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.guice.DartControllerConfig;
+import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.QueryKitSpecFactory;
import org.apache.druid.msq.indexing.error.CancellationReason;
import org.apache.druid.msq.querykit.MultiQueryKit;
import org.apache.druid.msq.sql.DartQueryKitSpecFactory;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
+import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.QueryConfigProvider;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
@@ -53,6 +55,7 @@ import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizationResult;
+import org.apache.druid.sql.SqlLifecycleManager;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.planner.Calcites;
@@ -117,6 +120,18 @@ public class DartSqlEngine implements SqlEngine
this.sqlClients = sqlClients;
}
+ /**
+ * Dart queryId must be globally unique, so we cannot use the user-provided
{@link QueryContexts#CTX_SQL_QUERY_ID}
+ * or {@link BaseQuery#QUERY_ID}. Instead we generate a UUID that becomes
the {@link Controller#queryId()}.
+ *
+ * The user-provided {@link QueryContexts#CTX_SQL_QUERY_ID} is still
registered with the {@link SqlLifecycleManager}
+ * for purposes of query cancellation.
+ */
+ public static String generateExecutionId()
+ {
+ return UUID.randomUUID().toString();
+ }
+
@Override
public String name()
{
@@ -156,6 +171,7 @@ public class DartSqlEngine implements SqlEngine
public void validateContext(Map<String, Object> queryContext)
{
SqlEngines.validateNoSpecialContextKeys(queryContext,
MSQTaskSqlEngine.SYSTEM_CONTEXT_PARAMETERS);
+
QueryContext.of(queryContext).verifyMaxQueryTimeout(serverConfig.getMaxQueryTimeout());
}
@Override
@@ -199,8 +215,7 @@ public class DartSqlEngine implements SqlEngine
controllerConfig,
controllerThreadPool,
queryKitSpecFactory,
- queryKit,
- serverConfig
+ queryKit
);
if (plannerContext.queryContext().isPrePlanned()) {
return new PrePlannedDartQueryMaker(plannerContext, dartQueryMaker);
@@ -226,18 +241,12 @@ public class DartSqlEngine implements SqlEngine
for (Map.Entry<String, Object> entry :
queryConfigProvider.getContext().entrySet()) {
contextMap.putIfAbsent(entry.getKey(), entry.getValue());
}
- /**
- * Dart queryId must be globally unique, so we cannot use the
user-provided {@link QueryContexts#CTX_SQL_QUERY_ID}
- * or {@link BaseQuery#QUERY_ID}. Instead we generate a UUID in {@link
DartSqlResource#doPost}, overriding whatever
- * the user may have provided. This becomes the {@link
Controller#queryId()}.
- *
- * The user-provided {@link QueryContexts#CTX_SQL_QUERY_ID} is still
registered with the {@link SqlLifecycleManager}
- * for purposes of query cancellation.
- *
- * The user-provided {@link BaseQuery#QUERY_ID} is ignored.
- */
- final String dartQueryId = UUID.randomUUID().toString();
- contextMap.put(QueryContexts.CTX_DART_QUERY_ID, dartQueryId);
+
+ // Set default query timeout if not already specified.
+ contextMap.putIfAbsent(QueryContexts.TIMEOUT_KEY,
serverConfig.getDefaultQueryTimeout());
+
+ // Add execution ID.
+ contextMap.put(QueryContexts.CTX_DART_QUERY_ID, generateExecutionId());
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
index e6cdc9e0bb2..5740142dd63 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
@@ -75,9 +75,7 @@ class PrePlannedDartQueryMaker implements QueryMaker,
QueryMaker.FromDruidLogica
QueryContext context = plannerContext.queryContext();
ColumnMappings columnMappings =
QueryUtils.buildColumnMappings(dartQueryMaker.fieldMapping,
logicalStage.getLogicalRowSignature());
- QueryDefMSQSpec querySpec = MSQTaskQueryMaker.makeQueryDefMSQSpec(
- null,
- context,
+ QueryDefMSQSpec querySpec = MSQTaskQueryMaker.buildQueryDefMSQSpec(
columnMappings,
plannerContext,
null,
@@ -97,8 +95,7 @@ class PrePlannedDartQueryMaker implements QueryMaker,
QueryMaker.FromDruidLogica
QueryContext queryContext = druidQuery.getQuery().context();
ResultsContext resultsContext =
DartQueryMaker.makeResultsContext(druidQuery, dartQueryMaker.fieldMapping,
plannerContext);
QueryDefMSQSpec msqSpec = buildMSQSpec(druidQuery,
dartQueryMaker.fieldMapping, queryContext, resultsContext);
- QueryResponse<Object[]> response =
dartQueryMaker.runQueryDefMSQSpec(msqSpec, queryContext, resultsContext);
- return response;
+ return dartQueryMaker.runQueryDefMSQSpec(msqSpec, queryContext,
resultsContext);
}
private QueryDefMSQSpec buildMSQSpec(
@@ -108,10 +105,9 @@ class PrePlannedDartQueryMaker implements QueryMaker,
QueryMaker.FromDruidLogica
ResultsContext resultsContext)
{
ColumnMappings columnMappings =
QueryUtils.buildColumnMappings(fieldMapping,
druidQuery.getOutputRowSignature());
- LegacyMSQSpec querySpec = MSQTaskQueryMaker.makeLegacyMSQSpec(
+ LegacyMSQSpec querySpec = MSQTaskQueryMaker.buildLegacyMSQSpec(
null,
druidQuery,
- druidQuery.getQuery().context(),
columnMappings,
plannerContext,
null
@@ -132,9 +128,7 @@ class PrePlannedDartQueryMaker implements QueryMaker,
QueryMaker.FromDruidLogica
)
).makeQueryDefinition();
- return MSQTaskQueryMaker.makeQueryDefMSQSpec(
- null,
- druidQuery.getQuery().context(),
+ return MSQTaskQueryMaker.buildQueryDefMSQSpec(
columnMappings,
plannerContext,
null,
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 83db73bd754..958589598e3 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -2326,7 +2326,7 @@ public class ControllerImpl implements Controller
startTaskLauncher();
boolean runAgain;
- final DateTime queryFailDeadline =
getQueryDeadline(querySpec.getContext());
+ final DateTime queryFailDeadline = getQueryDeadline();
// The timeout could have already elapsed while waiting for the
controller to start, check it now.
checkTimeout(queryFailDeadline);
@@ -2358,17 +2358,21 @@ public class ControllerImpl implements Controller
}
/**
- * Retrieves the timeout and start time from the query context and
calculates the timeout deadline.
+ * Retrieves the timeout and start time from the query context and reads
or calculates the deadline.
*/
- private DateTime getQueryDeadline(QueryContext queryContext)
+ private DateTime getQueryDeadline()
{
- // Fetch the timeout, but don't use default server configured timeout if
the user has not specified one.
- final long timeout = queryContext.getTimeout(QueryContexts.NO_TIMEOUT);
- // Not using QueryContexts.hasTimeout(), as this considers the default
timeout as timeout being set.
- if (timeout == QueryContexts.NO_TIMEOUT) {
- return DateTimes.MAX;
+ DateTime deadline =
MultiStageQueryContext.getQueryDeadline(querySpec.getContext());
+
+ if (deadline == null) {
+ // Newer Brokers set the deadline, but older ones might not. Fall back
to startTime and timeout in this case.
+ final long timeout =
querySpec.getContext().getTimeout(QueryContexts.NO_TIMEOUT);
+ if (timeout != QueryContexts.NO_TIMEOUT) {
+ deadline =
MultiStageQueryContext.getStartTime(querySpec.getContext()).plus(timeout);
+ }
}
- return MultiStageQueryContext.getStartTime(queryContext).plus(timeout);
+
+ return deadline != null ? deadline : DateTimes.MAX;
}
/**
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index c8b9546f017..517731a2089 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -20,7 +20,6 @@
package org.apache.druid.msq.sql;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.calcite.rel.type.RelDataType;
@@ -35,8 +34,6 @@ import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
-import org.apache.druid.frame.FrameType;
-import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
@@ -44,7 +41,6 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.msq.exec.MSQTasks;
-import org.apache.druid.msq.exec.QueryKitSpecFactory;
import org.apache.druid.msq.exec.ResultsContext;
import org.apache.druid.msq.indexing.LegacyMSQSpec;
import org.apache.druid.msq.indexing.MSQControllerTask;
@@ -62,7 +58,6 @@ import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
-import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.IndexSpec;
@@ -108,28 +103,22 @@ public class MSQTaskQueryMaker implements QueryMaker
private final IngestDestination targetDataSource;
private final OverlordClient overlordClient;
private final PlannerContext plannerContext;
- private final ObjectMapper jsonMapper;
private final List<Entry<Integer, String>> fieldMapping;
private final MSQTerminalStageSpecFactory terminalStageSpecFactory;
- private final QueryKitSpecFactory queryKitSpecFactory;
MSQTaskQueryMaker(
@Nullable final IngestDestination targetDataSource,
final OverlordClient overlordClient,
final PlannerContext plannerContext,
- final ObjectMapper jsonMapper,
final List<Entry<Integer, String>> fieldMapping,
- final MSQTerminalStageSpecFactory terminalStageSpecFactory,
- final QueryKitSpecFactory queryKitSpecFactory
+ final MSQTerminalStageSpecFactory terminalStageSpecFactory
)
{
this.targetDataSource = targetDataSource;
this.overlordClient = Preconditions.checkNotNull(overlordClient,
"indexingServiceClient");
this.plannerContext = Preconditions.checkNotNull(plannerContext,
"plannerContext");
- this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
this.fieldMapping = Preconditions.checkNotNull(fieldMapping,
"fieldMapping");
this.terminalStageSpecFactory = terminalStageSpecFactory;
- this.queryKitSpecFactory = queryKitSpecFactory;
}
@Override
@@ -154,15 +143,23 @@ public class MSQTaskQueryMaker implements QueryMaker
);
ColumnMappings columnMappings =
QueryUtils.buildColumnMappings(fieldMapping,
druidQuery.getOutputRowSignature());
- final LegacyMSQSpec querySpec = makeLegacyMSQSpec(
+ final LegacyMSQSpec baseSpec = buildLegacyMSQSpec(
targetDataSource,
druidQuery,
- druidQuery.getQuery().context(),
columnMappings,
plannerContext,
terminalStageSpecFactory
);
+ final LegacyMSQSpec querySpec = baseSpec.withOverriddenContext(
+ withTaskOverrides(
+ druidQuery.getQuery(),
+ baseSpec.getContext(),
+ plannerContext,
+ baseSpec.getDestination()
+ )
+ );
+
final MSQControllerTask controllerTask = new MSQControllerTask(
taskId,
querySpec,
@@ -178,11 +175,14 @@ public class MSQTaskQueryMaker implements QueryMaker
return
QueryResponse.withEmptyContext(Sequences.simple(Collections.singletonList(new
Object[]{taskId})));
}
- public static LegacyMSQSpec makeLegacyMSQSpec(
+ /**
+ * Creates a {@link LegacyMSQSpec} based on a {@link Query}. Common context
from
+ * {@link MultiStageQueryContext#withCommonContext(QueryContext)} is added
to the {@link Query}.
+ */
+ public static LegacyMSQSpec buildLegacyMSQSpec(
@Nullable final IngestDestination targetDataSource,
final DruidQuery druidQuery,
- final QueryContext queryContext,
- ColumnMappings columnMappings,
+ final ColumnMappings columnMappings,
final PlannerContext plannerContext,
final MSQTerminalStageSpecFactory terminalStageSpecFactory
)
@@ -194,19 +194,23 @@ public class MSQTaskQueryMaker implements QueryMaker
terminalStageSpecFactory
);
- final Map<String, Object> nativeQueryContextOverrides =
buildOverrideContext(druidQuery.getQuery(), plannerContext, destination);
+ final QueryContext finalContext =
MultiStageQueryContext.withCommonContext(plannerContext.queryContext());
final LegacyMSQSpec querySpec =
LegacyMSQSpec.builder()
- .query(druidQuery.getQuery())
-
.queryContext(queryContext.override(nativeQueryContextOverrides))
+
.query(druidQuery.getQuery().withOverriddenContext(finalContext.asMap()))
+ .queryContext(finalContext)
.columnMappings(columnMappings)
.destination(destination)
-
.assignmentStrategy(MultiStageQueryContext.getAssignmentStrategy(plannerContext.queryContext()))
+
.assignmentStrategy(MultiStageQueryContext.getAssignmentStrategy(finalContext))
.tuningConfig(makeMSQTuningConfig(plannerContext))
.build();
- MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec.getContext(),
querySpec.getDestination(), druidQuery.getQuery());
+ MSQTaskQueryMakerUtils.validateRealtimeReindex(
+ querySpec.getContext(),
+ querySpec.getDestination(),
+ druidQuery.getQuery()
+ );
return querySpec;
}
@@ -255,50 +259,42 @@ public class MSQTaskQueryMaker implements QueryMaker
return destination;
}
- private static Map<String, Object> buildOverrideContext(
+ /**
+ * Returns a combined context map: starts with {@code baseContext}, adds
mode defaults (which do not override
+ * existing keys), then adds task-specific overrides that are not shared
with other MSQ engines like Dart.
+ */
+ private static Map<String, Object> withTaskOverrides(
final Query<?> query,
+ final QueryContext baseContext,
final PlannerContext plannerContext,
- final MSQDestination destination)
+ final MSQDestination destination
+ )
{
- final QueryContext sqlQueryContext = plannerContext.queryContext();
- final Map<String, Object> nativeQueryContextOverrides = new HashMap<>();
+ final Map<String, Object> context = new HashMap<>(baseContext.asMap());
- // Add appropriate finalization to native query context.
- final boolean finalizeAggregations =
MultiStageQueryContext.isFinalizeAggregations(sqlQueryContext);
- nativeQueryContextOverrides.put(QueryContexts.FINALIZE_KEY,
finalizeAggregations);
-
- // This flag is to ensure backward compatibility, as brokers are upgraded
after indexers/middlemanagers.
-
nativeQueryContextOverrides.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION,
true);
- boolean isReindex = MSQControllerTask.isReplaceInputDataSourceTask(query,
destination);
- if (isReindex) {
- nativeQueryContextOverrides.put(MultiStageQueryContext.CTX_IS_REINDEX,
isReindex);
- }
- nativeQueryContextOverrides.putAll(sqlQueryContext.asMap());
-
- // adding user
- nativeQueryContextOverrides.put(USER_KEY,
plannerContext.getAuthenticationResult().getIdentity());
-
- final String msqMode = MultiStageQueryContext.getMSQMode(sqlQueryContext);
+ // Add mode defaults (putIfAbsent, so user-provided values like
maxParseExceptions take precedence).
+ final String msqMode = MultiStageQueryContext.getMSQMode(baseContext);
if (msqMode != null) {
- MSQMode.populateDefaultQueryContext(msqMode,
nativeQueryContextOverrides);
+ MSQMode.populateDefaultQueryContext(msqMode, context);
}
- // Use the latest row-based frame type. The default is an older type, to
ensure compatibility during rolling
- // updates. Since the Broker is updated last, it's safe to set this
property on the Broker.
- nativeQueryContextOverrides.putIfAbsent(
- MultiStageQueryContext.CTX_ROW_BASED_FRAME_TYPE,
- (int) FrameType.latestRowBased().version()
- );
+ // Add task-specific overrides.
+ if (MSQControllerTask.isReplaceInputDataSourceTask(query, destination)) {
+ context.put(MultiStageQueryContext.CTX_IS_REINDEX, true);
+ }
- // Add the start time.
- nativeQueryContextOverrides.put(MultiStageQueryContext.CTX_START_TIME,
DateTimes.nowUtc().toString());
+ context.put(USER_KEY,
plannerContext.getAuthenticationResult().getIdentity());
- return nativeQueryContextOverrides;
+ return context;
}
- public static QueryDefMSQSpec makeQueryDefMSQSpec(
- @Nullable final IngestDestination targetDataSource,
- final QueryContext queryContext,
+ /**
+ * Creates a {@link QueryDefMSQSpec} based on already-planned {@link
QueryDefinition}. Common context
+ * from {@link MultiStageQueryContext#withCommonContext(QueryContext)} is
added to the {@link QueryDefinition}.
+ *
+ * Currently only supports SELECT queries, no DML (INSERT/REPLACE).
+ */
+ public static QueryDefMSQSpec buildQueryDefMSQSpec(
final ColumnMappings columnMappings,
final PlannerContext plannerContext,
final MSQTerminalStageSpecFactory terminalStageSpecFactory,
@@ -306,21 +302,20 @@ public class MSQTaskQueryMaker implements QueryMaker
)
{
final MSQDestination destination = buildMSQDestination(
- targetDataSource,
+ null, // targetDataSource
columnMappings,
plannerContext,
terminalStageSpecFactory
);
- final QueryDefMSQSpec querySpec = new QueryDefMSQSpec.Builder()
+ final QueryContext finalContext =
MultiStageQueryContext.withCommonContext(plannerContext.queryContext());
+ return new QueryDefMSQSpec.Builder()
.columnMappings(columnMappings)
.destination(destination)
-
.assignmentStrategy(MultiStageQueryContext.getAssignmentStrategy(plannerContext.queryContext()))
+
.assignmentStrategy(MultiStageQueryContext.getAssignmentStrategy(finalContext))
.tuningConfig(makeMSQTuningConfig(plannerContext))
- .queryDef(queryDef.withOverriddenContext(buildOverrideContext(null,
plannerContext, destination)))
+ .queryDef(queryDef.withOverriddenContext(finalContext.asMap()))
.build();
-
- return querySpec;
}
/**
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index 9e96eb7dd7d..2c05fdd606c 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -186,10 +186,8 @@ public class MSQTaskSqlEngine implements SqlEngine
null,
overlordClient,
plannerContext,
- jsonMapper,
relRoot.fields,
- terminalStageSpecFactory,
- queryKitSpecFactory
+ terminalStageSpecFactory
);
}
@@ -221,10 +219,8 @@ public class MSQTaskSqlEngine implements SqlEngine
destination,
overlordClient,
plannerContext,
- jsonMapper,
relRoot.fields,
- terminalStageSpecFactory,
- queryKitSpecFactory
+ terminalStageSpecFactory
);
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index df8fd3da310..79b1c5ca341 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -48,7 +48,6 @@ import org.apache.druid.msq.querykit.ReadableInputQueue;
import org.apache.druid.msq.rpc.ControllerResource;
import org.apache.druid.msq.rpc.SketchEncoding;
import org.apache.druid.msq.sql.MSQMode;
-import org.apache.druid.msq.sql.MSQTaskQueryMaker;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.IndexSpec;
@@ -58,8 +57,10 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -200,6 +201,11 @@ public class MultiStageQueryContext
public static final String CTX_START_TIME = "startTime";
+ /**
+ * The time that a query should time out. Value is an ISO8601 timestamp.
+ */
+ public static final String CTX_QUERY_DEADLINE = "queryDeadline";
+
/**
* Controls sort order within segments. Normally, this is the same as the
overall order of the query (from the
* CLUSTERED BY clause) but it can be overridden.
@@ -239,7 +245,7 @@ public class MultiStageQueryContext
/**
* The {@link FrameType} to use for row-based frames. This context parameter
exists to support rolling updates from
* older Druid versions. The latest type is given by {@link
FrameType#latestRowBased()}, which is set in
- * {@link MSQTaskQueryMaker#buildOverrideContext} starting in Druid 34. Once
all servers are on Druid 34 or newer,
+ * {@link MultiStageQueryContext#withCommonContext} starting in Druid 34.
Once all servers are on Druid 34 or newer,
* the current-latest type {@link FrameType#ROW_BASED_V2} is used.
*/
public static final String CTX_ROW_BASED_FRAME_TYPE = "rowBasedFrameType";
@@ -590,14 +596,64 @@ public class MultiStageQueryContext
public static DateTime getStartTime(final QueryContext queryContext)
{
// Get the start time from the query context set by the broker.
- if (!queryContext.containsKey(CTX_START_TIME)) {
- // If it is missing, as could be the case for an older version of the
broker, use the current time instead, to
- // have something to timeout against.
- DateTime startTime = DateTimes.nowUtc();
- log.warn("Query context does not contain start time. Defaulting to the
current time[%s] instead.", startTime);
- return startTime;
+ final String startTime = queryContext.getString(CTX_START_TIME);
+ if (startTime != null) {
+ return DateTimes.of(startTime);
+ } else {
+ // If it is missing, as could be the case for an older version of the
broker, use the current time instead.
+ final DateTime now = DateTimes.nowUtc();
+ log.warn("Query context does not contain start time. Defaulting to the
current time[%s] instead.", now);
+ return now;
+ }
+ }
+
+ @Nullable
+ public static DateTime getQueryDeadline(final QueryContext queryContext)
+ {
+ final String queryDeadline = queryContext.getString(CTX_QUERY_DEADLINE);
+ if (queryDeadline != null) {
+ return DateTimes.of(queryDeadline);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Returns a final query context including common context keys shared across
all MSQ engines (task, Dart, etc.).
+ */
+ public static QueryContext withCommonContext(final QueryContext
originalContext)
+ {
+ final Map<String, Object> overrides = new HashMap<>();
+
+ // Add appropriate finalization to native query context.
+ if (!originalContext.containsKey(QueryContexts.FINALIZE_KEY)) {
+ overrides.put(QueryContexts.FINALIZE_KEY,
isFinalizeAggregations(originalContext));
}
- return DateTimes.of(queryContext.getString(CTX_START_TIME));
+
+ // This flag is to ensure backward compatibility, as brokers are upgraded
after indexers/middlemanagers.
+ if (!originalContext.containsKey(WINDOW_FUNCTION_OPERATOR_TRANSFORMATION))
{
+ overrides.put(WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true);
+ }
+
+ if (!originalContext.containsKey(CTX_ROW_BASED_FRAME_TYPE)) {
+ // Use the latest row-based frame type. The default is an older type, to
ensure compatibility during rolling
+ // updates. Since the Broker is updated last, it's safe to set this
property on the Broker.
+ overrides.put(CTX_ROW_BASED_FRAME_TYPE, (int)
FrameType.latestRowBased().version());
+ }
+
+ // Add start time.
+ final DateTime now = DateTimes.nowUtc();
+ overrides.put(CTX_START_TIME, now.toString());
+
+ // Add query deadline if not already present (and if timeout is set).
+ if (!originalContext.containsKey(CTX_QUERY_DEADLINE)) {
+ final long timeout =
originalContext.getTimeout(QueryContexts.NO_TIMEOUT);
+ if (timeout != QueryContexts.NO_TIMEOUT) {
+ overrides.put(CTX_QUERY_DEADLINE, now.plus(timeout).toString());
+ }
+ }
+
+ return originalContext.override(overrides);
}
public static Set<String> getColumnsExcludedFromTypeVerification(final
QueryContext queryContext)
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
index c604590851e..41956d55695 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
@@ -56,7 +56,6 @@ import
org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestOverlordServiceClient;
import org.apache.druid.msq.test.MSQTestTaskActionClient;
-import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.InlineDataSource;
@@ -680,10 +679,8 @@ public class MSQTaskQueryMakerTest
ingestDestination,
fakeOverlordClient,
plannerContextMock,
- objectMapper,
fieldMapping,
- terminalStageSpecFactory,
- new MSQTaskQueryKitSpecFactory(new DruidProcessingConfig())
+ terminalStageSpecFactory
);
}
}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 5e50f1dcab6..5a2ecbcf243 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -363,7 +363,10 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected final WorkerMemoryParameters workerMemoryParameters =
Mockito.spy(makeTestWorkerMemoryParameters());
protected static final String TEST_CONTROLLER_TASK_ID = "query-test-query";
// Fields in the query context to ignore during assertion.
- protected Set<String> ignoreFields =
Set.of(MultiStageQueryContext.CTX_START_TIME);
+ protected Set<String> ignoreFields = Set.of(
+ MultiStageQueryContext.CTX_START_TIME,
+ MultiStageQueryContext.CTX_QUERY_DEADLINE
+ );
protected static class MSQBaseComponentSupplier extends
StandardComponentSupplier
{
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
index 953d7200500..6db3597d632 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
@@ -36,6 +37,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.StringEncodingStrategy;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
+import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
@@ -468,6 +470,43 @@ public class MultiStageQueryContextTest
Assert.assertEquals(Integer.valueOf(4),
MultiStageQueryContext.getMaxThreads(QueryContext.of(propertyMap)));
}
+ @Test
+ public void withCommonContext_noTimeout_setsStartTimeOnly()
+ {
+ final QueryContext context =
MultiStageQueryContext.withCommonContext(QueryContext.empty());
+
Assert.assertTrue(context.containsKey(MultiStageQueryContext.CTX_START_TIME));
+
Assert.assertFalse(context.containsKey(MultiStageQueryContext.CTX_QUERY_DEADLINE));
+ Assert.assertEquals(true, context.get(QueryContexts.FINALIZE_KEY));
+ Assert.assertEquals(true,
context.get(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION));
+
Assert.assertTrue(context.containsKey(MultiStageQueryContext.CTX_ROW_BASED_FRAME_TYPE));
+ }
+
+ @Test
+ public void withCommonContext_withTimeout_setsDeadline()
+ {
+ final long timeoutMs = 60_000;
+ final QueryContext context = MultiStageQueryContext.withCommonContext(
+ QueryContext.of(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, timeoutMs))
+ );
+
+
Assert.assertTrue(context.containsKey(MultiStageQueryContext.CTX_START_TIME));
+
Assert.assertTrue(context.containsKey(MultiStageQueryContext.CTX_QUERY_DEADLINE));
+
+ final DateTime startTime = DateTimes.of((String)
context.get(MultiStageQueryContext.CTX_START_TIME));
+ final DateTime deadline = DateTimes.of((String)
context.get(MultiStageQueryContext.CTX_QUERY_DEADLINE));
+ Assert.assertEquals(timeoutMs, deadline.getMillis() -
startTime.getMillis());
+ }
+
+ @Test
+ public void withCommonContext_mergesUserContext()
+ {
+ final QueryContext context = MultiStageQueryContext.withCommonContext(
+ QueryContext.of(ImmutableMap.of("customKey", "customValue"))
+ );
+
+ Assert.assertEquals("customValue", context.get("customKey"));
+ }
+
private static List<String> decodeSortOrder(@Nullable final String input)
{
return
MultiStageQueryContext.decodeList(MultiStageQueryContext.CTX_SORT_ORDER, input);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]