This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5608a8a20ec refactor: Remove parallelism configs from QueryKitSpec. 
(#19280)
5608a8a20ec is described below

commit 5608a8a20ec72b2e2a786ccbd5e08338e98add85
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Apr 14 20:58:19 2026 -0700

    refactor: Remove parallelism configs from QueryKitSpec. (#19280)
    
    This patch reduces QueryKitSpec to simply a QueryKit plus a
    desired query ID. All parallelism configs are removed. This is
    useful because QueryKitSpec is not used at all in pre-planned
    decoupled planning. Therefore, this change helps better align
    plans created in decoupled mode from plans created with QueryKit.
    
    The parameters have been migrated to other locations:
    
    - maxLeafWorkerCount is moved to WorkerManager#maxWorkerCount.
    
    - maxNonLeafWorkerCount is moved to ControllerContext.
    
    - targetPartitionsPerWorker is moved to ControllerContext.
      Planners can signify that they want their partition counts set to
      "maxWorkerCount * targetPartitionsPerWorker" by using the new
      "adjustable" parameter on various ShuffleSpecs.
---
 .../msq/dart/controller/DartControllerContext.java | 18 +++++
 .../msq/dart/controller/DartWorkerManager.java     |  6 ++
 .../controller/sql/PrePlannedDartQueryMaker.java   |  4 +-
 .../apache/druid/msq/exec/ControllerContext.java   | 12 +++
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 14 +++-
 .../msq/exec/ControllerQueryResultsReader.java     |  3 -
 .../org/apache/druid/msq/exec/WorkerManager.java   |  6 ++
 .../msq/indexing/IndexerControllerContext.java     | 18 +++++
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java  | 11 +++
 .../destination/DataSourceMSQDestination.java      |  2 +-
 .../destination/DurableStorageMSQDestination.java  |  2 +-
 .../indexing/destination/ExportMSQDestination.java |  2 +-
 .../msq/kernel/GlobalSortMaxCountShuffleSpec.java  | 43 ++++++++++-
 .../apache/druid/msq/kernel/HashShuffleSpec.java   | 58 ++++++++++++++-
 .../apache/druid/msq/kernel/QueryDefinition.java   | 34 +++++++++
 .../org/apache/druid/msq/kernel/ShuffleSpec.java   | 19 +++++
 .../apache/druid/msq/kernel/StageDefinition.java   | 43 +++++++++++
 .../druid/msq/kernel/controller/WorkerInputs.java  | 86 ++++++++++++----------
 .../DruidLogicalToQueryDefinitionTranslator.java   |  8 +-
 .../org/apache/druid/msq/logical/StageMaker.java   | 11 +++
 .../druid/msq/logical/stages/GroupByStages.java    |  2 +-
 .../apache/druid/msq/logical/stages/JoinStage.java |  2 +-
 .../druid/msq/logical/stages/LogicalStage.java     |  8 ++
 .../druid/msq/logical/stages/OffsetLimitStage.java |  6 ++
 .../apache/druid/msq/logical/stages/SortStage.java | 33 ++++++++-
 .../apache/druid/msq/querykit/DataSourcePlan.java  | 20 ++---
 .../apache/druid/msq/querykit/QueryKitSpec.java    | 50 ++-----------
 .../druid/msq/querykit/ShuffleSpecFactories.java   | 10 ++-
 .../druid/msq/querykit/WindowOperatorQueryKit.java | 15 ++--
 .../msq/querykit/groupby/GroupByQueryKit.java      |  7 +-
 .../druid/msq/querykit/scan/ScanQueryKit.java      |  2 +-
 .../druid/msq/sql/DartQueryKitSpecFactory.java     | 44 +----------
 .../druid/msq/sql/MSQTaskQueryKitSpecFactory.java  | 29 +-------
 .../dart/controller/http/DartSqlResourceTest.java  |  3 +-
 .../org/apache/druid/msq/exec/MSQTasksTest.java    |  1 +
 .../indexing/MSQWorkerTaskLauncherRetryTest.java   |  5 ++
 .../msq/indexing/MSQWorkerTaskLauncherTest.java    |  2 +
 .../controller/MockQueryDefinitionBuilder.java     |  3 +-
 .../druid/msq/test/MSQTestControllerContext.java   | 13 ++++
 39 files changed, 454 insertions(+), 201 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
index b7e1b3c94bc..a4a4ff8945a 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
@@ -236,4 +236,22 @@ public class DartControllerContext implements 
ControllerContext
   {
     throw DruidException.defensive("TaskLockType is not used with class[%s]", 
getClass().getName());
   }
+
+  @Override
+  public int maxNonLeafWorkerCount()
+  {
+    return context.getInt(
+        DartControllerContext.CTX_MAX_NON_LEAF_WORKER_COUNT,
+        DartControllerContext.DEFAULT_MAX_NON_LEAF_WORKER_COUNT
+    );
+  }
+
+  @Override
+  public int targetPartitionsPerWorker()
+  {
+    return MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
+        context,
+        DEFAULT_TARGET_PARTITIONS_PER_WORKER
+    );
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
index 05a26c39344..f6d68d57f42 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
@@ -155,6 +155,12 @@ public class DartWorkerManager implements WorkerManager
     return workerIdToNumber.containsKey(workerId);
   }
 
+  @Override
+  public int getMaxWorkerCount()
+  {
+    return workerIds.size();
+  }
+
   @Override
   public Map<Integer, List<WorkerStats>> getWorkerStats()
   {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
index 5740142dd63..7ec616d212a 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
@@ -51,8 +51,8 @@ import java.util.Map.Entry;
  */
 class PrePlannedDartQueryMaker implements QueryMaker, 
QueryMaker.FromDruidLogical
 {
-  private PlannerContext plannerContext;
-  private DartQueryMaker dartQueryMaker;
+  private final PlannerContext plannerContext;
+  private final DartQueryMaker dartQueryMaker;
 
   public PrePlannedDartQueryMaker(PlannerContext plannerContext, 
DartQueryMaker queryMaker)
   {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index 008aae55818..2d14c10788b 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -29,6 +29,7 @@ import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.table.SegmentsInputSlice;
 import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.msq.kernel.ShuffleSpec;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.server.DruidNode;
 
@@ -123,4 +124,15 @@ public interface ControllerContext
    * Client for communicating with workers.
    */
   WorkerClient newWorkerClient();
+
+  /**
+   * Maximum number of workers for non-leaf stages.
+   */
+  int maxNonLeafWorkerCount();
+
+  /**
+   * Target number of partitions per worker for shuffle stages. Used at 
runtime to adjust
+   * shuffle specs that have {@link ShuffleSpec#isAdjustable()} set to true.
+   */
+  int targetPartitionsPerWorker();
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index e02d2909547..f616db7a032 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -728,7 +728,7 @@ public class ControllerImpl implements Controller
 
     final QueryContext queryContext = querySpec.getContext();
 
-    final QueryDefinition queryDef;
+    QueryDefinition queryDef;
     if (legacyQuery != null) {
       QueryKitBasedMSQPlanner qkPlanner = new QueryKitBasedMSQPlanner(
           querySpec,
@@ -762,9 +762,6 @@ public class ControllerImpl implements Controller
       }
     }
 
-    QueryValidator.validateQueryDef(queryDef);
-    queryDefRef.set(queryDef);
-
     workerManager = context.newWorkerManager(
         context.queryId(),
         querySpec,
@@ -772,6 +769,15 @@ public class ControllerImpl implements Controller
         getWorkerFailureListener()
     );
 
+    queryDef = queryDef.withRuntimeBounds(
+        workerManager.getMaxWorkerCount(),
+        context.maxNonLeafWorkerCount(),
+        context.targetPartitionsPerWorker()
+    );
+
+    QueryValidator.validateQueryDef(queryDef);
+    queryDefRef.set(queryDef);
+
     if (queryKernelConfig.isFaultTolerant() && !(workerManager instanceof 
RetryCapableWorkerManager)) {
       // Not expected to happen, since all WorkerManager impls are currently 
retry-capable. Defensive check
       // for future-proofing.
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
index e8766f049fd..51ddb03b2e4 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
@@ -26,7 +26,6 @@ import org.apache.druid.frame.processor.FrameProcessor;
 import org.apache.druid.frame.processor.FrameProcessors;
 import org.apache.druid.frame.processor.ReturnOrAwait;
 import org.apache.druid.frame.read.FrameReader;
-import org.apache.druid.java.util.common.logger.Logger;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -37,8 +36,6 @@ import java.util.List;
  */
 public class ControllerQueryResultsReader implements FrameProcessor<Void>
 {
-  private static final Logger log = new 
Logger(ControllerQueryResultsReader.class);
-
   private final ReadableFrameChannel in;
   private final FrameReader frameReader;
   private final QueryListener queryListener;
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
index ff76867dd30..01bcb81a756 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
@@ -89,6 +89,12 @@ public interface WorkerManager
    */
   Map<Integer, List<WorkerStats>> getWorkerStats();
 
+  /**
+   * Maximum number of workers that can be used by this manager. Used at 
runtime to cap
+   * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()}.
+   */
+  int getMaxWorkerCount();
+
   /**
    * Stop all workers.
    *
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 3edd414b515..cd91376e5c9 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -231,6 +231,7 @@ public class IndexerControllerContext implements 
ControllerContext
         makeTaskContext(querySpec, queryKernelConfig, taskContext),
         // 10 minutes +- 2 minutes jitter
         TimeUnit.SECONDS.toMillis(600 + 
ThreadLocalRandom.current().nextInt(-4, 5) * 30L),
+        task.getQuerySpec().getTuningConfig().getMaxNumWorkers(),
         new MSQWorkerTaskLauncherConfig()
     );
   }
@@ -241,6 +242,23 @@ public class IndexerControllerContext implements 
ControllerContext
     return toolbox.getIndexingTmpDir();
   }
 
+  @Override
+  public int maxNonLeafWorkerCount()
+  {
+    return task.getQuerySpec().getTuningConfig().getMaxNumWorkers();
+  }
+
+  @Override
+  public int targetPartitionsPerWorker()
+  {
+    // Assume tasks are symmetric: workers have the same number of processors 
available as a controller.
+    // Create one partition per processor per worker, for maximum parallelism.
+    return MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
+        taskQuerySpecContext,
+        memoryIntrospector.numProcessingThreads()
+    );
+  }
+
   /**
    * Helper method for {@link #queryKernelConfig(MSQSpec)}. Also used in tests.
    */
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index d773f225fac..7ef30346d7a 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -100,6 +100,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
   private final String controllerTaskId;
   private final String dataSource;
   private final OverlordClient overlordClient;
+  private final int maxWorkerCount;
   private final ExecutorService exec;
   private final long maxTaskStartDelayMillis;
   private final MSQWorkerTaskLauncherConfig config;
@@ -165,6 +166,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
       final WorkerFailureListener workerFailureListener,
       final Map<String, Object> taskContextOverrides,
       final long maxTaskStartDelayMillis,
+      final int maxWorkerCount,
       final MSQWorkerTaskLauncherConfig config
   )
   {
@@ -175,6 +177,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
         workerFailureListener,
         taskContextOverrides,
         maxTaskStartDelayMillis,
+        maxWorkerCount,
         config,
         TimeUnit.SECONDS.toMillis(60)
     );
@@ -188,6 +191,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
       final WorkerFailureListener workerFailureListener,
       final Map<String, Object> taskContextOverrides,
       final long maxTaskStartDelayMillis,
+      final int maxWorkerCount,
       final MSQWorkerTaskLauncherConfig config,
       final long taskIdsLockTimeout
   )
@@ -195,6 +199,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
     this.controllerTaskId = controllerTaskId;
     this.dataSource = dataSource;
     this.overlordClient = overlordClient;
+    this.maxWorkerCount = maxWorkerCount;
     this.workerFailureListener = workerFailureListener;
     this.taskContextOverrides = taskContextOverrides;
     this.exec = Execs.singleThreaded(
@@ -553,6 +558,12 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
     }
   }
 
+  @Override
+  public int getMaxWorkerCount()
+  {
+    return maxWorkerCount;
+  }
+
   /**
    * Returns a pair which contains the number of currently running worker 
tasks and the number of worker tasks that are
    * not yet fully started as left and right respectively.
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
index 02132ac2312..63f163f82ca 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
@@ -209,7 +209,7 @@ public class DataSourceMSQDestination implements 
MSQDestination
   @Override
   public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
   {
-    return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
+    return ShuffleSpecFactories.globalSortWithTargetSize(targetSize);
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DurableStorageMSQDestination.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DurableStorageMSQDestination.java
index 88fe5f58e5a..eb124eda213 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DurableStorageMSQDestination.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DurableStorageMSQDestination.java
@@ -56,7 +56,7 @@ public class DurableStorageMSQDestination implements 
MSQDestination
   @Override
   public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
   {
-    return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
+    return ShuffleSpecFactories.globalSortWithTargetSize(targetSize);
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java
index d6a78def63a..d03c2238fdb 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java
@@ -108,7 +108,7 @@ public class ExportMSQDestination implements MSQDestination
   @Override
   public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
   {
-    return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
+    return ShuffleSpecFactories.globalSortWithTargetSize(targetSize);
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java
index 35576474ff9..bcb2718c402 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java
@@ -44,24 +44,31 @@ public class GlobalSortMaxCountShuffleSpec implements 
GlobalSortShuffleSpec
   private final int maxPartitions;
   private final boolean aggregate;
   private final long limitHint;
+  private final boolean adjustable;
 
   @JsonCreator
   public GlobalSortMaxCountShuffleSpec(
       @JsonProperty("clusterBy") final ClusterBy clusterBy,
       @JsonProperty("partitions") final int maxPartitions,
       @JsonProperty("aggregate") final boolean aggregate,
-      @JsonProperty("limitHint") final Long limitHint
+      @JsonProperty("limitHint") final Long limitHint,
+      @JsonProperty("adjustable") final boolean adjustable
   )
   {
     this.clusterBy = Preconditions.checkNotNull(clusterBy, "clusterBy");
     this.maxPartitions = maxPartitions;
     this.aggregate = aggregate;
     this.limitHint = limitHint == null ? UNLIMITED : limitHint;
+    this.adjustable = adjustable;
 
     if (maxPartitions < 1) {
       throw new IAE("Partition count must be at least 1");
     }
 
+    if (adjustable && maxPartitions != 1) {
+      throw new IAE("Partition count must be 1 when adjustable is true, but 
was [%d]", maxPartitions);
+    }
+
     if (!clusterBy.sortable()) {
       throw new IAE("ClusterBy key must be sortable");
     }
@@ -72,6 +79,16 @@ public class GlobalSortMaxCountShuffleSpec implements 
GlobalSortShuffleSpec
     }
   }
 
+  public GlobalSortMaxCountShuffleSpec(
+      final ClusterBy clusterBy,
+      final int maxPartitions,
+      final boolean aggregate,
+      final Long limitHint
+  )
+  {
+    this(clusterBy, maxPartitions, aggregate, limitHint, false);
+  }
+
   @Override
   public ShuffleKind kind()
   {
@@ -144,6 +161,26 @@ public class GlobalSortMaxCountShuffleSpec implements 
GlobalSortShuffleSpec
     return limitHint;
   }
 
+  @Override
+  @JsonProperty("adjustable")
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public boolean isAdjustable()
+  {
+    return adjustable;
+  }
+
+  @Override
+  public ShuffleSpec withPartitionCount(final int partitionCount)
+  {
+    return new GlobalSortMaxCountShuffleSpec(
+        clusterBy,
+        partitionCount,
+        aggregate,
+        limitHint == UNLIMITED ? null : limitHint,
+        false
+    );
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -156,6 +193,7 @@ public class GlobalSortMaxCountShuffleSpec implements 
GlobalSortShuffleSpec
     GlobalSortMaxCountShuffleSpec that = (GlobalSortMaxCountShuffleSpec) o;
     return maxPartitions == that.maxPartitions
            && aggregate == that.aggregate
+           && adjustable == that.adjustable
            && Objects.equals(clusterBy, that.clusterBy)
            && Objects.equals(limitHint, that.limitHint);
   }
@@ -163,7 +201,7 @@ public class GlobalSortMaxCountShuffleSpec implements 
GlobalSortShuffleSpec
   @Override
   public int hashCode()
   {
-    return Objects.hash(clusterBy, maxPartitions, aggregate, limitHint);
+    return Objects.hash(clusterBy, maxPartitions, aggregate, limitHint, 
adjustable);
   }
 
   @Override
@@ -174,6 +212,7 @@ public class GlobalSortMaxCountShuffleSpec implements 
GlobalSortShuffleSpec
            ", maxPartitions=" + maxPartitions +
            ", aggregate=" + aggregate +
            ", limitHint=" + limitHint +
+           ", adjustable=" + adjustable +
            '}';
   }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java
index 69e66fffe26..23e9e98a217 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java
@@ -20,25 +20,35 @@
 package org.apache.druid.msq.kernel;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.frame.key.ClusterBy;
 import org.apache.druid.java.util.common.IAE;
 
+import java.util.Objects;
+
 public class HashShuffleSpec implements ShuffleSpec
 {
   public static final String TYPE = "hash";
 
   private final ClusterBy clusterBy;
   private final int numPartitions;
+  private final boolean adjustable;
 
   @JsonCreator
   public HashShuffleSpec(
       @JsonProperty("clusterBy") final ClusterBy clusterBy,
-      @JsonProperty("partitions") final int numPartitions
+      @JsonProperty("partitions") final int numPartitions,
+      @JsonProperty("adjustable") final boolean adjustable
   )
   {
     this.clusterBy = clusterBy;
     this.numPartitions = numPartitions;
+    this.adjustable = adjustable;
+
+    if (adjustable && numPartitions != 1) {
+      throw new IAE("Partition count must be 1 when adjustable is true, but 
was [%d]", numPartitions);
+    }
 
     if (clusterBy.getBucketByCount() > 0) {
       // Only GlobalSortTargetSizeShuffleSpec supports bucket-by.
@@ -65,4 +75,50 @@ public class HashShuffleSpec implements ShuffleSpec
   {
     return numPartitions;
   }
+
+  @Override
+  @JsonProperty("adjustable")
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public boolean isAdjustable()
+  {
+    return adjustable;
+  }
+
+  @Override
+  public ShuffleSpec withPartitionCount(final int partitionCount)
+  {
+    return new HashShuffleSpec(
+        clusterBy,
+        partitionCount,
+        false
+    );
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    HashShuffleSpec that = (HashShuffleSpec) o;
+    return numPartitions == that.numPartitions
+           && adjustable == that.adjustable
+           && Objects.equals(clusterBy, that.clusterBy);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(clusterBy, numPartitions, adjustable);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "HashShuffleSpec{" +
+           "clusterBy=" + clusterBy +
+           ", numPartitions=" + numPartitions +
+           ", adjustable=" + adjustable +
+           '}';
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/QueryDefinition.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/QueryDefinition.java
index e171118be2c..f315d029f7d 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/QueryDefinition.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/QueryDefinition.java
@@ -38,6 +38,7 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -207,4 +208,37 @@ public class QueryDefinition
   {
     return new QueryDefinition(stageDefinitions, finalStage, 
context.override(contextOverride));
   }
+
+  /**
+   * Returns a new {@link QueryDefinition} with runtime bounds applied:
+   * <ul>
+   *   <li>All stages {@link StageDefinition#getMaxWorkerCount()} are capped 
to {@code maxWorkerCount}.</li>
+   *   <li>All nonleaf stages {@link StageDefinition#getMaxWorkerCount()} are 
further capped to
+   *       {@code maxNonLeafWorkerCount}.</li>
+   *   <li>All stage shuffle specs, if {@link ShuffleSpec#isAdjustable()}, 
have their partition count set to
+   *       the capped max worker count times {@code 
targetPartitionsPerWorker}.</li>
+   * </ul>
+   */
+  public QueryDefinition withRuntimeBounds(
+      final int maxWorkerCount,
+      final int maxNonLeafWorkerCount,
+      final int targetPartitionsPerWorker
+  )
+  {
+    boolean anyChanged = false;
+    final Map<StageId, StageDefinition> newStageDefinitions = new 
LinkedHashMap<>();
+    for (Map.Entry<StageId, StageDefinition> entry : 
stageDefinitions.entrySet()) {
+      final StageDefinition stageDef = entry.getValue();
+      final StageDefinition adjustedStageDef =
+          stageDef.withRuntimeBounds(maxWorkerCount, maxNonLeafWorkerCount, 
targetPartitionsPerWorker);
+      newStageDefinitions.put(entry.getKey(), adjustedStageDef);
+      if (!Objects.equals(adjustedStageDef, stageDef)) {
+        anyChanged = true;
+      }
+    }
+    if (!anyChanged) {
+      return this;
+    }
+    return new QueryDefinition(newStageDefinitions, finalStage, context);
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/ShuffleSpec.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/ShuffleSpec.java
index 97f3e6db547..2d02acb5ecc 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/ShuffleSpec.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/ShuffleSpec.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.kernel;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.error.NotYetImplemented;
 import org.apache.druid.frame.key.ClusterBy;
 
 /**
@@ -83,4 +84,22 @@ public interface ShuffleSpec
   {
     return UNLIMITED;
   }
+
+  /**
+   * Whether this shuffle spec's partition count should be adjusted at runtime 
by multiplying by
+   * {@code targetPartitionsPerWorker} from the controller context.
+   */
+  default boolean isAdjustable()
+  {
+    return false;
+  }
+
+  /**
+   * Returns a copy of this shuffle spec with the partition count set to 
{@code partitionCount}
+   * and {@code adjustable} set to false. Only meaningful when {@link 
#isAdjustable()} is true.
+   */
+  default ShuffleSpec withPartitionCount(int partitionCount)
+  {
+    throw NotYetImplemented.ex(null, "withPartitionCount not implemented for 
[%s]", getClass().getSimpleName());
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index f836a357d3c..7fc3db19f1a 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -175,6 +175,49 @@ public class StageDefinition
         
.shuffleCheckHasMultipleValues(stageDef.getShuffleCheckHasMultipleValues());
   }
 
+  /**
+   * Returns a new {@link StageDefinition} with runtime bounds applied. See 
{@link QueryDefinition#withRuntimeBounds}
+   * for details on the logic.
+   */
+  public StageDefinition withRuntimeBounds(
+      final int maxWorkerCount,
+      final int maxNonLeafWorkerCount,
+      final int targetPartitionsPerWorker
+  )
+  {
+    final int adjustedMaxWorkerCount;
+    final ShuffleSpec adjustedShuffleSpec;
+
+    if (InputSpecs.hasLeafInputs(inputSpecs, getBroadcastInputNumbers())) {
+      // Leaf stage.
+      adjustedMaxWorkerCount = Math.min(this.maxWorkerCount, maxWorkerCount);
+    } else {
+      // Nonleaf stage.
+      adjustedMaxWorkerCount = Math.min(this.maxWorkerCount, 
Math.min(maxWorkerCount, maxNonLeafWorkerCount));
+    }
+
+    if (shuffleSpec != null && shuffleSpec.isAdjustable()) {
+      adjustedShuffleSpec = 
shuffleSpec.withPartitionCount(adjustedMaxWorkerCount * 
targetPartitionsPerWorker);
+    } else {
+      adjustedShuffleSpec = shuffleSpec;
+    }
+
+    if (adjustedMaxWorkerCount == this.maxWorkerCount && 
Objects.equals(adjustedShuffleSpec, shuffleSpec)) {
+      return this;
+    }
+
+    return new StageDefinition(
+        id,
+        inputSpecs,
+        broadcastInputNumbers,
+        processor,
+        signature,
+        adjustedShuffleSpec,
+        adjustedMaxWorkerCount,
+        shuffleCheckHasMultipleValues
+    );
+  }
+
   /**
    * Returns a unique stage identifier.
    */
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
index ef825ed69d3..92531eb1c00 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
@@ -77,48 +77,60 @@ public class WorkerInputs
       return new WorkerInputs(assignmentsMap);
     }
 
-    // Assign input slices to workers.
+    // Assign non-broadcast input slices to workers first, so we know the 
actual worker count.
+    // Then assign broadcast inputs to all assigned workers.
     for (int inputNumber = 0; inputNumber < numInputs; inputNumber++) {
+      if (stageDef.getBroadcastInputNumbers().contains(inputNumber)) {
+        continue;
+      }
+
       final InputSpec inputSpec = stageDef.getInputSpecs().get(inputNumber);
       final SegmentPruner pruner = stageDef.getSegmentPruner(inputNumber);
 
-      if (stageDef.getBroadcastInputNumbers().contains(inputNumber)) {
-        // Broadcast case: send everything everywhere.
-        final List<InputSlice> broadcastSlices = slicer.sliceStatic(inputSpec, 
pruner, 1);
-        final InputSlice broadcastSlice = broadcastSlices.isEmpty()
-                                          ? NilInputSlice.INSTANCE
-                                          : 
Iterables.getOnlyElement(broadcastSlices);
-
-        for (int workerNumber = 0; workerNumber < 
stageDef.getMaxWorkerCount(); workerNumber++) {
-          assignmentsMap.computeIfAbsent(
-              workerNumber,
-              ignored -> Arrays.asList(new InputSlice[numInputs])
-          ).set(inputNumber, broadcastSlice);
-        }
-      } else {
-        // Non-broadcast case: split slices across workers.
-        List<InputSlice> slices = assignmentStrategy.assign(
-            stageDef,
-            inputSpec,
-            stageWorkerCountMap,
-            slicer,
-            pruner,
-            maxInputFilesPerWorker,
-            maxInputBytesPerWorker
-        );
-
-        if (slices.isEmpty()) {
-          // Need at least one slice, so we can have at least one worker. It's 
OK if it has nothing to read.
-          slices = Collections.singletonList(NilInputSlice.INSTANCE);
-        }
+      List<InputSlice> slices = assignmentStrategy.assign(
+          stageDef,
+          inputSpec,
+          stageWorkerCountMap,
+          slicer,
+          pruner,
+          maxInputFilesPerWorker,
+          maxInputBytesPerWorker
+      );
+
+      if (slices.isEmpty()) {
+        // Need at least one slice, so we can have at least one worker. It's 
OK if it has nothing to read.
+        slices = Collections.singletonList(NilInputSlice.INSTANCE);
+      }
 
-        // Flip the slices, so it's worker number -> slices for that worker.
-        for (int workerNumber = 0; workerNumber < slices.size(); 
workerNumber++) {
-          assignmentsMap.computeIfAbsent(
-              workerNumber,
-              ignored -> Arrays.asList(new InputSlice[numInputs])
-          ).set(inputNumber, slices.get(workerNumber));
-        }
+      // Flip the slices, so it's worker number -> slices for that worker.
+      for (int workerNumber = 0; workerNumber < slices.size(); workerNumber++) 
{
+        assignmentsMap.computeIfAbsent(
+            workerNumber,
+            ignored -> Arrays.asList(new InputSlice[numInputs])
+        ).set(inputNumber, slices.get(workerNumber));
+      }
+    }
+
+    if (assignmentsMap.isEmpty()) {
+      // All inputs are broadcast. Use a single worker.
+      assignmentsMap.put(0, Arrays.asList(new InputSlice[numInputs]));
+    }
+
+    // Assign broadcast inputs to all workers determined by non-broadcast 
assignment above.
+    for (int inputNumber = 0; inputNumber < numInputs; inputNumber++) {
+      if (!stageDef.getBroadcastInputNumbers().contains(inputNumber)) {
+        continue;
+      }
+
+      final InputSpec inputSpec = stageDef.getInputSpecs().get(inputNumber);
+      final SegmentPruner pruner = stageDef.getSegmentPruner(inputNumber);
+      final List<InputSlice> broadcastSlices = slicer.sliceStatic(inputSpec, 
pruner, 1);
+      final InputSlice broadcastSlice = broadcastSlices.isEmpty()
+                                        ? NilInputSlice.INSTANCE
+                                        : 
Iterables.getOnlyElement(broadcastSlices);
+
+      for (int workerNumber : assignmentsMap.keySet()) {
+        assignmentsMap.get(workerNumber).set(inputNumber, broadcastSlice);
       }
     }
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
index 138bc1f2995..c46fa410cf6 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
@@ -122,12 +122,14 @@ public class DruidLogicalToQueryDefinitionTranslator
       DruidSort sort = (DruidSort) stack.getNode();
       List<OrderByColumnSpec> orderBySpecs = 
DruidQuery.buildOrderByColumnSpecs(inputStage.getLogicalRowSignature(), sort);
       List<KeyColumn> keyColumns = Lists.transform(orderBySpecs, 
KeyColumn::fromOrderByColumnSpec);
-      SortStage sortStage = new SortStage(inputStage, keyColumns);
 
       if (sort.hasLimitOrOffset()) {
-        return new OffsetLimitStage(sortStage, sort.getOffsetLimit());
+        return new OffsetLimitStage(
+            new SortStage(inputStage, keyColumns, sort.getOffsetLimit()),
+            sort.getOffsetLimit()
+        );
       } else {
-        return sortStage;
+        return new SortStage(inputStage, keyColumns, null);
       }
     }
     if (stack.getNode() instanceof DruidUnnest) {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
index 4885a7d3e7d..3b3222f8890 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
@@ -20,6 +20,7 @@
 package org.apache.druid.msq.logical;
 
 import org.apache.druid.error.DruidException;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.exec.StageProcessor;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.kernel.MixShuffleSpec;
@@ -100,6 +101,7 @@ public class StageMaker
     sdb.signature(frameProcessorStage.getLogicalRowSignature());
     sdb.processor(stageProcessor);
     sdb.shuffleSpec(MixShuffleSpec.instance());
+    sdb.maxWorkerCount(maxWorkerCountFor(frameProcessorStage));
     return sdb;
   }
 
@@ -110,6 +112,7 @@ public class StageMaker
     sdb.signature(stage.getRowSignature());
     sdb.processor(makeScanStageProcessor(VirtualColumns.EMPTY, 
stage.getRowSignature(), null));
     sdb.shuffleSpec(stage.buildShuffleSpec());
+    sdb.maxWorkerCount(maxWorkerCountFor(stage));
     return sdb;
   }
 
@@ -163,4 +166,12 @@ public class StageMaker
   {
     return ScanQueryStageProcessor.makeSegmentMapFnProcessor(signature, 
dataSource);
   }
+
+  /**
+   * Returns the {@code maxWorkerCount} to use for a given {@link 
LogicalStage}.
+   */
+  private static int maxWorkerCountFor(LogicalStage stage)
+  {
+    return stage.isSingleWorker() ? 1 : Limits.MAX_WORKERS;
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
index e955c6a4052..809a33cfc2b 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
@@ -91,7 +91,7 @@ public class GroupByStages
   {
     GroupByQuery gby = makeGbyQuery(projectStage, grouping);
     PreShuffleStage aggStage = new PreShuffleStage(projectStage, 
gby.withPostAggregatorSpecs(Collections.emptyList()));
-    SortStage sortStage = new SortStage(aggStage, 
getKeyColumns(grouping.getDimensions()));
+    SortStage sortStage = new SortStage(aggStage, 
getKeyColumns(grouping.getDimensions()), null);
     PostShuffleStage finalAggStage = new PostShuffleStage(sortStage, gby, 
grouping.getOutputRowSignature());
     return finalAggStage;
   }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
index 96cf3d3510b..77cf2f2f740 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
@@ -76,7 +76,7 @@ public class JoinStage
     public ShuffleSpec buildShuffleSpec()
     {
       final ClusterBy clusterBy = new ClusterBy(keyColumns, 0);
-      return new HashShuffleSpec(clusterBy, 1);
+      return new HashShuffleSpec(clusterBy, 1, true);
     }
 
     @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
index 59f88264dbd..992a3f439b7 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
@@ -59,4 +59,12 @@ public interface LogicalStage
    * Returns the inputs of this stage.
    */
   List<LogicalInputSpec> getInputSpecs();
+
+  /**
+   * Whether this stage must run on a single worker.
+   */
+  default boolean isSingleWorker()
+  {
+    return false;
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/OffsetLimitStage.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/OffsetLimitStage.java
index 88496394c65..1950faa5511 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/OffsetLimitStage.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/OffsetLimitStage.java
@@ -53,6 +53,12 @@ public class OffsetLimitStage extends 
AbstractFrameProcessorStage
     );
   }
 
+  @Override
+  public boolean isSingleWorker()
+  {
+    return true;
+  }
+
   @Override
   public RowSignature getLogicalRowSignature()
   {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
index 834ffdcf6f3..d97f7397dee 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
@@ -26,21 +26,31 @@ import org.apache.druid.msq.logical.LogicalInputSpec;
 import org.apache.druid.msq.querykit.QueryKitUtils;
 import org.apache.druid.msq.querykit.ShuffleSpecFactories;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.OffsetLimit;
 import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
 
+import javax.annotation.Nullable;
 import java.util.List;
 
 public class SortStage extends AbstractShuffleStage
 {
   protected final List<KeyColumn> keyColumns;
 
-  public SortStage(LogicalStage inputStage, List<KeyColumn> keyColumns)
+  /**
+   * Non-null when this sort feeds a downstream {@link OffsetLimitStage}, in 
which case the shuffle output
+   * is funneled into a single sorted partition with a {@code limitHint} 
derived from the offset and limit.
+   */
+  @Nullable
+  protected final OffsetLimit offsetLimit;
+
+  public SortStage(LogicalStage inputStage, List<KeyColumn> keyColumns, 
@Nullable OffsetLimit offsetLimit)
   {
     super(
         QueryKitUtils.sortableSignature(inputStage.getLogicalRowSignature(), 
keyColumns),
         LogicalInputSpec.of(inputStage)
     );
     this.keyColumns = keyColumns;
+    this.offsetLimit = offsetLimit;
   }
 
   @Override
@@ -53,7 +63,26 @@ public class SortStage extends AbstractShuffleStage
   public ShuffleSpec buildShuffleSpec()
   {
     final ClusterBy clusterBy = new ClusterBy(keyColumns, 0);
-    return 
ShuffleSpecFactories.globalSortWithMaxPartitionCount(1).build(clusterBy, false);
+    if (offsetLimit != null) {
+      // Funnel everything through a single sorted partition so the downstream 
OffsetLimitStage can apply the
+      // offset and limit.
+      return 
ShuffleSpecFactories.singlePartitionWithLimit(computeLimitHint(offsetLimit)).build(clusterBy,
 false);
+    } else {
+      return 
ShuffleSpecFactories.globalSortWithTargetPartitions().build(clusterBy, false);
+    }
+  }
+
+  /**
+   * Computes the {@link ShuffleSpec#limitHint()} that the upstream sort can 
use to short-circuit work when the
+   * downstream stage applies an offset and limit.
+   */
+  private static long computeLimitHint(OffsetLimit offsetLimit)
+  {
+    if (offsetLimit.hasLimit() && offsetLimit.getOffset() + 
offsetLimit.getLimit() > 0 /* overflow check */) {
+      return offsetLimit.getOffset() + offsetLimit.getLimit();
+    } else {
+      return ShuffleSpec.UNLIMITED;
+    }
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
index bdc4370f9e7..d3d4d86405b 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
@@ -31,8 +31,8 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.InputSpec;
-import org.apache.druid.msq.input.InputSpecs;
 import org.apache.druid.msq.input.external.ExternalInputSpec;
 import org.apache.druid.msq.input.inline.InlineInputSpec;
 import org.apache.druid.msq.input.lookup.LookupInputSpec;
@@ -267,14 +267,13 @@ public class DataSourcePlan
   /**
    * Figure for {@link StageDefinition#getMaxWorkerCount()} that should be 
used when processing.
    */
-  public int getMaxWorkerCount(final QueryKitSpec queryKitSpec)
+  public int getMaxWorkerCount()
   {
     if (isSingleWorker()) {
       return 1;
-    } else if (InputSpecs.hasLeafInputs(inputSpecs, broadcastInputs)) {
-      return queryKitSpec.getMaxLeafWorkerCount();
     } else {
-      return queryKitSpec.getMaxNonLeafWorkerCount();
+      // Use MAX_WORKERS as a high upper bound; capped at runtime by 
QueryDefinition.withRuntimeBounds.
+      return Limits.MAX_WORKERS;
     }
   }
 
@@ -431,7 +430,7 @@ public class DataSourcePlan
         // Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in 
the context. It's only used for the
         // outermost query, and setting it for the subquery makes us 
erroneously add bucketing where it doesn't belong.
         
dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY),
-        
ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle()),
+        ShuffleSpecFactories.globalSortWithTargetPartitions(),
         minStageNumber
     );
 
@@ -669,10 +668,10 @@ public class DataSourcePlan
         ((StageInputSpec) 
Iterables.getOnlyElement(leftPlan.getInputSpecs())).getStageNumber()
     );
 
-    final int hashPartitionCount = queryKitSpec.getNumPartitionsForShuffle();
     final List<KeyColumn> leftPartitionKey = partitionKeys.get(0);
-    leftBuilder.shuffleSpec(new HashShuffleSpec(new 
ClusterBy(leftPartitionKey, 0), hashPartitionCount));
+    leftBuilder.shuffleSpec(new HashShuffleSpec(new 
ClusterBy(leftPartitionKey, 0), 1, true));
     
leftBuilder.signature(QueryKitUtils.sortableSignature(leftBuilder.getSignature(),
 leftPartitionKey));
+    leftBuilder.maxWorkerCount(Limits.MAX_WORKERS);
 
     // Build up the right stage.
     final StageDefinitionBuilder rightBuilder = 
subQueryDefBuilder.getStageBuilder(
@@ -680,8 +679,9 @@ public class DataSourcePlan
     );
 
     final List<KeyColumn> rightPartitionKey = partitionKeys.get(1);
-    rightBuilder.shuffleSpec(new HashShuffleSpec(new 
ClusterBy(rightPartitionKey, 0), hashPartitionCount));
+    rightBuilder.shuffleSpec(new HashShuffleSpec(new 
ClusterBy(rightPartitionKey, 0), 1, true));
     
rightBuilder.signature(QueryKitUtils.sortableSignature(rightBuilder.getSignature(),
 rightPartitionKey));
+    rightBuilder.maxWorkerCount(Limits.MAX_WORKERS);
 
     // Compute join signature.
     final RowSignature.Builder joinSignatureBuilder = RowSignature.builder();
@@ -708,7 +708,7 @@ public class DataSourcePlan
                                
Iterables.getOnlyElement(rightPlan.getInputSpecs())
                            )
                        )
