This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5608a8a20ec refactor: Remove parallelism configs from QueryKitSpec. (#19280)
5608a8a20ec is described below
commit 5608a8a20ec72b2e2a786ccbd5e08338e98add85
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Apr 14 20:58:19 2026 -0700
refactor: Remove parallelism configs from QueryKitSpec. (#19280)
This patch reduces QueryKitSpec to just a QueryKit plus the
desired query ID; all parallelism configs are removed. This is
useful because QueryKitSpec is not used at all in pre-planned
decoupled planning, so this change better aligns plans created
in decoupled mode with plans created by QueryKit.
The parameters have been migrated to other locations:
- maxLeafWorkerCount is moved to WorkerManager#getMaxWorkerCount.
- maxNonLeafWorkerCount is moved to ControllerContext.
- targetPartitionsPerWorker is moved to ControllerContext.
Planners can signify that they want their partition counts set to
"maxWorkerCount * targetPartitionsPerWorker" by using the new
"adjustable" parameter on various ShuffleSpecs.
---
.../msq/dart/controller/DartControllerContext.java | 18 +++++
.../msq/dart/controller/DartWorkerManager.java | 6 ++
.../controller/sql/PrePlannedDartQueryMaker.java | 4 +-
.../apache/druid/msq/exec/ControllerContext.java | 12 +++
.../org/apache/druid/msq/exec/ControllerImpl.java | 14 +++-
.../msq/exec/ControllerQueryResultsReader.java | 3 -
.../org/apache/druid/msq/exec/WorkerManager.java | 6 ++
.../msq/indexing/IndexerControllerContext.java | 18 +++++
.../druid/msq/indexing/MSQWorkerTaskLauncher.java | 11 +++
.../destination/DataSourceMSQDestination.java | 2 +-
.../destination/DurableStorageMSQDestination.java | 2 +-
.../indexing/destination/ExportMSQDestination.java | 2 +-
.../msq/kernel/GlobalSortMaxCountShuffleSpec.java | 43 ++++++++++-
.../apache/druid/msq/kernel/HashShuffleSpec.java | 58 ++++++++++++++-
.../apache/druid/msq/kernel/QueryDefinition.java | 34 +++++++++
.../org/apache/druid/msq/kernel/ShuffleSpec.java | 19 +++++
.../apache/druid/msq/kernel/StageDefinition.java | 43 +++++++++++
.../druid/msq/kernel/controller/WorkerInputs.java | 86 ++++++++++++----------
.../DruidLogicalToQueryDefinitionTranslator.java | 8 +-
.../org/apache/druid/msq/logical/StageMaker.java | 11 +++
.../druid/msq/logical/stages/GroupByStages.java | 2 +-
.../apache/druid/msq/logical/stages/JoinStage.java | 2 +-
.../druid/msq/logical/stages/LogicalStage.java | 8 ++
.../druid/msq/logical/stages/OffsetLimitStage.java | 6 ++
.../apache/druid/msq/logical/stages/SortStage.java | 33 ++++++++-
.../apache/druid/msq/querykit/DataSourcePlan.java | 20 ++---
.../apache/druid/msq/querykit/QueryKitSpec.java | 50 ++-----------
.../druid/msq/querykit/ShuffleSpecFactories.java | 10 ++-
.../druid/msq/querykit/WindowOperatorQueryKit.java | 15 ++--
.../msq/querykit/groupby/GroupByQueryKit.java | 7 +-
.../druid/msq/querykit/scan/ScanQueryKit.java | 2 +-
.../druid/msq/sql/DartQueryKitSpecFactory.java | 44 +----------
.../druid/msq/sql/MSQTaskQueryKitSpecFactory.java | 29 +-------
.../dart/controller/http/DartSqlResourceTest.java | 3 +-
.../org/apache/druid/msq/exec/MSQTasksTest.java | 1 +
.../indexing/MSQWorkerTaskLauncherRetryTest.java | 5 ++
.../msq/indexing/MSQWorkerTaskLauncherTest.java | 2 +
.../controller/MockQueryDefinitionBuilder.java | 3 +-
.../druid/msq/test/MSQTestControllerContext.java | 13 ++++
39 files changed, 454 insertions(+), 201 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
index b7e1b3c94bc..a4a4ff8945a 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
@@ -236,4 +236,22 @@ public class DartControllerContext implements
ControllerContext
{
throw DruidException.defensive("TaskLockType is not used with class[%s]",
getClass().getName());
}
+
+ @Override
+ public int maxNonLeafWorkerCount()
+ {
+ return context.getInt(
+ DartControllerContext.CTX_MAX_NON_LEAF_WORKER_COUNT,
+ DartControllerContext.DEFAULT_MAX_NON_LEAF_WORKER_COUNT
+ );
+ }
+
+ @Override
+ public int targetPartitionsPerWorker()
+ {
+ return MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
+ context,
+ DEFAULT_TARGET_PARTITIONS_PER_WORKER
+ );
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
index 05a26c39344..f6d68d57f42 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
@@ -155,6 +155,12 @@ public class DartWorkerManager implements WorkerManager
return workerIdToNumber.containsKey(workerId);
}
+ @Override
+ public int getMaxWorkerCount()
+ {
+ return workerIds.size();
+ }
+
@Override
public Map<Integer, List<WorkerStats>> getWorkerStats()
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
index 5740142dd63..7ec616d212a 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
@@ -51,8 +51,8 @@ import java.util.Map.Entry;
*/
class PrePlannedDartQueryMaker implements QueryMaker,
QueryMaker.FromDruidLogical
{
- private PlannerContext plannerContext;
- private DartQueryMaker dartQueryMaker;
+ private final PlannerContext plannerContext;
+ private final DartQueryMaker dartQueryMaker;
public PrePlannedDartQueryMaker(PlannerContext plannerContext,
DartQueryMaker queryMaker)
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index 008aae55818..2d14c10788b 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -29,6 +29,7 @@ import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.server.DruidNode;
@@ -123,4 +124,15 @@ public interface ControllerContext
* Client for communicating with workers.
*/
WorkerClient newWorkerClient();
+
+ /**
+ * Maximum number of workers for non-leaf stages.
+ */
+ int maxNonLeafWorkerCount();
+
+ /**
+ * Target number of partitions per worker for shuffle stages. Used at
runtime to adjust
+ * shuffle specs that have {@link ShuffleSpec#isAdjustable()} set to true.
+ */
+ int targetPartitionsPerWorker();
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index e02d2909547..f616db7a032 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -728,7 +728,7 @@ public class ControllerImpl implements Controller
final QueryContext queryContext = querySpec.getContext();
- final QueryDefinition queryDef;
+ QueryDefinition queryDef;
if (legacyQuery != null) {
QueryKitBasedMSQPlanner qkPlanner = new QueryKitBasedMSQPlanner(
querySpec,
@@ -762,9 +762,6 @@ public class ControllerImpl implements Controller
}
}
- QueryValidator.validateQueryDef(queryDef);
- queryDefRef.set(queryDef);
-
workerManager = context.newWorkerManager(
context.queryId(),
querySpec,
@@ -772,6 +769,15 @@ public class ControllerImpl implements Controller
getWorkerFailureListener()
);
+ queryDef = queryDef.withRuntimeBounds(
+ workerManager.getMaxWorkerCount(),
+ context.maxNonLeafWorkerCount(),
+ context.targetPartitionsPerWorker()
+ );
+
+ QueryValidator.validateQueryDef(queryDef);
+ queryDefRef.set(queryDef);
+
if (queryKernelConfig.isFaultTolerant() && !(workerManager instanceof
RetryCapableWorkerManager)) {
// Not expected to happen, since all WorkerManager impls are currently
retry-capable. Defensive check
// for future-proofing.
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
index e8766f049fd..51ddb03b2e4 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
@@ -26,7 +26,6 @@ import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.FrameProcessors;
import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader;
-import org.apache.druid.java.util.common.logger.Logger;
import java.io.IOException;
import java.util.Collections;
@@ -37,8 +36,6 @@ import java.util.List;
*/
public class ControllerQueryResultsReader implements FrameProcessor<Void>
{
- private static final Logger log = new
Logger(ControllerQueryResultsReader.class);
-
private final ReadableFrameChannel in;
private final FrameReader frameReader;
private final QueryListener queryListener;
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
index ff76867dd30..01bcb81a756 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
@@ -89,6 +89,12 @@ public interface WorkerManager
*/
Map<Integer, List<WorkerStats>> getWorkerStats();
+ /**
+ * Maximum number of workers that can be used by this manager. Used at
runtime to cap
+ * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()}.
+ */
+ int getMaxWorkerCount();
+
/**
* Stop all workers.
*
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 3edd414b515..cd91376e5c9 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -231,6 +231,7 @@ public class IndexerControllerContext implements
ControllerContext
makeTaskContext(querySpec, queryKernelConfig, taskContext),
// 10 minutes +- 2 minutes jitter
TimeUnit.SECONDS.toMillis(600 +
ThreadLocalRandom.current().nextInt(-4, 5) * 30L),
+ task.getQuerySpec().getTuningConfig().getMaxNumWorkers(),
new MSQWorkerTaskLauncherConfig()
);
}
@@ -241,6 +242,23 @@ public class IndexerControllerContext implements
ControllerContext
return toolbox.getIndexingTmpDir();
}
+ @Override
+ public int maxNonLeafWorkerCount()
+ {
+ return task.getQuerySpec().getTuningConfig().getMaxNumWorkers();
+ }
+
+ @Override
+ public int targetPartitionsPerWorker()
+ {
+ // Assume tasks are symmetric: workers have the same number of processors
available as a controller.
+ // Create one partition per processor per worker, for maximum parallelism.
+ return MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
+ taskQuerySpecContext,
+ memoryIntrospector.numProcessingThreads()
+ );
+ }
+
/**
* Helper method for {@link #queryKernelConfig(MSQSpec)}. Also used in tests.
*/
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index d773f225fac..7ef30346d7a 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -100,6 +100,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
private final String controllerTaskId;
private final String dataSource;
private final OverlordClient overlordClient;
+ private final int maxWorkerCount;
private final ExecutorService exec;
private final long maxTaskStartDelayMillis;
private final MSQWorkerTaskLauncherConfig config;
@@ -165,6 +166,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
final WorkerFailureListener workerFailureListener,
final Map<String, Object> taskContextOverrides,
final long maxTaskStartDelayMillis,
+ final int maxWorkerCount,
final MSQWorkerTaskLauncherConfig config
)
{
@@ -175,6 +177,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
workerFailureListener,
taskContextOverrides,
maxTaskStartDelayMillis,
+ maxWorkerCount,
config,
TimeUnit.SECONDS.toMillis(60)
);
@@ -188,6 +191,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
final WorkerFailureListener workerFailureListener,
final Map<String, Object> taskContextOverrides,
final long maxTaskStartDelayMillis,
+ final int maxWorkerCount,
final MSQWorkerTaskLauncherConfig config,
final long taskIdsLockTimeout
)
@@ -195,6 +199,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
this.controllerTaskId = controllerTaskId;
this.dataSource = dataSource;
this.overlordClient = overlordClient;
+ this.maxWorkerCount = maxWorkerCount;
this.workerFailureListener = workerFailureListener;
this.taskContextOverrides = taskContextOverrides;
this.exec = Execs.singleThreaded(
@@ -553,6 +558,12 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
}
}
+ @Override
+ public int getMaxWorkerCount()
+ {
+ return maxWorkerCount;
+ }
+
/**
* Returns a pair which contains the number of currently running worker
tasks and the number of worker tasks that are
* not yet fully started as left and right respectively.
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
index 02132ac2312..63f163f82ca 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
@@ -209,7 +209,7 @@ public class DataSourceMSQDestination implements
MSQDestination
@Override
public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
{
- return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
+ return ShuffleSpecFactories.globalSortWithTargetSize(targetSize);
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DurableStorageMSQDestination.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DurableStorageMSQDestination.java
index 88fe5f58e5a..eb124eda213 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DurableStorageMSQDestination.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DurableStorageMSQDestination.java
@@ -56,7 +56,7 @@ public class DurableStorageMSQDestination implements
MSQDestination
@Override
public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
{
- return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
+ return ShuffleSpecFactories.globalSortWithTargetSize(targetSize);
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java
index d6a78def63a..d03c2238fdb 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java
@@ -108,7 +108,7 @@ public class ExportMSQDestination implements MSQDestination
@Override
public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
{
- return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
+ return ShuffleSpecFactories.globalSortWithTargetSize(targetSize);
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java
index 35576474ff9..bcb2718c402 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java
@@ -44,24 +44,31 @@ public class GlobalSortMaxCountShuffleSpec implements
GlobalSortShuffleSpec
private final int maxPartitions;
private final boolean aggregate;
private final long limitHint;
+ private final boolean adjustable;
@JsonCreator
public GlobalSortMaxCountShuffleSpec(
@JsonProperty("clusterBy") final ClusterBy clusterBy,
@JsonProperty("partitions") final int maxPartitions,
@JsonProperty("aggregate") final boolean aggregate,
- @JsonProperty("limitHint") final Long limitHint
+ @JsonProperty("limitHint") final Long limitHint,
+ @JsonProperty("adjustable") final boolean adjustable
)
{
this.clusterBy = Preconditions.checkNotNull(clusterBy, "clusterBy");
this.maxPartitions = maxPartitions;
this.aggregate = aggregate;
this.limitHint = limitHint == null ? UNLIMITED : limitHint;
+ this.adjustable = adjustable;
if (maxPartitions < 1) {
throw new IAE("Partition count must be at least 1");
}
+ if (adjustable && maxPartitions != 1) {
+ throw new IAE("Partition count must be 1 when adjustable is true, but
was [%d]", maxPartitions);
+ }
+
if (!clusterBy.sortable()) {
throw new IAE("ClusterBy key must be sortable");
}
@@ -72,6 +79,16 @@ public class GlobalSortMaxCountShuffleSpec implements
GlobalSortShuffleSpec
}
}
+ public GlobalSortMaxCountShuffleSpec(
+ final ClusterBy clusterBy,
+ final int maxPartitions,
+ final boolean aggregate,
+ final Long limitHint
+ )
+ {
+ this(clusterBy, maxPartitions, aggregate, limitHint, false);
+ }
+
@Override
public ShuffleKind kind()
{
@@ -144,6 +161,26 @@ public class GlobalSortMaxCountShuffleSpec implements
GlobalSortShuffleSpec
return limitHint;
}
+ @Override
+ @JsonProperty("adjustable")
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+ public boolean isAdjustable()
+ {
+ return adjustable;
+ }
+
+ @Override
+ public ShuffleSpec withPartitionCount(final int partitionCount)
+ {
+ return new GlobalSortMaxCountShuffleSpec(
+ clusterBy,
+ partitionCount,
+ aggregate,
+ limitHint == UNLIMITED ? null : limitHint,
+ false
+ );
+ }
+
@Override
public boolean equals(Object o)
{
@@ -156,6 +193,7 @@ public class GlobalSortMaxCountShuffleSpec implements
GlobalSortShuffleSpec
GlobalSortMaxCountShuffleSpec that = (GlobalSortMaxCountShuffleSpec) o;
return maxPartitions == that.maxPartitions
&& aggregate == that.aggregate
+ && adjustable == that.adjustable
&& Objects.equals(clusterBy, that.clusterBy)
&& Objects.equals(limitHint, that.limitHint);
}
@@ -163,7 +201,7 @@ public class GlobalSortMaxCountShuffleSpec implements
GlobalSortShuffleSpec
@Override
public int hashCode()
{
- return Objects.hash(clusterBy, maxPartitions, aggregate, limitHint);
+ return Objects.hash(clusterBy, maxPartitions, aggregate, limitHint,
adjustable);
}
@Override
@@ -174,6 +212,7 @@ public class GlobalSortMaxCountShuffleSpec implements
GlobalSortShuffleSpec
", maxPartitions=" + maxPartitions +
", aggregate=" + aggregate +
", limitHint=" + limitHint +
+ ", adjustable=" + adjustable +
'}';
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java
index 69e66fffe26..23e9e98a217 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java
@@ -20,25 +20,35 @@
package org.apache.druid.msq.kernel;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.java.util.common.IAE;
+import java.util.Objects;
+
public class HashShuffleSpec implements ShuffleSpec
{
public static final String TYPE = "hash";
private final ClusterBy clusterBy;
private final int numPartitions;
+ private final boolean adjustable;
@JsonCreator
public HashShuffleSpec(
@JsonProperty("clusterBy") final ClusterBy clusterBy,
- @JsonProperty("partitions") final int numPartitions
+ @JsonProperty("partitions") final int numPartitions,
+ @JsonProperty("adjustable") final boolean adjustable
)
{
this.clusterBy = clusterBy;
this.numPartitions = numPartitions;
+ this.adjustable = adjustable;
+
+ if (adjustable && numPartitions != 1) {
+ throw new IAE("Partition count must be 1 when adjustable is true, but
was [%d]", numPartitions);
+ }
if (clusterBy.getBucketByCount() > 0) {
// Only GlobalSortTargetSizeShuffleSpec supports bucket-by.
@@ -65,4 +75,50 @@ public class HashShuffleSpec implements ShuffleSpec
{
return numPartitions;
}
+
+ @Override
+ @JsonProperty("adjustable")
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+ public boolean isAdjustable()
+ {
+ return adjustable;
+ }
+
+ @Override
+ public ShuffleSpec withPartitionCount(final int partitionCount)
+ {
+ return new HashShuffleSpec(
+ clusterBy,
+ partitionCount,
+ false
+ );
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HashShuffleSpec that = (HashShuffleSpec) o;
+ return numPartitions == that.numPartitions
+ && adjustable == that.adjustable
+ && Objects.equals(clusterBy, that.clusterBy);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(clusterBy, numPartitions, adjustable);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "HashShuffleSpec{" +
+ "clusterBy=" + clusterBy +
+ ", numPartitions=" + numPartitions +
+ ", adjustable=" + adjustable +
+ '}';
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/QueryDefinition.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/QueryDefinition.java
index e171118be2c..f315d029f7d 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/QueryDefinition.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/QueryDefinition.java
@@ -38,6 +38,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -207,4 +208,37 @@ public class QueryDefinition
{
return new QueryDefinition(stageDefinitions, finalStage,
context.override(contextOverride));
}
+
+ /**
+ * Returns a new {@link QueryDefinition} with runtime bounds applied:
+ * <ul>
+ * <li>All stages {@link StageDefinition#getMaxWorkerCount()} are capped
to {@code maxWorkerCount}.</li>
+ * <li>All nonleaf stages {@link StageDefinition#getMaxWorkerCount()} are
further capped to
+ * {@code maxNonLeafWorkerCount}.</li>
+ * <li>All stage shuffle specs, if {@link ShuffleSpec#isAdjustable()},
have their partition count set to
+ * the capped max worker count times {@code
targetPartitionsPerWorker}.</li>
+ * </ul>
+ */
+ public QueryDefinition withRuntimeBounds(
+ final int maxWorkerCount,
+ final int maxNonLeafWorkerCount,
+ final int targetPartitionsPerWorker
+ )
+ {
+ boolean anyChanged = false;
+ final Map<StageId, StageDefinition> newStageDefinitions = new
LinkedHashMap<>();
+ for (Map.Entry<StageId, StageDefinition> entry :
stageDefinitions.entrySet()) {
+ final StageDefinition stageDef = entry.getValue();
+ final StageDefinition adjustedStageDef =
+ stageDef.withRuntimeBounds(maxWorkerCount, maxNonLeafWorkerCount,
targetPartitionsPerWorker);
+ newStageDefinitions.put(entry.getKey(), adjustedStageDef);
+ if (!Objects.equals(adjustedStageDef, stageDef)) {
+ anyChanged = true;
+ }
+ }
+ if (!anyChanged) {
+ return this;
+ }
+ return new QueryDefinition(newStageDefinitions, finalStage, context);
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/ShuffleSpec.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/ShuffleSpec.java
index 97f3e6db547..2d02acb5ecc 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/ShuffleSpec.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/ShuffleSpec.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.kernel;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.error.NotYetImplemented;
import org.apache.druid.frame.key.ClusterBy;
/**
@@ -83,4 +84,22 @@ public interface ShuffleSpec
{
return UNLIMITED;
}
+
+ /**
+ * Whether this shuffle spec's partition count should be adjusted at runtime
by multiplying by
+ * {@code targetPartitionsPerWorker} from the controller context.
+ */
+ default boolean isAdjustable()
+ {
+ return false;
+ }
+
+ /**
+ * Returns a copy of this shuffle spec with the partition count set to
{@code partitionCount}
+ * and {@code adjustable} set to false. Only meaningful when {@link
#isAdjustable()} is true.
+ */
+ default ShuffleSpec withPartitionCount(int partitionCount)
+ {
+ throw NotYetImplemented.ex(null, "withPartitionCount not implemented for
[%s]", getClass().getSimpleName());
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index f836a357d3c..7fc3db19f1a 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -175,6 +175,49 @@ public class StageDefinition
.shuffleCheckHasMultipleValues(stageDef.getShuffleCheckHasMultipleValues());
}
+ /**
+ * Returns a new {@link StageDefinition} with runtime bounds applied. See
{@link QueryDefinition#withRuntimeBounds}
+ * for details on the logic.
+ */
+ public StageDefinition withRuntimeBounds(
+ final int maxWorkerCount,
+ final int maxNonLeafWorkerCount,
+ final int targetPartitionsPerWorker
+ )
+ {
+ final int adjustedMaxWorkerCount;
+ final ShuffleSpec adjustedShuffleSpec;
+
+ if (InputSpecs.hasLeafInputs(inputSpecs, getBroadcastInputNumbers())) {
+ // Leaf stage.
+ adjustedMaxWorkerCount = Math.min(this.maxWorkerCount, maxWorkerCount);
+ } else {
+ // Nonleaf stage.
+ adjustedMaxWorkerCount = Math.min(this.maxWorkerCount,
Math.min(maxWorkerCount, maxNonLeafWorkerCount));
+ }
+
+ if (shuffleSpec != null && shuffleSpec.isAdjustable()) {
+ adjustedShuffleSpec =
shuffleSpec.withPartitionCount(adjustedMaxWorkerCount *
targetPartitionsPerWorker);
+ } else {
+ adjustedShuffleSpec = shuffleSpec;
+ }
+
+ if (adjustedMaxWorkerCount == this.maxWorkerCount &&
Objects.equals(adjustedShuffleSpec, shuffleSpec)) {
+ return this;
+ }
+
+ return new StageDefinition(
+ id,
+ inputSpecs,
+ broadcastInputNumbers,
+ processor,
+ signature,
+ adjustedShuffleSpec,
+ adjustedMaxWorkerCount,
+ shuffleCheckHasMultipleValues
+ );
+ }
+
/**
* Returns a unique stage identifier.
*/
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
index ef825ed69d3..92531eb1c00 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
@@ -77,48 +77,60 @@ public class WorkerInputs
return new WorkerInputs(assignmentsMap);
}
- // Assign input slices to workers.
+ // Assign non-broadcast input slices to workers first, so we know the
actual worker count.
+ // Then assign broadcast inputs to all assigned workers.
for (int inputNumber = 0; inputNumber < numInputs; inputNumber++) {
+ if (stageDef.getBroadcastInputNumbers().contains(inputNumber)) {
+ continue;
+ }
+
final InputSpec inputSpec = stageDef.getInputSpecs().get(inputNumber);
final SegmentPruner pruner = stageDef.getSegmentPruner(inputNumber);
- if (stageDef.getBroadcastInputNumbers().contains(inputNumber)) {
- // Broadcast case: send everything everywhere.
- final List<InputSlice> broadcastSlices = slicer.sliceStatic(inputSpec,
pruner, 1);
- final InputSlice broadcastSlice = broadcastSlices.isEmpty()
- ? NilInputSlice.INSTANCE
- :
Iterables.getOnlyElement(broadcastSlices);
-
- for (int workerNumber = 0; workerNumber <
stageDef.getMaxWorkerCount(); workerNumber++) {
- assignmentsMap.computeIfAbsent(
- workerNumber,
- ignored -> Arrays.asList(new InputSlice[numInputs])
- ).set(inputNumber, broadcastSlice);
- }
- } else {
- // Non-broadcast case: split slices across workers.
- List<InputSlice> slices = assignmentStrategy.assign(
- stageDef,
- inputSpec,
- stageWorkerCountMap,
- slicer,
- pruner,
- maxInputFilesPerWorker,
- maxInputBytesPerWorker
- );
-
- if (slices.isEmpty()) {
- // Need at least one slice, so we can have at least one worker. It's
OK if it has nothing to read.
- slices = Collections.singletonList(NilInputSlice.INSTANCE);
- }
+ List<InputSlice> slices = assignmentStrategy.assign(
+ stageDef,
+ inputSpec,
+ stageWorkerCountMap,
+ slicer,
+ pruner,
+ maxInputFilesPerWorker,
+ maxInputBytesPerWorker
+ );
+
+ if (slices.isEmpty()) {
+ // Need at least one slice, so we can have at least one worker. It's
OK if it has nothing to read.
+ slices = Collections.singletonList(NilInputSlice.INSTANCE);
+ }
- // Flip the slices, so it's worker number -> slices for that worker.
- for (int workerNumber = 0; workerNumber < slices.size();
workerNumber++) {
- assignmentsMap.computeIfAbsent(
- workerNumber,
- ignored -> Arrays.asList(new InputSlice[numInputs])
- ).set(inputNumber, slices.get(workerNumber));
- }
+ // Flip the slices, so it's worker number -> slices for that worker.
+ for (int workerNumber = 0; workerNumber < slices.size(); workerNumber++)
{
+ assignmentsMap.computeIfAbsent(
+ workerNumber,
+ ignored -> Arrays.asList(new InputSlice[numInputs])
+ ).set(inputNumber, slices.get(workerNumber));
+ }
+ }
+
+ if (assignmentsMap.isEmpty()) {
+ // All inputs are broadcast. Use a single worker.
+ assignmentsMap.put(0, Arrays.asList(new InputSlice[numInputs]));
+ }
+
+ // Assign broadcast inputs to all workers determined by non-broadcast
assignment above.
+ for (int inputNumber = 0; inputNumber < numInputs; inputNumber++) {
+ if (!stageDef.getBroadcastInputNumbers().contains(inputNumber)) {
+ continue;
+ }
+
+ final InputSpec inputSpec = stageDef.getInputSpecs().get(inputNumber);
+ final SegmentPruner pruner = stageDef.getSegmentPruner(inputNumber);
+ final List<InputSlice> broadcastSlices = slicer.sliceStatic(inputSpec,
pruner, 1);
+ final InputSlice broadcastSlice = broadcastSlices.isEmpty()
+ ? NilInputSlice.INSTANCE
+ :
Iterables.getOnlyElement(broadcastSlices);
+
+ for (int workerNumber : assignmentsMap.keySet()) {
+ assignmentsMap.get(workerNumber).set(inputNumber, broadcastSlice);
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
index 138bc1f2995..c46fa410cf6 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
@@ -122,12 +122,14 @@ public class DruidLogicalToQueryDefinitionTranslator
DruidSort sort = (DruidSort) stack.getNode();
List<OrderByColumnSpec> orderBySpecs =
DruidQuery.buildOrderByColumnSpecs(inputStage.getLogicalRowSignature(), sort);
List<KeyColumn> keyColumns = Lists.transform(orderBySpecs,
KeyColumn::fromOrderByColumnSpec);
- SortStage sortStage = new SortStage(inputStage, keyColumns);
if (sort.hasLimitOrOffset()) {
- return new OffsetLimitStage(sortStage, sort.getOffsetLimit());
+ return new OffsetLimitStage(
+ new SortStage(inputStage, keyColumns, sort.getOffsetLimit()),
+ sort.getOffsetLimit()
+ );
} else {
- return sortStage;
+ return new SortStage(inputStage, keyColumns, null);
}
}
if (stack.getNode() instanceof DruidUnnest) {
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
index 4885a7d3e7d..3b3222f8890 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
@@ -20,6 +20,7 @@
package org.apache.druid.msq.logical;
import org.apache.druid.error.DruidException;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.StageProcessor;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.kernel.MixShuffleSpec;
@@ -100,6 +101,7 @@ public class StageMaker
sdb.signature(frameProcessorStage.getLogicalRowSignature());
sdb.processor(stageProcessor);
sdb.shuffleSpec(MixShuffleSpec.instance());
+ sdb.maxWorkerCount(maxWorkerCountFor(frameProcessorStage));
return sdb;
}
@@ -110,6 +112,7 @@ public class StageMaker
sdb.signature(stage.getRowSignature());
sdb.processor(makeScanStageProcessor(VirtualColumns.EMPTY,
stage.getRowSignature(), null));
sdb.shuffleSpec(stage.buildShuffleSpec());
+ sdb.maxWorkerCount(maxWorkerCountFor(stage));
return sdb;
}
@@ -163,4 +166,12 @@ public class StageMaker
{
return ScanQueryStageProcessor.makeSegmentMapFnProcessor(signature,
dataSource);
}
+
+ /**
+ * Returns the {@code maxWorkerCount} to use for a given {@link
LogicalStage}.
+ */
+ private static int maxWorkerCountFor(LogicalStage stage)
+ {
+ return stage.isSingleWorker() ? 1 : Limits.MAX_WORKERS;
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
index e955c6a4052..809a33cfc2b 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
@@ -91,7 +91,7 @@ public class GroupByStages
{
GroupByQuery gby = makeGbyQuery(projectStage, grouping);
PreShuffleStage aggStage = new PreShuffleStage(projectStage,
gby.withPostAggregatorSpecs(Collections.emptyList()));
- SortStage sortStage = new SortStage(aggStage,
getKeyColumns(grouping.getDimensions()));
+ SortStage sortStage = new SortStage(aggStage,
getKeyColumns(grouping.getDimensions()), null);
PostShuffleStage finalAggStage = new PostShuffleStage(sortStage, gby,
grouping.getOutputRowSignature());
return finalAggStage;
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
index 96cf3d3510b..77cf2f2f740 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
@@ -76,7 +76,7 @@ public class JoinStage
public ShuffleSpec buildShuffleSpec()
{
final ClusterBy clusterBy = new ClusterBy(keyColumns, 0);
- return new HashShuffleSpec(clusterBy, 1);
+ return new HashShuffleSpec(clusterBy, 1, true);
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
index 59f88264dbd..992a3f439b7 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
@@ -59,4 +59,12 @@ public interface LogicalStage
* Returns the inputs of this stage.
*/
List<LogicalInputSpec> getInputSpecs();
+
+ /**
+ * Whether this stage must run on a single worker.
+ */
+ default boolean isSingleWorker()
+ {
+ return false;
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/OffsetLimitStage.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/OffsetLimitStage.java
index 88496394c65..1950faa5511 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/OffsetLimitStage.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/OffsetLimitStage.java
@@ -53,6 +53,12 @@ public class OffsetLimitStage extends
AbstractFrameProcessorStage
);
}
+ @Override
+ public boolean isSingleWorker()
+ {
+ return true;
+ }
+
@Override
public RowSignature getLogicalRowSignature()
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
index 834ffdcf6f3..d97f7397dee 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
@@ -26,21 +26,31 @@ import org.apache.druid.msq.logical.LogicalInputSpec;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.OffsetLimit;
import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import javax.annotation.Nullable;
import java.util.List;
public class SortStage extends AbstractShuffleStage
{
protected final List<KeyColumn> keyColumns;
- public SortStage(LogicalStage inputStage, List<KeyColumn> keyColumns)
+ /**
+ * Non-null when this sort feeds a downstream {@link OffsetLimitStage}, in
which case the shuffle output
+ * is funneled into a single sorted partition with a {@code limitHint}
derived from the offset and limit.
+ */
+ @Nullable
+ protected final OffsetLimit offsetLimit;
+
+ public SortStage(LogicalStage inputStage, List<KeyColumn> keyColumns,
@Nullable OffsetLimit offsetLimit)
{
super(
QueryKitUtils.sortableSignature(inputStage.getLogicalRowSignature(),
keyColumns),
LogicalInputSpec.of(inputStage)
);
this.keyColumns = keyColumns;
+ this.offsetLimit = offsetLimit;
}
@Override
@@ -53,7 +63,26 @@ public class SortStage extends AbstractShuffleStage
public ShuffleSpec buildShuffleSpec()
{
final ClusterBy clusterBy = new ClusterBy(keyColumns, 0);
- return
ShuffleSpecFactories.globalSortWithMaxPartitionCount(1).build(clusterBy, false);
+ if (offsetLimit != null) {
+ // Funnel everything through a single sorted partition so the downstream
OffsetLimitStage can apply the
+ // offset and limit.
+ return
ShuffleSpecFactories.singlePartitionWithLimit(computeLimitHint(offsetLimit)).build(clusterBy,
false);
+ } else {
+ return
ShuffleSpecFactories.globalSortWithTargetPartitions().build(clusterBy, false);
+ }
+ }
+
+ /**
+ * Computes the {@link ShuffleSpec#limitHint()} that the upstream sort can
use to short-circuit work when the
+ * downstream stage applies an offset and limit.
+ */
+ private static long computeLimitHint(OffsetLimit offsetLimit)
+ {
+ if (offsetLimit.hasLimit() && offsetLimit.getOffset() +
offsetLimit.getLimit() > 0 /* overflow check */) {
+ return offsetLimit.getOffset() + offsetLimit.getLimit();
+ } else {
+ return ShuffleSpec.UNLIMITED;
+ }
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
index bdc4370f9e7..d3d4d86405b 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
@@ -31,8 +31,8 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.InputSpec;
-import org.apache.druid.msq.input.InputSpecs;
import org.apache.druid.msq.input.external.ExternalInputSpec;
import org.apache.druid.msq.input.inline.InlineInputSpec;
import org.apache.druid.msq.input.lookup.LookupInputSpec;
@@ -267,14 +267,13 @@ public class DataSourcePlan
/**
* Figure for {@link StageDefinition#getMaxWorkerCount()} that should be
used when processing.
*/
- public int getMaxWorkerCount(final QueryKitSpec queryKitSpec)
+ public int getMaxWorkerCount()
{
if (isSingleWorker()) {
return 1;
- } else if (InputSpecs.hasLeafInputs(inputSpecs, broadcastInputs)) {
- return queryKitSpec.getMaxLeafWorkerCount();
} else {
- return queryKitSpec.getMaxNonLeafWorkerCount();
+ // Use MAX_WORKERS as a high upper bound; capped at runtime by
QueryDefinition.withRuntimeBounds.
+ return Limits.MAX_WORKERS;
}
}
@@ -431,7 +430,7 @@ public class DataSourcePlan
// Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in
the context. It's only used for the
// outermost query, and setting it for the subquery makes us
erroneously add bucketing where it doesn't belong.
dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY),
-
ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle()),
+ ShuffleSpecFactories.globalSortWithTargetPartitions(),
minStageNumber
);
@@ -669,10 +668,10 @@ public class DataSourcePlan
((StageInputSpec)
Iterables.getOnlyElement(leftPlan.getInputSpecs())).getStageNumber()
);
- final int hashPartitionCount = queryKitSpec.getNumPartitionsForShuffle();
final List<KeyColumn> leftPartitionKey = partitionKeys.get(0);
- leftBuilder.shuffleSpec(new HashShuffleSpec(new
ClusterBy(leftPartitionKey, 0), hashPartitionCount));
+ leftBuilder.shuffleSpec(new HashShuffleSpec(new
ClusterBy(leftPartitionKey, 0), 1, true));
leftBuilder.signature(QueryKitUtils.sortableSignature(leftBuilder.getSignature(),
leftPartitionKey));
+ leftBuilder.maxWorkerCount(Limits.MAX_WORKERS);
// Build up the right stage.
final StageDefinitionBuilder rightBuilder =
subQueryDefBuilder.getStageBuilder(
@@ -680,8 +679,9 @@ public class DataSourcePlan
);
final List<KeyColumn> rightPartitionKey = partitionKeys.get(1);
- rightBuilder.shuffleSpec(new HashShuffleSpec(new
ClusterBy(rightPartitionKey, 0), hashPartitionCount));
+ rightBuilder.shuffleSpec(new HashShuffleSpec(new
ClusterBy(rightPartitionKey, 0), 1, true));
rightBuilder.signature(QueryKitUtils.sortableSignature(rightBuilder.getSignature(),
rightPartitionKey));
+ rightBuilder.maxWorkerCount(Limits.MAX_WORKERS);
// Compute join signature.
final RowSignature.Builder joinSignatureBuilder = RowSignature.builder();
@@ -708,7 +708,7 @@ public class DataSourcePlan
Iterables.getOnlyElement(rightPlan.getInputSpecs())
)
)
- .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
+ .maxWorkerCount(Limits.MAX_WORKERS)
.signature(joinSignatureBuilder.build())
.processor(
new SortMergeJoinStageProcessor(
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java
index 2d0361a037f..4454026fca9 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java
@@ -19,45 +19,29 @@
package org.apache.druid.msq.querykit;
-import org.apache.druid.msq.input.InputSpecs;
import org.apache.druid.msq.kernel.QueryDefinition;
-import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.query.Query;
/**
- * Collection of parameters for {@link QueryKit#makeQueryDefinition}.
+ * Container for {@link QueryKit} plus the queryId that we want to build.
*/
public class QueryKitSpec
{
private final QueryKit<Query<?>> queryKit;
private final String queryId;
- private final int maxLeafWorkerCount;
- private final int maxNonLeafWorkerCount;
- private final int targetPartitionsPerWorker;
/**
- * @param queryKit kit that is used to translate native
subqueries; i.e.,
- * {@link
org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}.
- * @param queryId queryId of the resulting {@link
QueryDefinition}
- * @param maxLeafWorkerCount maximum number of workers for leaf
stages: becomes
- * {@link
StageDefinition#getMaxWorkerCount()}
- * @param maxNonLeafWorkerCount maximum number of workers for non-leaf
stages: becomes
- * {@link
StageDefinition#getMaxWorkerCount()}
- * @param targetPartitionsPerWorker preferred number of partitions per
worker for subqueries
+ * @param queryKit kit that is used to translate native
subqueries; i.e.,
+ * {@link
org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}.
+ * @param queryId queryId of the resulting {@link
QueryDefinition}
*/
public QueryKitSpec(
QueryKit<Query<?>> queryKit,
- String queryId,
- int maxLeafWorkerCount,
- int maxNonLeafWorkerCount,
- int targetPartitionsPerWorker
+ String queryId
)
{
this.queryId = queryId;
this.queryKit = queryKit;
- this.maxLeafWorkerCount = maxLeafWorkerCount;
- this.maxNonLeafWorkerCount = maxNonLeafWorkerCount;
- this.targetPartitionsPerWorker = targetPartitionsPerWorker;
}
/**
@@ -75,28 +59,4 @@ public class QueryKitSpec
{
return queryId;
}
-
- /**
- * Maximum number of workers for leaf stages. See {@link
InputSpecs#hasLeafInputs}.
- */
- public int getMaxLeafWorkerCount()
- {
- return maxLeafWorkerCount;
- }
-
- /**
- * Maximum number of workers for non-leaf stages. See {@link
InputSpecs#hasLeafInputs}.
- */
- public int getMaxNonLeafWorkerCount()
- {
- return maxNonLeafWorkerCount;
- }
-
- /**
- * Number of partitions to generate during a shuffle.
- */
- public int getNumPartitionsForShuffle()
- {
- return maxNonLeafWorkerCount * targetPartitionsPerWorker;
- }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
index 24f6dfffe27..11bfaad390a 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
@@ -24,6 +24,7 @@ import
org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;
import org.apache.druid.msq.kernel.MixShuffleSpec;
import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.kernel.StageDefinition;
/**
* Static factory methods for common implementations of {@link
ShuffleSpecFactory}.
@@ -61,12 +62,13 @@ public class ShuffleSpecFactories
}
/**
- * Factory that produces a particular number of output partitions.
+ * Factory that produces an adjustable globally-sorted shuffle spec. The
partition count is adjusted at
+ * runtime by {@link StageDefinition#withRuntimeBounds(int, int, int)}.
*/
- public static ShuffleSpecFactory globalSortWithMaxPartitionCount(final int
partitions)
+ public static ShuffleSpecFactory globalSortWithTargetPartitions()
{
return (clusterBy, aggregate) ->
- new GlobalSortMaxCountShuffleSpec(clusterBy, partitions, aggregate,
ShuffleSpec.UNLIMITED);
+ new GlobalSortMaxCountShuffleSpec(clusterBy, 1, aggregate,
ShuffleSpec.UNLIMITED, true);
}
/**
@@ -75,7 +77,7 @@ public class ShuffleSpecFactories
*
* Produces {@link MixShuffleSpec}, ignoring the target size, if the
provided {@link ClusterBy} is empty.
*/
- public static ShuffleSpecFactory getGlobalSortWithTargetSize(int targetSize)
+ public static ShuffleSpecFactory globalSortWithTargetSize(int targetSize)
{
return (clusterBy, aggregate) -> {
if (clusterBy.isEmpty()) {
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
index 7a2ec6f239a..eebd97b607b 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
@@ -28,6 +28,7 @@ import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.kernel.HashShuffleSpec;
import org.apache.druid.msq.kernel.MixShuffleSpec;
@@ -93,14 +94,13 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
final WindowStages windowStages = new WindowStages(
originalQuery,
jsonMapper,
- queryKitSpec.getNumPartitionsForShuffle(),
- queryKitSpec.getMaxNonLeafWorkerCount(),
+ Limits.MAX_WORKERS,
resultShuffleSpecFactory,
signatureFromInput,
isOperatorTransformationEnabled
);
- final ShuffleSpec nextShuffleSpec =
windowStages.getStages().get(0).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle());
+ final ShuffleSpec nextShuffleSpec =
windowStages.getStages().get(0).findShuffleSpec();
final QueryDefinitionBuilder queryDefBuilder =
makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan,
nextShuffleSpec);
final int firstWindowStageNumber = Math.max(minStageNumber,
queryDefBuilder.getNextStageNumber());
@@ -121,7 +121,6 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
{
private final List<WindowStage> stages;
private final WindowOperatorQuery query;
- private final int numPartitionsForShuffle;
private final int maxNonLeafWorkerCount;
private final ShuffleSpec finalWindowStageShuffleSpec;
private final RowSignature finalWindowStageRowSignature;
@@ -131,7 +130,6 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
private WindowStages(
WindowOperatorQuery query,
ObjectMapper jsonMapper,
- int numPartitionsForShuffle,
int maxNonLeafWorkerCount,
ShuffleSpecFactory resultShuffleSpecFactory,
RowSignature signatureFromInput,
@@ -140,7 +138,6 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
{
this.stages = new ArrayList<>();
this.query = query;
- this.numPartitionsForShuffle = numPartitionsForShuffle;
this.maxNonLeafWorkerCount = maxNonLeafWorkerCount;
this.isOperatorTransformationEnabled = isOperatorTransformationEnabled;
@@ -223,7 +220,7 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
final WindowStage stage = stages.get(windowStageIndex);
final ShuffleSpec shuffleSpec = (windowStageIndex == stages.size() - 1) ?
finalWindowStageShuffleSpec :
- stages.get(windowStageIndex +
1).findShuffleSpec(numPartitionsForShuffle);
+ stages.get(windowStageIndex +
1).findShuffleSpec();
final RowSignature stageRowSignature =
getRowSignatureForStage(windowStageIndex, shuffleSpec);
final List<OperatorFactory> operatorFactories =
isOperatorTransformationEnabled
@@ -362,7 +359,7 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
return windowOperatorFactories;
}
- private ShuffleSpec findShuffleSpec(int partitionCount)
+ private ShuffleSpec findShuffleSpec()
{
Map<String, ColumnWithDirection.Direction> sortColumnsMap = new
HashMap<>();
if (sortOperatorFactory != null) {
@@ -392,7 +389,7 @@ public class WindowOperatorQueryKit implements
QueryKit<WindowOperatorQuery>
keyColsOfWindow.add(kc);
}
- return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0),
partitionCount);
+ return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), 1, true);
}
private boolean canAccept(OperatorFactory operatorFactory)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index 8ace680938c..32dd480473e 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -29,6 +29,7 @@ import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
@@ -139,7 +140,7 @@ public class GroupByQueryKit implements
QueryKit<GroupByQuery>
shuffleSpecFactoryPreAggregation =
intermediateClusterBy.isEmpty()
? ShuffleSpecFactories.singlePartition()
- :
ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle());
+ : ShuffleSpecFactories.globalSortWithTargetPartitions();
if (doLimitOrOffset) {
shuffleSpecFactoryPostAggregation =
ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint);
@@ -164,7 +165,7 @@ public class GroupByQueryKit implements
QueryKit<GroupByQuery>
.broadcastInputs(dataSourcePlan.getBroadcastInputs())
.signature(intermediateSignature)
.shuffleSpec(shuffleSpecFactoryPreAggregation.build(intermediateClusterBy,
true))
-
.maxWorkerCount(dataSourcePlan.getMaxWorkerCount(queryKitSpec))
+ .maxWorkerCount(dataSourcePlan.getMaxWorkerCount())
.processor(new
GroupByPreShuffleStageProcessor(queryToRun))
);
@@ -184,7 +185,7 @@ public class GroupByQueryKit implements
QueryKit<GroupByQuery>
StageDefinition.builder(firstStageNumber + 1)
.inputs(new StageInputSpec(firstStageNumber))
.signature(resultSignature)
- .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
+ .maxWorkerCount(Limits.MAX_WORKERS)
.shuffleSpec(
shuffleSpecFactoryPostAggregation != null
?
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
index f6cc9222df9..6f82fd08772 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
@@ -161,7 +161,7 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
.broadcastInputs(dataSourcePlan.getBroadcastInputs())
.shuffleSpec(scanShuffleSpec)
.signature(signatureToUse)
-
.maxWorkerCount(dataSourcePlan.getMaxWorkerCount(queryKitSpec))
+ .maxWorkerCount(dataSourcePlan.getMaxWorkerCount())
.processor(new ScanQueryStageProcessor(queryToRun))
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java
index d76264621d5..44ade1f09e0 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java
@@ -19,61 +19,23 @@
package org.apache.druid.msq.sql;
-import com.google.inject.Inject;
-import org.apache.druid.client.TimelineServerView;
-import org.apache.druid.msq.dart.controller.DartControllerContext;
import org.apache.druid.msq.exec.QueryKitSpecFactory;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitSpec;
-import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
-import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.server.coordination.ServerType;
public class DartQueryKitSpecFactory implements QueryKitSpecFactory
{
- private final TimelineServerView serverView;
-
- @Inject
- public DartQueryKitSpecFactory(TimelineServerView serverView)
- {
- this.serverView = serverView;
- }
-
@Override
public QueryKitSpec makeQueryKitSpec(
final QueryKit<Query<?>> queryKit,
final String queryId,
final MSQTuningConfig tuningConfig,
- final QueryContext queryContext)
+ final QueryContext queryContext
+ )
{
- return new QueryKitSpec(
- queryKit,
- queryId,
- getNumWorkers(),
- queryContext.getInt(
- DartControllerContext.CTX_MAX_NON_LEAF_WORKER_COUNT,
- DartControllerContext.DEFAULT_MAX_NON_LEAF_WORKER_COUNT
- ),
- MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
- queryContext,
- DartControllerContext.DEFAULT_TARGET_PARTITIONS_PER_WORKER
- )
- );
- }
-
- private int getNumWorkers()
- {
- int cnt = 0;
- for (DruidServerMetadata s : serverView.getDruidServerMetadatas()) {
- if (s.getType() == ServerType.HISTORICAL) {
- cnt++;
- }
- }
-
- // Even if all segments are realtime, launch at least one worker.
- return Math.max(1, cnt);
+ return new QueryKitSpec(queryKit, queryId);
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryKitSpecFactory.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryKitSpecFactory.java
index 80af571b22f..b1fe9c03157 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryKitSpecFactory.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryKitSpecFactory.java
@@ -19,46 +19,23 @@
package org.apache.druid.msq.sql;
-import com.google.inject.Inject;
import org.apache.druid.msq.exec.QueryKitSpecFactory;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitSpec;
-import org.apache.druid.msq.util.MultiStageQueryContext;
-import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
public class MSQTaskQueryKitSpecFactory implements QueryKitSpecFactory
{
- private DruidProcessingConfig processingConfig;
-
- @Inject
- public MSQTaskQueryKitSpecFactory(DruidProcessingConfig processingConfig)
- {
- this.processingConfig = processingConfig;
- }
-
@Override
public QueryKitSpec makeQueryKitSpec(
QueryKit<Query<?>> queryKit,
String queryId,
MSQTuningConfig tuningConfig,
- QueryContext queryContext)
+ QueryContext queryContext
+ )
{
- return new QueryKitSpec(
- queryKit,
- queryId,
- tuningConfig.getMaxNumWorkers(),
- tuningConfig.getMaxNumWorkers(),
-
- // Assume tasks are symmetric: workers have the same number of
processors available as a controller.
- // Create one partition per processor per task, for maximum
parallelism.
- MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
- queryContext,
- processingConfig.getNumThreads()
- )
- );
+ return new QueryKitSpec(queryKit, queryId);
}
-
}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index d4116fed863..999de322393 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -83,7 +83,6 @@ import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryFrameworkUtils;
-import org.apache.druid.sql.calcite.util.TestTimelineServerView;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.sql.hook.DruidHookDispatcher;
import org.apache.druid.sql.http.EngineInfo;
@@ -271,7 +270,7 @@ public class DartSqlResourceTest extends MSQTestBase
)
)
),
- new DartQueryKitSpecFactory(new
TestTimelineServerView(Collections.emptyList())),
+ new DartQueryKitSpecFactory(),
injector.getInstance(MultiQueryKit.class),
new ServerConfig(),
new DefaultQueryConfig(ImmutableMap.of("foo", "bar")),
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
index 6291a9d7f35..28d40e08766 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
@@ -233,6 +233,7 @@ public class MSQTasksTest
null, // WorkerFailureListener
ImmutableMap.of(),
TimeUnit.SECONDS.toMillis(5),
+ Limits.MAX_WORKERS,
new MSQWorkerTaskLauncherConfig()
);
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
index 5223dae9834..0ca643f109f 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
@@ -39,6 +39,7 @@ import
org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.exec.WorkerFailureListener;
import org.apache.druid.msq.indexing.error.MSQFault;
@@ -90,6 +91,7 @@ public class MSQWorkerTaskLauncherRetryTest
workerFailureListener,
ImmutableMap.of(),
TimeUnit.SECONDS.toMillis(5),
+ Limits.MAX_WORKERS,
new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(),
2
);
@@ -134,6 +136,7 @@ public class MSQWorkerTaskLauncherRetryTest
workerFailureListener,
ImmutableMap.of(),
TimeUnit.SECONDS.toMillis(5),
+ Limits.MAX_WORKERS,
new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(),
2
);
@@ -180,6 +183,7 @@ public class MSQWorkerTaskLauncherRetryTest
workerFailureListener,
ImmutableMap.of(),
TimeUnit.SECONDS.toMillis(5),
+ Limits.MAX_WORKERS,
new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(),
2
);
@@ -210,6 +214,7 @@ public class MSQWorkerTaskLauncherRetryTest
workerFailureListener,
ImmutableMap.of(),
TimeUnit.SECONDS.toMillis(5),
+ Limits.MAX_WORKERS,
new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(),
2
);
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
index 2e5c4859904..7badce6168b 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.indexing;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.WorkerFailureListener;
import
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
import org.apache.druid.rpc.indexing.OverlordClient;
@@ -46,6 +47,7 @@ public class MSQWorkerTaskLauncherTest
getWorkerFailureListener(),
ImmutableMap.of(),
TimeUnit.SECONDS.toMillis(5),
+ Limits.MAX_WORKERS,
new MSQWorkerTaskLauncherConfig()
);
}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
index bd1be760d27..fcef8d627a1 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
@@ -183,7 +183,8 @@ public class MockQueryDefinitionBuilder
),
0
),
- MAX_NUM_PARTITIONS
+ MAX_NUM_PARTITIONS,
+ false
);
break;
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index e7dda29cc55..e4b2850a74b 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -445,6 +445,7 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
workerFailureListener,
IndexerControllerContext.makeTaskContext(querySpec, queryKernelConfig,
ImmutableMap.of()),
0,
+ querySpec.getTuningConfig().getMaxNumWorkers(),
taskLauncherConfig
);
}
@@ -472,6 +473,18 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
return new MSQTestWorkerClient(inMemoryWorkers, mapper);
}
+ @Override
+ public int maxNonLeafWorkerCount()
+ {
+ return NUM_WORKERS;
+ }
+
+ @Override
+ public int targetPartitionsPerWorker()
+ {
+ return 1;
+ }
+
@Override
public ControllerContext newContext(QueryContext context)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]