This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit b2bc488402462371c13650bef8385c31792e5919 Author: Riza Suminto <[email protected]> AuthorDate: Tue Mar 28 15:32:23 2023 -0700 IMPALA-12029: Relax scan fragment parallelism on first planning In a setup with multiple executor group sets, Frontend will try to match a query with the smallest executor group set that can fit the memory and CPU requirements of the compiled query. There are kinds of queries where the compiled plan will fit in any executor group set but not necessarily deliver the best performance. An example of this is Impala's COMPUTE STATS query. It does a full table scan and aggregates the stats, has a fairly simple query plan shape, but can benefit from higher scan parallelism. This patch relaxes the scan fragment parallelism on the first round of query planning. This allows a scan fragment to increase its parallelism based on its ProcessingCost estimation. If the relaxed plan fits in an executor group set, we replan once again with that executor group set but with scan fragment parallelism returned back to MT_DOP. This one extra round of query planning adds a couple of milliseconds of overhead depending on the complexity of the query plan, but is necessary since the backend scheduler still expects at most MT_DOP scan fragment instances. We can remove the extra replanning in the future once we can fully manage scan node parallelism without MT_DOP. This patch also adds some improvements, including: - Tune computeScanProcessingCost() to guard against scheduling too many scan fragments by comparing with the actual scan range count that Planner knows. - Use NUM_SCANNER_THREADS as a hint to cap scan node cost during the first round of planning. - Multiply memory-related counters by num executors to make them per group set rather than per node. - Fix bug in doCreateExecRequest() about selection of num executors for planning. Testing: - Pass test_executor_groups.py - Add test cases in test_min_processing_per_thread_small. 
- Raised impala.admission-control.max-query-mem-limit.root.small from 64MB to 70MB in llama-site-3-groups.xml so that the new grouping query can fit in root.small pool. Change-Id: I7a2276fbd344d00caa67103026661a3644b9a1f9 Reviewed-on: http://gerrit.cloudera.org:8080/19656 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Kurt Deschler <[email protected]> Reviewed-by: Wenzhe Zhou <[email protected]> --- common/thrift/ImpalaService.thrift | 6 ++ .../org/apache/impala/planner/HBaseScanNode.java | 4 +- .../org/apache/impala/planner/HdfsScanNode.java | 31 ++++-- .../org/apache/impala/planner/KuduScanNode.java | 4 +- .../org/apache/impala/planner/PlanFragment.java | 43 ++++++-- .../java/org/apache/impala/planner/PlanNode.java | 4 +- .../java/org/apache/impala/planner/Planner.java | 21 ++-- .../java/org/apache/impala/planner/ScanNode.java | 48 ++++++++- .../java/org/apache/impala/service/Frontend.java | 111 ++++++++++++++++++--- fe/src/test/resources/llama-site-3-groups.xml | 4 +- tests/custom_cluster/test_executor_groups.py | 45 ++++++++- 11 files changed, 265 insertions(+), 56 deletions(-) diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 208bb9926..b83708566 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -74,6 +74,12 @@ enum TImpalaQueryOptions { MAX_IO_BUFFERS = 7 // Removed // Number of scanner threads. + // EXPERIMENTAL: if COMPUTE_PROCESSING_COST=true, this query option will be used to + // cap scan node cost to: + // (num_executor * NUM_SCANNER_THREADS * min_processing_per_thread) + // if the original scan cost exceed that value during the first round of planning. + // NUM_SCANNER_THREADS will be ignored once MT_DOP is restored in the second round of + // planning. 
NUM_SCANNER_THREADS = 8 ALLOW_UNSUPPORTED_FORMATS = 9 // Removed diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java index 96460b14b..4db626000 100644 --- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java @@ -656,7 +656,9 @@ public class HBaseScanNode extends ScanNode { @Override public void computeProcessingCost(TQueryOptions queryOptions) { - processingCost_ = computeScanProcessingCost(queryOptions); + Preconditions.checkNotNull(scanRangeSpecs_); + processingCost_ = + computeScanProcessingCost(queryOptions, scanRangeSpecs_.getConcrete_rangesSize()); } @Override diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 6ba0b3c31..626c7c41c 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -1817,8 +1817,7 @@ public class HdfsScanNode extends ScanNode { numNodes_ = (cardinality == 0 || totalNodes == 0) ? 1 : totalNodes; numInstances_ = (cardinality == 0 || totalInstances == 0) ? 
1 : totalInstances; if (LOG.isTraceEnabled()) { - LOG.trace("computeNumNodes totalRanges=" - + (scanRangeSpecs_.getConcrete_rangesSize() + generatedScanRangeCount_) + LOG.trace("computeNumNodes totalRanges=" + getEffectiveNumScanRanges() + " localRanges=" + numLocalRanges + " remoteRanges=" + numRemoteRanges + " localRangeCounts.size=" + localRangeCounts.size() + " totalLocalParallelism=" + totalLocalParallelism @@ -1991,11 +1990,11 @@ public class HdfsScanNode extends ScanNode { .append("\n"); if (numScanRangesNoDiskIds_ > 0) { output.append(detailPrefix) - .append(String.format("missing disk ids: " - + "partitions=%s/%s files=%s/%s scan ranges %s/%s\n", - numPartitionsNoDiskIds_, sumValues(numPartitionsPerFs_), - numFilesNoDiskIds_, sumValues(totalFilesPerFs_), numScanRangesNoDiskIds_, - scanRangeSpecs_.getConcrete_rangesSize() + generatedScanRangeCount_)); + .append(String.format("missing disk ids: " + + "partitions=%s/%s files=%s/%s scan ranges %s/%s\n", + numPartitionsNoDiskIds_, sumValues(numPartitionsPerFs_), + numFilesNoDiskIds_, sumValues(totalFilesPerFs_), numScanRangesNoDiskIds_, + getEffectiveNumScanRanges())); } // Groups the min max original conjuncts by tuple descriptor. output.append(getMinMaxOriginalConjunctsExplainString(detailPrefix, detailLevel)); @@ -2110,7 +2109,9 @@ public class HdfsScanNode extends ScanNode { @Override public void computeProcessingCost(TQueryOptions queryOptions) { - processingCost_ = computeScanProcessingCost(queryOptions); + Preconditions.checkNotNull(scanRangeSpecs_); + processingCost_ = + computeScanProcessingCost(queryOptions, getEffectiveNumScanRanges()); } @Override @@ -2118,8 +2119,7 @@ public class HdfsScanNode extends ScanNode { // Update 'useMtScanNode_' before any return cases. It's used in BE. 
useMtScanNode_ = queryOptions.mt_dop > 0; Preconditions.checkNotNull(scanRangeSpecs_, "Cost estimation requires scan ranges."); - long scanRangeSize = - scanRangeSpecs_.getConcrete_rangesSize() + generatedScanRangeCount_; + long scanRangeSize = getEffectiveNumScanRanges(); if (scanRangeSize == 0) { nodeResourceProfile_ = ResourceProfile.noReservation(0); return; @@ -2485,6 +2485,17 @@ public class HdfsScanNode extends ScanNode { + scanRangeSpecs_.getSplit_specsSize(); } + /** + * Return the number of scan ranges when considering MAX_SCAN_RANGE_LENGTH option. + * computeScanRangeLocations() must be called before calling this. + */ + public long getEffectiveNumScanRanges() { + Preconditions.checkNotNull(scanRangeSpecs_); + Preconditions.checkState( + generatedScanRangeCount_ >= scanRangeSpecs_.getSplit_specsSize()); + return scanRangeSpecs_.getConcrete_rangesSize() + generatedScanRangeCount_; + } + /** * Sort filters in runtimeFilters_: min/max first followed by bloom. */ diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index 6ec5255cb..98a4520b6 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -397,7 +397,9 @@ public class KuduScanNode extends ScanNode { @Override public void computeProcessingCost(TQueryOptions queryOptions) { - processingCost_ = computeScanProcessingCost(queryOptions); + Preconditions.checkNotNull(scanRangeSpecs_); + processingCost_ = + computeScanProcessingCost(queryOptions, scanRangeSpecs_.getConcrete_rangesSize()); } @Override diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java index 1af37ebe9..6c27ed8d3 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java @@ -259,10 +259,14 @@ public class 
PlanFragment extends TreeNode<PlanFragment> { * * @param queryOptions A query options for this query. */ - public void computeCostingSegment(TQueryOptions queryOptions) { + public void computeCostingSegment( + TQueryOptions queryOptions, boolean limitScanParallelism) { for (PlanNode node : collectPlanNodes()) { + if (node instanceof ScanNode) { + ((ScanNode) node).setLimitScanParallelism(limitScanParallelism); + } node.computeProcessingCost(queryOptions); - node.computeRowConsumptionAndProductionToCost(); + node.computeRowConsumptionAndProductionToCost(limitScanParallelism); if (LOG.isTraceEnabled()) { LOG.trace("ProcessingCost Node " + node.getProcessingCost().debugString()); } @@ -873,7 +877,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { /** * Get maximum allowed parallelism based on minimum processing load per fragment. * <p>This is controlled by {@code min_processing_per_thread} flag. Only valid after - * {@link #computeCostingSegment(TQueryOptions)} has been called. + * {@link #computeCostingSegment(TQueryOptions, boolean)} has been called. * * @return maximum allowed parallelism based on minimum processing load per fragment. */ @@ -996,17 +1000,18 @@ public class PlanFragment extends TreeNode<PlanFragment> { * {@code max(PROCESSING_COST_MIN_THREADS, MT_DOP, * TExecutorGroupSet.num_cores_per_executor)}. * @param parentParallelism Number of instance of parent fragment. + * @param limitScanParallelism Whether scan nodes parallelism is fixed to MT_DOP or not. */ - protected void traverseEffectiveParallelism( - int minThreadPerNode, int maxThreadPerNode, int parentParallelism) { + protected void traverseEffectiveParallelism(int minThreadPerNode, int maxThreadPerNode, + int parentParallelism, boolean limitScanParallelism) { Preconditions.checkNotNull( rootSegment_, "ProcessingCost Fragment %s has not been computed!", getId()); int nodeStepCount = getNumInstances() % getNumNodes() == 0 ? 
getNumNodes() : 1; // step 1: Set initial parallelism to the maximum possible. // Subsequent steps after this will not exceed maximum parallelism sets here. - boolean canTryLower = adjustToMaxParallelism( - minThreadPerNode, maxThreadPerNode, parentParallelism, nodeStepCount); + boolean canTryLower = adjustToMaxParallelism(minThreadPerNode, maxThreadPerNode, + parentParallelism, nodeStepCount, limitScanParallelism); if (canTryLower) { // step 2: Try lower parallelism by comparing output ProcessingCost of the input @@ -1035,8 +1040,8 @@ public class PlanFragment extends TreeNode<PlanFragment> { // parallelism to match the child fragment parallelism. for (PlanFragment child : getChildren()) { if (child.getSink() instanceof JoinBuildSink) { - child.traverseEffectiveParallelism( - minThreadPerNode, maxThreadPerNode, getAdjustedInstanceCount()); + child.traverseEffectiveParallelism(minThreadPerNode, maxThreadPerNode, + getAdjustedInstanceCount(), limitScanParallelism); } } } @@ -1052,11 +1057,12 @@ public class PlanFragment extends TreeNode<PlanFragment> { * @param parentParallelism Parallelism of parent fragment. * @param nodeStepCount The step count used to increase this fragment's parallelism. * Usually equal to number of nodes or just 1. + * @param limitScanParallelism Whether scan nodes parallelism is fixed to MT_DOP or not. * @return True if it is possible to lower this fragment's parallelism through * ProcessingCost comparison. False if the parallelism should not be changed anymore. */ private boolean adjustToMaxParallelism(int minThreadPerNode, int maxThreadPerNode, - int parentParallelism, int nodeStepCount) { + int parentParallelism, int nodeStepCount, boolean limitScanParallelism) { boolean canTryLower = true; // Compute maximum allowed parallelism. int maxParallelism = getNumInstances(); @@ -1078,6 +1084,9 @@ public class PlanFragment extends TreeNode<PlanFragment> { // bounded by the maximum parallelism between its exchanging child. 
// For now, it wont get here since fragment with UnionNode has fixed parallelism // (equal to MT_DOP, and previouslyAdjusted == true). + // However, a fragment that has leaf node (EmptySetNode, ScanNode, or UnionNode) + // may still reach this code if this is the first planning round over selected + // executor group (see IMPALA-12029). maxParallelism = IntMath.saturatedMultiply(maxThreadPerNode, getNumNodes()); int minParallelism = IntMath.saturatedMultiply(minThreadPerNode, getNumNodes()); int costBasedMaxParallelism = Math.max(nodeStepCount, getCostBasedMaxParallelism()); @@ -1102,6 +1111,18 @@ public class PlanFragment extends TreeNode<PlanFragment> { getNumInstances(), maxParallelism, "Follow maxThreadPerNode."); } } + + if (!limitScanParallelism) { + // If this is the first round of executor group planning, then it is possible for + // scan/leaf node to have its parallelism NOT fixed to MT_DOP. In that case + // prevent the next step from lowering parallelism. + for (PlanNode node : collectPlanNodes()) { + if (node.isLeafNode()) { + canTryLower = false; + break; + } + } + } } // Initialize this fragment's parallelism to the maxParallelism. @@ -1139,7 +1160,7 @@ public class PlanFragment extends TreeNode<PlanFragment> { /** * Override parallelism of this fragment with adjusted parallelism from CPU costing * algorithm. - * <p>Only valid after {@link #traverseEffectiveParallelism(int, int, int)} + * <p>Only valid after {@link #traverseEffectiveParallelism(int, int, int, boolean)} * called. 
*/ protected void setEffectiveNumInstance() { diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index 2bf5ea579..59c95d8a7 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -929,12 +929,12 @@ abstract public class PlanNode extends TreeNode<PlanNode> { /** * Set number of rows consumed and produced data fields in processing cost. */ - public void computeRowConsumptionAndProductionToCost() { + public void computeRowConsumptionAndProductionToCost(boolean limitScanParallelism) { Preconditions.checkState(processingCost_.isValid(), "Processing cost of PlanNode " + getDisplayLabel() + " is invalid!"); processingCost_.setNumRowToConsume(getInputCardinality()); processingCost_.setNumRowToProduce(getCardinality()); - if (isLeafNode() + if ((isLeafNode() && limitScanParallelism) && (!fragment_.hasAdjustedInstanceCount() || fragment_.getAdjustedInstanceCount() < getNumInstances())) { fragment_.setFixedInstanceCount(getNumInstances()); diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index cdded30c4..260d1e8cd 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -403,16 +403,17 @@ public class Planner { * Adjust effective parallelism of each plan fragment of query after considering * processing cost rate and blocking operator. * <p> - * Only valid after {@link PlanFragment#computeCostingSegment(TQueryOptions)} has - * been called for all plan fragments in the list. + * Only valid after {@link PlanFragment#computeCostingSegment(TQueryOptions, boolean)} + * has been called for all plan fragments in the list. 
*/ - private static void computeEffectiveParallelism( - List<PlanFragment> postOrderFragments, int minThreadPerNode, int maxThreadPerNode) { + private static void computeEffectiveParallelism(List<PlanFragment> postOrderFragments, + int minThreadPerNode, int maxThreadPerNode, boolean limitScanParallelism) { for (PlanFragment fragment : postOrderFragments) { if (!(fragment.getSink() instanceof JoinBuildSink)) { // Only adjust parallelism of non-join build fragment. // Join build fragment will be adjusted later by fragment hosting the join node. - fragment.traverseEffectiveParallelism(minThreadPerNode, maxThreadPerNode, -1); + fragment.traverseEffectiveParallelism( + minThreadPerNode, maxThreadPerNode, -1, limitScanParallelism); } } @@ -425,7 +426,7 @@ public class Planner { * This method returns the effective CPU requirement of a query when considering * processing cost rate and blocking operator. * <p> - * Only valid after {@link #computeEffectiveParallelism(List, int, int)} has + * Only valid after {@link #computeEffectiveParallelism(List, int, int, boolean)} has * been called over the plan fragment list. */ private static CoreCount computeBlockingAwareCores( @@ -454,7 +455,8 @@ public class Planner { * fragment parallelism according to producer-consumer rate between them. 
*/ public static void computeProcessingCost(List<PlanFragment> planRoots, - TQueryExecRequest request, PlannerContext planCtx, int numCoresPerExecutor) { + TQueryExecRequest request, PlannerContext planCtx, int numCoresPerExecutor, + boolean limitScanParallelism) { TQueryOptions queryOptions = planCtx.getRootAnalyzer().getQueryOptions(); if (!ProcessingCost.isComputeCost(queryOptions)) { @@ -473,7 +475,7 @@ public class Planner { PlanFragment rootFragment = planRoots.get(0); List<PlanFragment> postOrderFragments = rootFragment.getNodesPostOrder(); for (PlanFragment fragment : postOrderFragments) { - fragment.computeCostingSegment(queryOptions); + fragment.computeCostingSegment(queryOptions, limitScanParallelism); } if (LOG.isTraceEnabled()) { @@ -482,7 +484,8 @@ public class Planner { + " maxThreads=" + maxThreads); } - computeEffectiveParallelism(postOrderFragments, minThreads, maxThreads); + computeEffectiveParallelism( + postOrderFragments, minThreads, maxThreads, limitScanParallelism); CoreCount effectiveCores = computeBlockingAwareCores(postOrderFragments); request.setCores_required(effectiveCores.total()); diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java index be12b293c..59ebb179c 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java @@ -36,6 +36,7 @@ import org.apache.impala.catalog.Type; import org.apache.impala.common.NotImplementedException; import org.apache.impala.common.PrintUtils; import org.apache.impala.common.RuntimeEnv; +import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TScanRangeSpec; @@ -45,6 +46,7 @@ import org.apache.impala.util.ExprUtil; import com.google.common.base.Joiner; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; 
+import com.google.common.math.LongMath; /** * Representation of the common elements of all scan nodes. @@ -82,6 +84,11 @@ abstract public class ScanNode extends PlanNode { // Refer to the comment of 'TableRef.tableNumRowsHint_' protected long tableNumRowsHint_ = -1; + // When in multiple executor group set setup, true value means scan instance count + // is limited by MT_DOP. Otherwise, scan parallelism is decided by ProcessingCost. + // Set by PlanFragment.computeCostingSegment(). + protected boolean limitScanParallelism_; + public ScanNode(PlanNodeId id, TupleDescriptor desc, String displayName) { super(id, desc.getId().asList(), displayName); desc_ = desc; @@ -345,9 +352,40 @@ abstract public class ScanNode extends PlanNode { return maxScannerThreads; } - protected ProcessingCost computeScanProcessingCost(TQueryOptions queryOptions) { - return ProcessingCost.basicCost(getDisplayLabel(), getInputCardinality(), - ExprUtil.computeExprsTotalCost(conjuncts_), rowMaterializationCost()); + protected ProcessingCost computeScanProcessingCost( + TQueryOptions queryOptions, long effectiveScanRangeCount) { + ProcessingCost cardinalityBasedCost = + ProcessingCost.basicCost(getDisplayLabel(), getInputCardinality(), + ExprUtil.computeExprsTotalCost(conjuncts_), rowMaterializationCost()); + + long maxScanThreads = effectiveScanRangeCount; + if (!limitScanParallelism_ && queryOptions.getNum_scanner_threads() > 0) { + maxScanThreads = Math.min(maxScanThreads, + LongMath.saturatedMultiply( + getNumNodes(), queryOptions.getNum_scanner_threads())); + } + + if (getInputCardinality() == 0) { + Preconditions.checkState(cardinalityBasedCost.getTotalCost() == 0, + "Scan is empty but cost is non-zero."); + return cardinalityBasedCost; + } else if (maxScanThreads < cardinalityBasedCost.getTotalCost() + / BackendConfig.INSTANCE.getMinProcessingPerThread()) { + // Input cardinality is unknown or cost is too high compared to maxScanThreads + // Return synthetic ProcessingCost based on 
maxScanThreads. + long syntheticCardinality = + Math.max(1, Math.min(getInputCardinality(), maxScanThreads)); + long syntheticPerRowCost = LongMath.saturatedMultiply( + (long) Math.ceil((double) BackendConfig.INSTANCE.getMinProcessingPerThread() + / syntheticCardinality), + maxScanThreads); + + return ProcessingCost.basicCost( + getDisplayLabel(), syntheticCardinality, 0, syntheticPerRowCost); + } + + // None of the conditions above apply. + return cardinalityBasedCost; } /** @@ -375,4 +413,8 @@ abstract public class ScanNode extends PlanNode { public boolean hasStorageLayerConjuncts() { return false; } public ExprSubstitutionMap getOptimizedAggSmap() { return optimizedAggSmap_; } + + protected void setLimitScanParallelism(boolean isLimitScanParallelism) { + limitScanParallelism_ = isLimitScanParallelism; + } } diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 1952b6e33..48f089a22 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.math.IntMath; +import com.google.common.math.LongMath; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -249,8 +250,10 @@ public class Frontend { private static final String VERDICT = "Verdict"; private static final String MEMORY_MAX = "MemoryMax"; private static final String MEMORY_ASK = "MemoryAsk"; + private static final String MEMORY_ASK_UNBOUNDED = "MemoryAskUnbounded"; private static final String CPU_MAX = "CpuMax"; private static final String CPU_ASK = "CpuAsk"; + private static final String CPU_ASK_UNBOUNDED = "CpuAskUnbounded"; /** * Plan-time context that allows 
capturing various artifacts created @@ -313,6 +316,14 @@ public class Frontend { // The group set being applied in current compilation. protected TExecutorGroupSet group_set_ = null; + // If false, set leaf fragment parallelism to maxScanParallelism_ per executor node. + // Otherwise, leaf fragment parallelism is set to MT_DOP per executor node. + protected boolean limitScanParallelism_ = true; + + // Maximum leaf fragment parallelism per executor node. + // Only used if limitScanParallelism_ is false. + protected int maxLeafParallelism_ = 1; + public boolean disableAuthorization() { return disableAuthorization_; } public long getEstimatedMemoryPerHost() { return estimated_memory_per_host_; } @@ -321,6 +332,14 @@ public class Frontend { public int getCoresRequired() { return cores_required_; } public void setCoresRequired(int x) { cores_required_ = x; } + public boolean isLimitScanParallelism() { return limitScanParallelism_; } + public void setLimitScanParallelism(boolean isLimitScanParallelism) { + limitScanParallelism_ = isLimitScanParallelism; + } + + public int getMaxLeafParallelism() { return maxLeafParallelism_; } + public void setMaxLeafParallelism(int x) { maxLeafParallelism_ = x; } + // Capture the current state and initialize before iterative compilations begin. public void captureState() { disableAuthorization_ = false; @@ -1766,8 +1785,12 @@ public class Frontend { // Compute resource requirements of the final plans. TQueryExecRequest result = new TQueryExecRequest(); - Planner.computeProcessingCost(planRoots, result, planner.getPlannerCtx(), - planCtx.compilationState_.getGroupSet().getNum_cores_per_executor()); + boolean isLimitScanParallelism = planCtx.compilationState_.isLimitScanParallelism(); + int maxCores = isLimitScanParallelism ? 
+ planCtx.compilationState_.getGroupSet().getNum_cores_per_executor() : + planCtx.compilationState_.getMaxLeafParallelism(); + Planner.computeProcessingCost( + planRoots, result, planner.getPlannerCtx(), maxCores, isLimitScanParallelism); Planner.computeResourceReqs(planRoots, queryCtx, result, planner.getPlannerCtx(), planner.getAnalysisResult().isQueryStmt()); @@ -1959,12 +1982,15 @@ public class Frontend { return type == TStmtType.EXPLAIN || type == TStmtType.QUERY || type == TStmtType.DML; } - private static int expectedTotalCores(TExecutorGroupSet execGroupSet) { - int numExecutors = execGroupSet.getCurr_num_executors() > 0 ? + private static int expectedNumExecutor(TExecutorGroupSet execGroupSet) { + return execGroupSet.getCurr_num_executors() > 0 ? execGroupSet.getCurr_num_executors() : execGroupSet.getExpected_num_executors(); + } + + private static int expectedTotalCores(TExecutorGroupSet execGroupSet) { return IntMath.saturatedMultiply( - numExecutors, execGroupSet.getNum_cores_per_executor()); + expectedNumExecutor(execGroupSet), execGroupSet.getNum_cores_per_executor()); } private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) @@ -2015,12 +2041,26 @@ public class Frontend { TExecutorGroupSet group_set = null; String reason = "Unknown"; int attempt = 0; - for (int i = 0; i < num_executor_group_sets; i++) { + planCtx.compilationState_.setLimitScanParallelism( + num_executor_group_sets <= 1 || !ProcessingCost.isComputeCost(queryOptions)); + int lastExecutorGroupTotalCores = + expectedTotalCores(executorGroupSetsToUse.get(num_executor_group_sets - 1)); + long memoryAskUnbounded = -1; + int cpuAskUnbounded = -1; + int i = 0; + while (i < num_executor_group_sets) { group_set = executorGroupSetsToUse.get(i); planCtx.compilationState_.setGroupSet(group_set); + if (i == num_executor_group_sets - 1) { + // This is the last executor group. Fix leaf nodes parallelism back to MT_DOP. 
+ planCtx.compilationState_.setLimitScanParallelism(true); + planCtx.compilationState_.setMaxLeafParallelism( + group_set.getNum_cores_per_executor()); + } else if (!planCtx.compilationState_.isLimitScanParallelism()) { + planCtx.compilationState_.setMaxLeafParallelism( + lastExecutorGroupTotalCores / expectedNumExecutor(group_set)); + } LOG.info("Consider executor group set: " + group_set); - FrontendProfile.getCurrent().addToCounter( - EXECUTOR_GROUPS_CONSIDERED, TUnit.UNIT, 1); String retryMsg = ""; while (true) { @@ -2067,9 +2107,9 @@ public class Frontend { } TRuntimeProfileNode groupSetProfile = createTRuntimeProfileNode(profileName); addCounter(groupSetProfile, - new TCounter(MEMORY_MAX, TUnit.BYTES, group_set.getMax_mem_limit())); - addCounter(groupSetProfile, new TCounter(CPU_MAX, TUnit.UNIT, available_cores)); - FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile); + new TCounter(MEMORY_MAX, TUnit.BYTES, + LongMath.saturatedMultiply( + expectedNumExecutor(group_set), group_set.getMax_mem_limit()))); // Find out the per host memory estimated from two possible sources. 
long per_host_mem_estimate = -1; @@ -2086,8 +2126,10 @@ public class Frontend { Preconditions.checkState(per_host_mem_estimate >= 0); boolean memReqSatisfied = per_host_mem_estimate <= group_set.getMax_mem_limit(); - addCounter( - groupSetProfile, new TCounter(MEMORY_ASK, TUnit.BYTES, per_host_mem_estimate)); + addCounter(groupSetProfile, + new TCounter(MEMORY_ASK, TUnit.BYTES, + LongMath.saturatedMultiply( + expectedNumExecutor(group_set), per_host_mem_estimate))); boolean cpuReqSatisfied = true; int scaled_cores_requirement = -1; @@ -2097,10 +2139,25 @@ public class Frontend { Math.ceil( cores_requirement / BackendConfig.INSTANCE.getQueryCpuCountDivisor())); cpuReqSatisfied = scaled_cores_requirement <= available_cores; + + addCounter(groupSetProfile, new TCounter(CPU_MAX, TUnit.UNIT, available_cores)); addCounter( groupSetProfile, new TCounter(CPU_ASK, TUnit.UNIT, scaled_cores_requirement)); addCounter(groupSetProfile, new TCounter(EFFECTIVE_PARALLELISM, TUnit.UNIT, cores_requirement)); + + if (memoryAskUnbounded > 0) { + addCounter(groupSetProfile, + new TCounter(MEMORY_ASK_UNBOUNDED, TUnit.BYTES, + LongMath.saturatedMultiply( + expectedNumExecutor(group_set), memoryAskUnbounded))); + memoryAskUnbounded = -1; + } + if (cpuAskUnbounded > 0) { + addCounter(groupSetProfile, + new TCounter(CPU_ASK_UNBOUNDED, TUnit.UNIT, cpuAskUnbounded)); + cpuAskUnbounded = -1; + } } boolean matchFound = false; @@ -2114,6 +2171,20 @@ public class Frontend { addInfoString(groupSetProfile, VERDICT, reason); matchFound = true; } else if (memReqSatisfied && cpuReqSatisfied) { + if (!planCtx.compilationState_.isLimitScanParallelism()) { + // Fix leaf nodes parallelism back to MT_DOP and replan. + // TODO: Remove this extra replanning and related codes in the future once we + // can fully manage scan node parallelism without MT_DOP. 
+ planCtx.compilationState_.setLimitScanParallelism(true); + memoryAskUnbounded = per_host_mem_estimate; + if (ProcessingCost.isComputeCost(queryOptions)) { + cpuAskUnbounded = scaled_cores_requirement; + } + planCtx.compilationState_.restoreState(); + LOG.info("Executor group " + group_set + " fit the unbounded scan plan. " + + "Replanning again with scan parallelism equals MT_DOP."); + continue; + } reason = "suitable group found (estimated per-host memory=" + PrintUtils.printBytes(per_host_mem_estimate) + ", estimated cpu cores required=" + cores_requirement @@ -2129,6 +2200,9 @@ public class Frontend { matchFound = true; } + // Append this exec group set profile node. + FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile); + if (matchFound) { // Set the group name prefix in both the returned query options and // the query context for non default group setup. @@ -2162,6 +2236,9 @@ public class Frontend { // Restore to the captured state. planCtx.compilationState_.restoreState(); + FrontendProfile.getCurrent().addToCounter( + EXECUTOR_GROUPS_CONSIDERED, TUnit.UNIT, 1); + i++; } if (group_set == null) { @@ -2172,6 +2249,10 @@ public class Frontend { "The query does not fit largest executor group sets. Reason: " + reason + "."); } + } else { + // This group_set is a match. + FrontendProfile.getCurrent().addToCounter( + EXECUTOR_GROUPS_CONSIDERED, TUnit.UNIT, 1); } LOG.info("Selected executor group: " + group_set + ", reason: " + reason); @@ -2245,11 +2326,11 @@ public class Frontend { Preconditions.checkNotNull(analysisResult.getStmt()); TExecRequest result = createBaseExecRequest(queryCtx, analysisResult); - // Transfer the current number of executors in executor group set from planCtx to + // Transfer the expected number of executors in executor group set to // analyzer's global state. The info is needed to compute the number of nodes to be // used during planner phase for scans (see HdfsScanNode.computeNumNodes()). 
analysisResult.getAnalyzer().setNumExecutorsForPlanning( - planCtx.compilationState_.getGroupSet().getCurr_num_executors()); + expectedNumExecutor(planCtx.compilationState_.getGroupSet())); try { TQueryOptions queryOptions = queryCtx.client_request.query_options; diff --git a/fe/src/test/resources/llama-site-3-groups.xml b/fe/src/test/resources/llama-site-3-groups.xml index 6fdd67697..874cc7a75 100644 --- a/fe/src/test/resources/llama-site-3-groups.xml +++ b/fe/src/test/resources/llama-site-3-groups.xml @@ -37,8 +37,8 @@ </property> <property> <name>impala.admission-control.max-query-mem-limit.root.small</name> - <!-- 64 MB --> - <value>67108864</value> + <!-- 70 MB --> + <value>73400320</value> </property> <property> <name>impala.admission-control.min-query-mem-limit.root.small</name> diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index 579ec39f1..b204ddbdb 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -37,6 +37,10 @@ TEST_QUERY = "select count(*) from functional.alltypes where month + random() < # A query to test CPU requirement. Estimated memory per host is 37MB. CPU_TEST_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 limit 50;" +# A query with full table scan characteristics. +GROUPING_TEST_QUERY = ("select ss_item_sk from tpcds_parquet.store_sales" + " group by (ss_item_sk) order by ss_item_sk limit 10") + # Default query option to use for testing CPU requirement. 
CPU_DOP_OPTIONS = {'MT_DOP': '2', 'COMPUTE_PROCESSING_COST': 'true'} @@ -755,7 +759,7 @@ class TestExecutorGroups(CustomClusterTestSuite): # max-query-cpu-core-per-node-limit and max-query-cpu-core-coordinator-limit # properties of the three sets: # tiny: [0, 64MB, 4, 4] - # small: [0, 64MB, 8, 8] + # small: [0, 70MB, 8, 8] # large: [64MB+1Byte, 8PB, 64, 64] llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-3-groups.xml") # Start with a regular admission config with multiple pools and no resource limits. @@ -808,7 +812,7 @@ class TestExecutorGroups(CustomClusterTestSuite): # max-query-cpu-core-per-node-limit and max-query-cpu-core-coordinator-limit # properties of the three sets: # tiny: [0, 64MB, 4, 4] - # small: [0, 64MB, 8, 8] + # small: [0, 70MB, 8, 8] # large: [64MB+1Byte, 8PB, 64, 64] llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-3-groups.xml") @@ -894,6 +898,13 @@ class TestExecutorGroups(CustomClusterTestSuite): "ExecutorGroupsConsidered: 1"], ["EffectiveParallelism:", "CpuAsk:"]) + # Unset REQUEST_POOL. + self.execute_query_expect_success(self.client, "SET REQUEST_POOL='';") + + # Test that GROUPING_TEST_QUERY will get assigned to the small group. + self._run_query_and_verify_profile(GROUPING_TEST_QUERY, CPU_DOP_OPTIONS, + ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2", + "Verdict: Match", "CpuAsk: 4", "CpuAskUnbounded: 1"]) self.client.close() @pytest.mark.execute_serially @@ -939,6 +950,36 @@ class TestExecutorGroups(CustomClusterTestSuite): "Reason: not enough cpu cores (require=234, max=192).") in str(result) self.client.close() + @pytest.mark.execute_serially + def test_min_processing_per_thread_small(self): + """Test processing cost with min_processing_per_thread smaller than default""" + coordinator_test_args = "-min_processing_per_thread=500000" + self._setup_three_exec_group_cluster(coordinator_test_args) + + # Test that GROUPING_TEST_QUERY will get assigned to the large group. 
+ self._run_query_and_verify_profile(GROUPING_TEST_QUERY, CPU_DOP_OPTIONS, + ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", + "Verdict: Match", "CpuAsk: 6"], + ["CpuAskUnbounded:"]) + + # Test that high_scan_cost_query will get assigned to the large group. + high_scan_cost_query = ("SELECT ss_item_sk FROM tpcds_parquet.store_sales " + "WHERE ss_item_sk < 1000000 GROUP BY ss_item_sk LIMIT 10") + options = copy.deepcopy(CPU_DOP_OPTIONS) + self._run_query_and_verify_profile(high_scan_cost_query, options, + ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", + "Verdict: Match", "CpuAsk: 6"], + ["CpuAskUnbounded:"]) + + # Test that high_scan_cost_query will get assigned to the small group + # if NUM_SCANNER_THREADS is limited to 1. + options['NUM_SCANNER_THREADS'] = '1' + self._run_query_and_verify_profile(high_scan_cost_query, options, + ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2", + "Verdict: Match", "CpuAsk: 4", "CpuAskUnbounded: 4"]) + + self.client.close() + @pytest.mark.execute_serially def test_per_exec_group_set_metrics(self): """This test verifies that the metrics for each exec group set are updated