-                       .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
+                       .maxWorkerCount(Limits.MAX_WORKERS)
                        .signature(joinSignatureBuilder.build())
                        .processor(
                            new SortMergeJoinStageProcessor(
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java
index 2d0361a037f..4454026fca9 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java
@@ -19,45 +19,29 @@
 
 package org.apache.druid.msq.querykit;
 
-import org.apache.druid.msq.input.InputSpecs;
 import org.apache.druid.msq.kernel.QueryDefinition;
-import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.query.Query;
 
 /**
- * Collection of parameters for {@link QueryKit#makeQueryDefinition}.
+ * Container for {@link QueryKit} plus the queryId that we want to build.
  */
 public class QueryKitSpec
 {
   private final QueryKit<Query<?>> queryKit;
   private final String queryId;
-  private final int maxLeafWorkerCount;
-  private final int maxNonLeafWorkerCount;
-  private final int targetPartitionsPerWorker;
 
   /**
-   * @param queryKit                  kit that is used to translate native 
subqueries; i.e.,
-   *                                  {@link 
org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}.
-   * @param queryId                   queryId of the resulting {@link 
QueryDefinition}
-   * @param maxLeafWorkerCount        maximum number of workers for leaf 
stages: becomes
-   *                                  {@link 
StageDefinition#getMaxWorkerCount()}
-   * @param maxNonLeafWorkerCount     maximum number of workers for non-leaf 
stages: becomes
-   *                                  {@link 
StageDefinition#getMaxWorkerCount()}
-   * @param targetPartitionsPerWorker preferred number of partitions per 
worker for subqueries
+   * @param queryKit              kit that is used to translate native 
subqueries; i.e.,
+   *                              {@link 
org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}.
+   * @param queryId               queryId of the resulting {@link 
QueryDefinition}
    */
   public QueryKitSpec(
       QueryKit<Query<?>> queryKit,
-      String queryId,
-      int maxLeafWorkerCount,
-      int maxNonLeafWorkerCount,
-      int targetPartitionsPerWorker
+      String queryId
   )
   {
     this.queryId = queryId;
     this.queryKit = queryKit;
-    this.maxLeafWorkerCount = maxLeafWorkerCount;
-    this.maxNonLeafWorkerCount = maxNonLeafWorkerCount;
-    this.targetPartitionsPerWorker = targetPartitionsPerWorker;
   }
 
   /**
@@ -75,28 +59,4 @@ public class QueryKitSpec
   {
     return queryId;
   }
-
-  /**
-   * Maximum number of workers for leaf stages. See {@link 
InputSpecs#hasLeafInputs}.
-   */
-  public int getMaxLeafWorkerCount()
-  {
-    return maxLeafWorkerCount;
-  }
-
-  /**
-   * Maximum number of workers for non-leaf stages. See {@link 
InputSpecs#hasLeafInputs}.
-   */
-  public int getMaxNonLeafWorkerCount()
-  {
-    return maxNonLeafWorkerCount;
-  }
-
-  /**
-   * Number of partitions to generate during a shuffle.
-   */
-  public int getNumPartitionsForShuffle()
-  {
-    return maxNonLeafWorkerCount * targetPartitionsPerWorker;
-  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
index 24f6dfffe27..11bfaad390a 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
@@ -24,6 +24,7 @@ import 
org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
 import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;
 import org.apache.druid.msq.kernel.MixShuffleSpec;
 import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.kernel.StageDefinition;
 
 /**
  * Static factory methods for common implementations of {@link 
ShuffleSpecFactory}.
@@ -61,12 +62,13 @@ public class ShuffleSpecFactories
   }
 
   /**
-   * Factory that produces a particular number of output partitions.
+   * Factory that produces an adjustable globally-sorted shuffle spec. The 
partition count is adjusted at
+   * runtime by {@link StageDefinition#withRuntimeBounds(int, int, int)}.
    */
-  public static ShuffleSpecFactory globalSortWithMaxPartitionCount(final int 
partitions)
+  public static ShuffleSpecFactory globalSortWithTargetPartitions()
   {
     return (clusterBy, aggregate) ->
-        new GlobalSortMaxCountShuffleSpec(clusterBy, partitions, aggregate, 
ShuffleSpec.UNLIMITED);
+        new GlobalSortMaxCountShuffleSpec(clusterBy, 1, aggregate, 
ShuffleSpec.UNLIMITED, true);
   }
 
   /**
@@ -75,7 +77,7 @@ public class ShuffleSpecFactories
    *
    * Produces {@link MixShuffleSpec}, ignoring the target size, if the 
provided {@link ClusterBy} is empty.
    */
-  public static ShuffleSpecFactory getGlobalSortWithTargetSize(int targetSize)
+  public static ShuffleSpecFactory globalSortWithTargetSize(int targetSize)
   {
     return (clusterBy, aggregate) -> {
       if (clusterBy.isEmpty()) {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
index 7a2ec6f239a..eebd97b607b 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
@@ -28,6 +28,7 @@ import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.stage.StageInputSpec;
 import org.apache.druid.msq.kernel.HashShuffleSpec;
 import org.apache.druid.msq.kernel.MixShuffleSpec;
@@ -93,14 +94,13 @@ public class WindowOperatorQueryKit implements 
QueryKit<WindowOperatorQuery>
     final WindowStages windowStages = new WindowStages(
         originalQuery,
         jsonMapper,
-        queryKitSpec.getNumPartitionsForShuffle(),
-        queryKitSpec.getMaxNonLeafWorkerCount(),
+        Limits.MAX_WORKERS,
         resultShuffleSpecFactory,
         signatureFromInput,
         isOperatorTransformationEnabled
     );
 
-    final ShuffleSpec nextShuffleSpec = 
windowStages.getStages().get(0).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle());
+    final ShuffleSpec nextShuffleSpec = 
windowStages.getStages().get(0).findShuffleSpec();
     final QueryDefinitionBuilder queryDefBuilder = 
makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, 
nextShuffleSpec);
     final int firstWindowStageNumber = Math.max(minStageNumber, 
queryDefBuilder.getNextStageNumber());
 
@@ -121,7 +121,6 @@ public class WindowOperatorQueryKit implements 
QueryKit<WindowOperatorQuery>
   {
     private final List<WindowStage> stages;
     private final WindowOperatorQuery query;
-    private final int numPartitionsForShuffle;
     private final int maxNonLeafWorkerCount;
     private final ShuffleSpec finalWindowStageShuffleSpec;
     private final RowSignature finalWindowStageRowSignature;
@@ -131,7 +130,6 @@ public class WindowOperatorQueryKit implements 
QueryKit<WindowOperatorQuery>
     private WindowStages(
         WindowOperatorQuery query,
         ObjectMapper jsonMapper,
-        int numPartitionsForShuffle,
         int maxNonLeafWorkerCount,
         ShuffleSpecFactory resultShuffleSpecFactory,
         RowSignature signatureFromInput,
@@ -140,7 +138,6 @@ public class WindowOperatorQueryKit implements 
QueryKit<WindowOperatorQuery>
     {
       this.stages = new ArrayList<>();
       this.query = query;
-      this.numPartitionsForShuffle = numPartitionsForShuffle;
       this.maxNonLeafWorkerCount = maxNonLeafWorkerCount;
       this.isOperatorTransformationEnabled = isOperatorTransformationEnabled;
 
@@ -223,7 +220,7 @@ public class WindowOperatorQueryKit implements 
QueryKit<WindowOperatorQuery>
       final WindowStage stage = stages.get(windowStageIndex);
       final ShuffleSpec shuffleSpec = (windowStageIndex == stages.size() - 1) ?
                                       finalWindowStageShuffleSpec :
-                                      stages.get(windowStageIndex + 
1).findShuffleSpec(numPartitionsForShuffle);
+                                      stages.get(windowStageIndex + 
1).findShuffleSpec();
 
       final RowSignature stageRowSignature = 
getRowSignatureForStage(windowStageIndex, shuffleSpec);
       final List<OperatorFactory> operatorFactories = 
isOperatorTransformationEnabled
@@ -362,7 +359,7 @@ public class WindowOperatorQueryKit implements 
QueryKit<WindowOperatorQuery>
       return windowOperatorFactories;
     }
 
-    private ShuffleSpec findShuffleSpec(int partitionCount)
+    private ShuffleSpec findShuffleSpec()
     {
       Map<String, ColumnWithDirection.Direction> sortColumnsMap = new 
HashMap<>();
       if (sortOperatorFactory != null) {
@@ -392,7 +389,7 @@ public class WindowOperatorQueryKit implements 
QueryKit<WindowOperatorQuery>
         keyColsOfWindow.add(kc);
       }
 
-      return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), 
partitionCount);
+      return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), 1, true);
     }
 
     private boolean canAccept(OperatorFactory operatorFactory)
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index 8ace680938c..32dd480473e 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -29,6 +29,7 @@ import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.stage.StageInputSpec;
 import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
@@ -139,7 +140,7 @@ public class GroupByQueryKit implements 
QueryKit<GroupByQuery>
       shuffleSpecFactoryPreAggregation =
           intermediateClusterBy.isEmpty()
           ? ShuffleSpecFactories.singlePartition()
-          : 
ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle());
+          : ShuffleSpecFactories.globalSortWithTargetPartitions();
 
       if (doLimitOrOffset) {
         shuffleSpecFactoryPostAggregation = 
ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint);
@@ -164,7 +165,7 @@ public class GroupByQueryKit implements 
QueryKit<GroupByQuery>
                        .broadcastInputs(dataSourcePlan.getBroadcastInputs())
                        .signature(intermediateSignature)
                        
.shuffleSpec(shuffleSpecFactoryPreAggregation.build(intermediateClusterBy, 
true))
-                       
.maxWorkerCount(dataSourcePlan.getMaxWorkerCount(queryKitSpec))
+                       .maxWorkerCount(dataSourcePlan.getMaxWorkerCount())
                        .processor(new 
GroupByPreShuffleStageProcessor(queryToRun))
     );
 
@@ -184,7 +185,7 @@ public class GroupByQueryKit implements 
QueryKit<GroupByQuery>
         StageDefinition.builder(firstStageNumber + 1)
                        .inputs(new StageInputSpec(firstStageNumber))
                        .signature(resultSignature)
-                       .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount())
+                       .maxWorkerCount(Limits.MAX_WORKERS)
                        .shuffleSpec(
                            shuffleSpecFactoryPostAggregation != null
                            ? 
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
index f6cc9222df9..6f82fd08772 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
@@ -161,7 +161,7 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
                        .broadcastInputs(dataSourcePlan.getBroadcastInputs())
                        .shuffleSpec(scanShuffleSpec)
                        .signature(signatureToUse)
-                       
.maxWorkerCount(dataSourcePlan.getMaxWorkerCount(queryKitSpec))
+                       .maxWorkerCount(dataSourcePlan.getMaxWorkerCount())
                        .processor(new ScanQueryStageProcessor(queryToRun))
     );
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java
index d76264621d5..44ade1f09e0 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java
@@ -19,61 +19,23 @@
 
 package org.apache.druid.msq.sql;
 
-import com.google.inject.Inject;
-import org.apache.druid.client.TimelineServerView;
-import org.apache.druid.msq.dart.controller.DartControllerContext;
 import org.apache.druid.msq.exec.QueryKitSpecFactory;
 import org.apache.druid.msq.indexing.MSQTuningConfig;
 import org.apache.druid.msq.querykit.QueryKit;
 import org.apache.druid.msq.querykit.QueryKitSpec;
-import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContext;
-import org.apache.druid.server.coordination.DruidServerMetadata;
-import org.apache.druid.server.coordination.ServerType;
 
 public class DartQueryKitSpecFactory implements QueryKitSpecFactory
 {
-  private final TimelineServerView serverView;
-
-  @Inject
-  public DartQueryKitSpecFactory(TimelineServerView serverView)
-  {
-    this.serverView = serverView;
-  }
-
   @Override
   public QueryKitSpec makeQueryKitSpec(
       final QueryKit<Query<?>> queryKit,
       final String queryId,
       final MSQTuningConfig tuningConfig,
-      final QueryContext queryContext)
+      final QueryContext queryContext
+  )
   {
-    return new QueryKitSpec(
-        queryKit,
-        queryId,
-        getNumWorkers(),
-        queryContext.getInt(
-            DartControllerContext.CTX_MAX_NON_LEAF_WORKER_COUNT,
-            DartControllerContext.DEFAULT_MAX_NON_LEAF_WORKER_COUNT
-        ),
-        MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
-            queryContext,
-            DartControllerContext.DEFAULT_TARGET_PARTITIONS_PER_WORKER
-        )
-    );
-  }
-
-  private int getNumWorkers()
-  {
-    int cnt = 0;
-    for (DruidServerMetadata s : serverView.getDruidServerMetadatas()) {
-      if (s.getType() == ServerType.HISTORICAL) {
-        cnt++;
-      }
-    }
-
-    // Even if all segments are realtime, launch at least one worker.
-    return Math.max(1, cnt);
+    return new QueryKitSpec(queryKit, queryId);
   }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryKitSpecFactory.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryKitSpecFactory.java
index 80af571b22f..b1fe9c03157 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryKitSpecFactory.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryKitSpecFactory.java
@@ -19,46 +19,23 @@
 
 package org.apache.druid.msq.sql;
 
-import com.google.inject.Inject;
 import org.apache.druid.msq.exec.QueryKitSpecFactory;
 import org.apache.druid.msq.indexing.MSQTuningConfig;
 import org.apache.druid.msq.querykit.QueryKit;
 import org.apache.druid.msq.querykit.QueryKitSpec;
-import org.apache.druid.msq.util.MultiStageQueryContext;
-import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContext;
 
 public class MSQTaskQueryKitSpecFactory implements QueryKitSpecFactory
 {
-  private DruidProcessingConfig processingConfig;
-
-  @Inject
-  public MSQTaskQueryKitSpecFactory(DruidProcessingConfig processingConfig)
-  {
-    this.processingConfig = processingConfig;
-  }
-
   @Override
   public QueryKitSpec makeQueryKitSpec(
       QueryKit<Query<?>> queryKit,
       String queryId,
       MSQTuningConfig tuningConfig,
-      QueryContext queryContext)
+      QueryContext queryContext
+  )
   {
-    return new QueryKitSpec(
-        queryKit,
-        queryId,
-        tuningConfig.getMaxNumWorkers(),
-        tuningConfig.getMaxNumWorkers(),
-
-        // Assume tasks are symmetric: workers have the same number of 
processors available as a controller.
-        // Create one partition per processor per task, for maximum 
parallelism.
-        MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
-            queryContext,
-            processingConfig.getNumThreads()
-        )
-    );
+    return new QueryKitSpec(queryKit, queryId);
   }
-
 }
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index d4116fed863..999de322393 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -83,7 +83,6 @@ import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
 import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.QueryFrameworkUtils;
-import org.apache.druid.sql.calcite.util.TestTimelineServerView;
 import org.apache.druid.sql.calcite.view.NoopViewManager;
 import org.apache.druid.sql.hook.DruidHookDispatcher;
 import org.apache.druid.sql.http.EngineInfo;
@@ -271,7 +270,7 @@ public class DartSqlResourceTest extends MSQTestBase
                 )
             )
         ),
-        new DartQueryKitSpecFactory(new 
TestTimelineServerView(Collections.emptyList())),
+        new DartQueryKitSpecFactory(),
         injector.getInstance(MultiQueryKit.class),
         new ServerConfig(),
         new DefaultQueryConfig(ImmutableMap.of("foo", "bar")),
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
index 6291a9d7f35..28d40e08766 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
@@ -233,6 +233,7 @@ public class MSQTasksTest
         null, // WorkerFailureListener
         ImmutableMap.of(),
         TimeUnit.SECONDS.toMillis(5),
+        Limits.MAX_WORKERS,
         new MSQWorkerTaskLauncherConfig()
     );
 
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
index 5223dae9834..0ca643f109f 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
@@ -39,6 +39,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.metadata.LockFilterPolicy;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.exec.MSQTasks;
 import org.apache.druid.msq.exec.WorkerFailureListener;
 import org.apache.druid.msq.indexing.error.MSQFault;
@@ -90,6 +91,7 @@ public class MSQWorkerTaskLauncherRetryTest
         workerFailureListener,
         ImmutableMap.of(),
         TimeUnit.SECONDS.toMillis(5),
+        Limits.MAX_WORKERS,
         new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(),
         2
     );
@@ -134,6 +136,7 @@ public class MSQWorkerTaskLauncherRetryTest
         workerFailureListener,
         ImmutableMap.of(),
         TimeUnit.SECONDS.toMillis(5),
+        Limits.MAX_WORKERS,
         new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(),
         2
     );
@@ -180,6 +183,7 @@ public class MSQWorkerTaskLauncherRetryTest
         workerFailureListener,
         ImmutableMap.of(),
         TimeUnit.SECONDS.toMillis(5),
+        Limits.MAX_WORKERS,
         new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(),
         2
     );
@@ -210,6 +214,7 @@ public class MSQWorkerTaskLauncherRetryTest
         workerFailureListener,
         ImmutableMap.of(),
         TimeUnit.SECONDS.toMillis(5),
+        Limits.MAX_WORKERS,
         new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(),
         2
     );
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
index 2e5c4859904..7badce6168b 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.indexing;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.exec.WorkerFailureListener;
 import 
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
 import org.apache.druid.rpc.indexing.OverlordClient;
@@ -46,6 +47,7 @@ public class MSQWorkerTaskLauncherTest
         getWorkerFailureListener(),
         ImmutableMap.of(),
         TimeUnit.SECONDS.toMillis(5),
+        Limits.MAX_WORKERS,
         new MSQWorkerTaskLauncherConfig()
     );
   }
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
index bd1be760d27..fcef8d627a1 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
@@ -183,7 +183,8 @@ public class MockQueryDefinitionBuilder
                   ),
                   0
               ),
-              MAX_NUM_PARTITIONS
+              MAX_NUM_PARTITIONS,
+              false
           );
           break;
 
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index e7dda29cc55..e4b2850a74b 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -445,6 +445,7 @@ public class MSQTestControllerContext implements 
ControllerContext, DartControll
         workerFailureListener,
         IndexerControllerContext.makeTaskContext(querySpec, queryKernelConfig, 
ImmutableMap.of()),
         0,
+        querySpec.getTuningConfig().getMaxNumWorkers(),
         taskLauncherConfig
     );
   }
@@ -472,6 +473,18 @@ public class MSQTestControllerContext implements 
ControllerContext, DartControll
     return new MSQTestWorkerClient(inMemoryWorkers, mapper);
   }
 
+  @Override
+  public int maxNonLeafWorkerCount()
+  {
+    return NUM_WORKERS;
+  }
+
+  @Override
+  public int targetPartitionsPerWorker()
+  {
+    return 1;
+  }
+
   @Override
   public ControllerContext newContext(QueryContext context)
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to