This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3210ec58c5ff7e3633afa7f596ed7d517ec8d0d9
Author: Riza Suminto <[email protected]>
AuthorDate: Mon Apr 28 12:24:30 2025 -0700

    IMPALA-14006: Bound max_instances in CreateInputCollocatedInstances
    
    IMPALA-11604 (part 2) changes how many instances to create in
    Scheduler::CreateInputCollocatedInstances. This works when the left
    child fragment of a parent fragment is distributed across nodes.
    However, if the left child fragment instance is limited to only 1
    node (the case of UNPARTITIONED fragment), the scheduler might
    over-parallelize the parent fragment by scheduling too many instances in
    a single node.
    
    This patch attempts to mitigate the issue in two ways. First, it adds
    bounding logic in PlanFragment.traverseEffectiveParallelism() to lower
    parallelism further if the left (probe) side of the child fragment is
    not well distributed across nodes.
    
    Second, it adds TQueryExecRequest.max_parallelism_per_node to relay
    information from Analyzer.getMaxParallelismPerNode() to the scheduler.
    With this information, the scheduler can do additional sanity checks to
    prevent Scheduler::CreateInputCollocatedInstances from
    over-parallelizing a fragment. Note that this sanity check can also cap
    MAX_FS_WRITERS option under a similar scenario.
    
    Added a ScalingVerdict enum and TRACE-log it to show the scaling
    decision steps.
    
    Testing:
    - Add planner test and e2e test that exercise the corner case under
      COMPUTE_PROCESSING_COST=1 option.
    - Manually comment out the bounding logic in
      traverseEffectiveParallelism() and confirm that the scheduler's
      sanity check still enforces the bounding.
    
    Change-Id: I65223b820c9fd6e4267d57297b1466d4e56829b3
    Reviewed-on: http://gerrit.cloudera.org:8080/22840
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/scheduling/scheduler.cc                     |  17 ++-
 common/thrift/Query.thrift                         |   4 +
 .../org/apache/impala/planner/PlanFragment.java    | 100 +++++++++++------
 .../java/org/apache/impala/planner/Planner.java    |   1 +
 .../org/apache/impala/planner/PlannerTest.java     |   2 +-
 .../queries/PlannerTest/tpcds-processing-cost.test | 125 +++++++++++++++++++++
 .../tpcds/queries/unpartitioned-probe.test         |  30 +++++
 tests/query_test/test_tpcds_queries.py             |   6 +
 8 files changed, 250 insertions(+), 35 deletions(-)

diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 4373334ca..07c5edfc9 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -457,9 +457,10 @@ Status Scheduler::CheckEffectiveInstanceCount(
     return Status(Substitute(
         "$0 scheduled instance count ($1) is higher than maximum instances per 
node"
         " ($2), indicating a planner bug. Consider running the query with"
-        " COMPUTE_PROCESSING_COST=false.",
+        " COMPUTE_PROCESSING_COST=false. Scheduler see $3 hosts for this 
fragment"
+        " with at most $4 fragment instance assignment at one host.",
         fragment_state->fragment.display_name, largest_inst_per_host,
-        qc.MAX_FRAGMENT_INSTANCES_PER_NODE));
+        qc.MAX_FRAGMENT_INSTANCES_PER_NODE, num_host, largest_inst_per_host));
   }
 
   int planned_inst_per_host = ceil((float)effective_instance_count / num_host);
@@ -925,6 +926,18 @@ void Scheduler::CreateInputCollocatedInstances(
         input_fragment_state.instance_states) {
       all_hosts.insert({input_instance_state.host, 
input_instance_state.krpc_host});
     }
+    // Sanity check max_instances with information from Planner, if set.
+    if (state->request().__isset.max_parallelism_per_node) {
+      int max_global = all_hosts.size() * 
state->request().max_parallelism_per_node;
+      if (max_instances > max_global) {
+        LOG(WARNING) << Substitute(
+            "Fragment $0 lowered max_instance from $1 to $2 due to num_host=$3 
and "
+            "max_parallelism_per_node=$4",
+            fragment.display_name, max_instances, max_global, all_hosts.size(),
+            state->request().max_parallelism_per_node);
+        max_instances = max_global;
+      }
+    }
     // This implementation creates the desired number of instances while 
balancing them
     // across hosts and ensuring that instances on the same host get 
consecutive instance
     // indexes.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index bd2181348..c23a4e910 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -1118,5 +1118,9 @@ struct TQueryExecRequest {
   // The unbounded version of cores_required. Used by Frontend to do executor 
group-set
   // assignment for the query. Should either be unset or set with positive 
value.
   18: optional i32 cores_required_unbounded
+
+  // Propagated value from Analyzer.getMaxParallelismPerNode().
+  // Used by scheduler.cc as sanity check during scheduling.
+  19: optional i32 max_parallelism_per_node
 }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java 
b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index 4ca2c7f6c..d994edddb 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -196,7 +196,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     setFragmentInPlanTree(planRoot_);
     coordinatorOnly_ = coordinatorOnly;
 
-    // Coordinator-only fragments must be unpartitined as there is only one 
instance of
+    // Coordinator-only fragments must be unpartitioned as there is only one 
instance of
     // them.
     Preconditions.checkState(!coordinatorOnly ||
         dataPartition_.equals(DataPartition.UNPARTITIONED));
@@ -1101,10 +1101,10 @@ public class PlanFragment extends 
TreeNode<PlanFragment> {
 
     // step 1: Set initial parallelism to the maximum possible.
     //   Subsequent steps after this will not exceed maximum parallelism sets 
here.
-    boolean canTryLower = adjustToMaxParallelism(
+    ScalingVerdict verdict = adjustToMaxParallelism(
         minThreadPerNode, maxThreadPerNode, parentFragment, nodeStepCount, 
queryOptions);
 
-    if (canTryLower) {
+    if (verdict == ScalingVerdict.CAN_LOWER && getAdjustedInstanceCount() > 1) 
{
       // step 2: Try lower parallelism by comparing output ProcessingCost of 
the input
       //   child fragment against this fragment's segment costs.
       Preconditions.checkState(getChildCount() > 0);
@@ -1117,10 +1117,31 @@ public class PlanFragment extends 
TreeNode<PlanFragment> {
           nodeStepCount, minParallelism, maxParallelism);
       setAdjustedInstanceCount(effectiveParallelism);
       if (LOG.isTraceEnabled() && effectiveParallelism != maxParallelism) {
-        logCountAdjustmentTrace(maxParallelism, effectiveParallelism,
+        logCountAdjustmentTrace(maxParallelism, effectiveParallelism, verdict,
             "Lower parallelism based on load and produce-consume rate ratio.");
       }
     }
+    // If this is probe traversal from Planner.computeEffectiveParallelism()
+    // (parentFragment == null), check possibility of left-child node (probe) 
being
+    // underparallelized.
+    if (parentFragment == null && hasChild(0)
+        && verdict != ScalingVerdict.FIXED_BY_PLAN_NODE
+        && verdict != ScalingVerdict.FIXED_BY_PARTITIONED_JOIN_BUILD) {
+      // Cap max parallelism at left child max.
+      // This is to prevent Scheduler::CreateInputCollocatedInstances to 
overparallelize.
+      // It is safe to do if verdict is neither of FIXED_BY_PLAN_NODE nor
+      // FIXED_BY_PARTITIONED_JOIN_BUILD.
+      PlanFragment lc = getChild(0);
+      int lcNumNode = lc.getNumNodes();
+      int lcMaxParallelism = IntMath.saturatedMultiply(maxThreadPerNode, 
lcNumNode);
+      if (lcMaxParallelism < getAdjustedInstanceCount()) {
+        LOG.warn("Reducing instance count of {} from {} to {} to follow 
left-child node "
+                + "{} (num_nodes={}, num_instance={}). Scaling verdict was 
{}.",
+            getId(), getAdjustedInstanceCount(), lcMaxParallelism, lc.getId(),
+            lc.getNumNodes(), lc.getAdjustedInstanceCount(), verdict);
+        setAdjustedInstanceCount(lcMaxParallelism);
+      }
+    }
     validateProcessingCosts();
 
     // step 3: Compute the parallelism of join build fragment.
@@ -1192,6 +1213,15 @@ public class PlanFragment extends TreeNode<PlanFragment> 
{
     return maxParallelism;
   }
 
+  private enum ScalingVerdict {
+    CAN_LOWER,
+    FIXED_BY_PLAN_NODE,
+    FIXED_BY_PARTITIONED_JOIN_BUILD,
+    UNION_FRAGMENT_BOUNDED,
+    SCAN_FRAGMENT_BOUNDED,
+    MIN_GLOBAL_PARALLELISM
+  }
+
   /**
    * Adjust parallelism of this fragment to the maximum allowed.
    * This method initialize maxParallelism_ and adjustedInstanceCount_.
@@ -1204,33 +1234,38 @@ public class PlanFragment extends 
TreeNode<PlanFragment> {
    * @param parentFragment Parent fragment of this fragment.
    * @param nodeStepCount The step count used to increase this fragment's 
parallelism.
    *                      Usually equal to number of nodes or just 1.
-   * @return True if it is possible to lower this fragment's parallelism 
through
-   * ProcessingCost comparison. False if the parallelism should not be changed 
anymore.
+   * @return a ScalingVerdict. If CAN_LOWER, it is possible to lower this 
fragment's
+   * parallelism through ProcessingCost comparison.
    */
-  private boolean adjustToMaxParallelism(final int minThreadPerNode,
+  private ScalingVerdict adjustToMaxParallelism(final int minThreadPerNode,
       final int maxThreadPerNode, final @Nullable PlanFragment parentFragment,
       final int nodeStepCount, TQueryOptions queryOptions) {
     int maxThreadAllowed = IntMath.saturatedMultiply(maxThreadPerNode, 
getNumNodes());
-    boolean canTryLower = true;
+    ScalingVerdict verdict = ScalingVerdict.CAN_LOWER;
 
     // Compute selectedParallelism as the maximum allowed parallelism.
     int selectedParallelism = getNumInstances();
     if (isFixedParallelism_) {
       selectedParallelism = getAdjustedInstanceCount();
       maxParallelism_ = selectedParallelism;
-      canTryLower = false;
+      verdict = ScalingVerdict.FIXED_BY_PLAN_NODE;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("{} instance count fixed to {}. verdict={}", getId(), 
maxParallelism_,
+            verdict);
+      }
     } else if (isPartitionedJoinBuildFragment()) {
       // This is a non-shared (PARTITIONED) join build fragment.
       // Parallelism of this fragment is equal to its parent parallelism.
       Preconditions.checkNotNull(parentFragment);
       final int parentParallelism = parentFragment.getAdjustedInstanceCount();
-      if (LOG.isTraceEnabled() && selectedParallelism != parentParallelism) {
-        logCountAdjustmentTrace(selectedParallelism, parentParallelism,
-            "Partitioned join build fragment follow parent's parallelism.");
-      }
       selectedParallelism = parentParallelism;
       maxParallelism_ = parentFragment.getMaxParallelism();
-      canTryLower = false; // no need to compute effective parallelism anymore.
+      // no need to compute effective parallelism anymore.
+      verdict = ScalingVerdict.FIXED_BY_PARTITIONED_JOIN_BUILD;
+      if (LOG.isTraceEnabled()) {
+        logCountAdjustmentTrace(selectedParallelism, parentParallelism, 
verdict,
+            "Partitioned join build fragment follow parent's parallelism.");
+      }
     } else {
       UnionNode unionNode = getUnionNode();
 
@@ -1242,17 +1277,17 @@ public class PlanFragment extends 
TreeNode<PlanFragment> {
         if (maxParallelism_ > maxThreadAllowed) {
           selectedParallelism = maxThreadAllowed;
           if (LOG.isTraceEnabled()) {
-            logCountAdjustmentTrace(
-                getNumInstances(), selectedParallelism, "Follow 
maxThreadPerNode.");
+            logCountAdjustmentTrace(getNumInstances(), selectedParallelism, 
verdict,
+                "Follow maxThreadPerNode.");
           }
         } else {
           selectedParallelism = maxParallelism_;
           if (LOG.isTraceEnabled()) {
-            logCountAdjustmentTrace(getNumInstances(), selectedParallelism,
+            logCountAdjustmentTrace(getNumInstances(), selectedParallelism, 
verdict,
                 "Follow minimum work per thread or max child count.");
           }
         }
-        canTryLower = false;
+        verdict = ScalingVerdict.UNION_FRAGMENT_BOUNDED;
       } else {
         // This is an interior fragment or fragment with single scan node.
         // We calculate maxParallelism_, minParallelism, and 
selectedParallelism across
@@ -1286,7 +1321,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
 
           // Prevent caller from lowering parallelism if fragment has ScanNode
           // because there is no child fragment to compare with.
-          canTryLower = false;
+          verdict = ScalingVerdict.SCAN_FRAGMENT_BOUNDED;
         }
 
         int minParallelism = Math.min(
@@ -1297,20 +1332,20 @@ public class PlanFragment extends 
TreeNode<PlanFragment> {
         if (boundedParallelism > maxThreadAllowed) {
           selectedParallelism = maxThreadAllowed;
           if (LOG.isTraceEnabled()) {
-            logCountAdjustmentTrace(
-                getNumInstances(), selectedParallelism, "Follow 
maxThreadPerNode.");
+            logCountAdjustmentTrace(getNumInstances(), selectedParallelism, 
verdict,
+                "Follow maxThreadPerNode.");
           }
         } else {
           if (boundedParallelism < minParallelism && minParallelism < 
maxScannerThreads) {
             boundedParallelism = minParallelism;
-            canTryLower = false;
+            verdict = ScalingVerdict.MIN_GLOBAL_PARALLELISM;
             if (LOG.isTraceEnabled()) {
-              logCountAdjustmentTrace(
-                  getNumInstances(), boundedParallelism, "Follow 
minThreadPerNode.");
+              logCountAdjustmentTrace(getNumInstances(), boundedParallelism, 
verdict,
+                  "Follow minThreadPerNode.");
             }
           } else if (LOG.isTraceEnabled()) {
-            logCountAdjustmentTrace(
-                getNumInstances(), boundedParallelism, "Follow minimum work 
per thread.");
+            logCountAdjustmentTrace(getNumInstances(), boundedParallelism, 
verdict,
+                "Follow minimum work per thread.");
           }
           selectedParallelism = boundedParallelism;
         }
@@ -1325,7 +1360,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
 
     // Initialize this fragment's parallelism to the selectedParallelism.
     setAdjustedInstanceCount(selectedParallelism);
-    return canTryLower && selectedParallelism > 1;
+    return verdict;
   }
 
   /**
@@ -1392,8 +1427,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
 
     int adjustedCount = getAdjustedInstanceCount();
     if (LOG.isTraceEnabled() && originalInstanceCount_ != adjustedCount) {
-      logCountAdjustmentTrace(
-          originalInstanceCount_, adjustedCount, "Finalize effective 
parallelism.");
+      LOG.trace("{} finalize instance count from {} to {}.", getId(),
+          originalInstanceCount_, adjustedCount);
     }
 
     for (PlanNode node : collectPlanNodes()) { node.numInstances_ = 
adjustedCount; }
@@ -1403,9 +1438,10 @@ public class PlanFragment extends TreeNode<PlanFragment> 
{
     }
   }
 
-  private void logCountAdjustmentTrace(int oldCount, int newCount, String 
reason) {
-    LOG.trace("{} adjust instance count from {} to {}. {}", getId(), oldCount, 
newCount,
-        reason);
+  private void logCountAdjustmentTrace(
+      int oldCount, int newCount, ScalingVerdict verdict, String reason) {
+    LOG.trace("{} adjust instance count from {} to {}. verdict={} reason={}", 
getId(),
+        oldCount, newCount, verdict, reason);
   }
 
   private static boolean isBlockingNode(PlanNode node) {
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java 
b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 259f8d613..3b13a2eab 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -619,6 +619,7 @@ public class Planner {
     CoreCount unboundedCores = computeBlockingAwareCores(postOrderFragments, 
true);
     int coresRequiredUnbounded = Math.max(1, 
unboundedCores.totalWithoutCoordinator());
     request.setCores_required_unbounded(coresRequiredUnbounded);
+    
request.setMax_parallelism_per_node(rootAnalyzer.getMaxParallelismPerNode());
     LOG.info("CoreCountUnbounded=" + unboundedCores
         + ", coresRequiredUnbounded=" + coresRequiredUnbounded);
   }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 2193e8346..a4a54a44e 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1504,7 +1504,7 @@ public class PlannerTest extends PlannerTestBase {
     TQueryOptions options = tpcdsParquetQueryOptions();
     options.setCompute_processing_cost(true);
     options.setProcessing_cost_min_threads(2);
-    options.setMax_fragment_instances_per_node(16);
+    // MAX_FRAGMENT_INSTANCES_PER_NODE option is set at test file.
     runPlannerTestFile("tpcds-processing-cost", 
"tpcds_partitioned_parquet_snap", options,
         tpcdsParquetTestOptions());
   }
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
index 2f06573d3..494f399f7 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
@@ -1,5 +1,7 @@
 # Regression test for IMPALA-12510: select star on empty table
 select * from functional.emptytable;
+---- QUERYOPTIONS
+MAX_FRAGMENT_INSTANCES_PER_NODE=16
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=4.00MB Threads=2
 Per-Host Resource Estimates: Memory=10MB
@@ -47,6 +49,8 @@ group by rollup (
   ss_net_paid,
   ss_ext_sales_price)
 limit 100
+---- QUERYOPTIONS
+MAX_FRAGMENT_INSTANCES_PER_NODE=16
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=1.81GB Threads=11
 Per-Host Resource Estimates: Memory=2.38GB
@@ -140,6 +144,8 @@ max-parallelism=21 segment-costs=[89349287, 198453333]
    in pipelines: 00(GETNEXT)
 ====
 select * from income_band;
+---- QUERYOPTIONS
+MAX_FRAGMENT_INSTANCES_PER_NODE=16
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=4.02MB Threads=2
 Per-Host Resource Estimates: Memory=20MB
@@ -170,6 +176,8 @@ max-parallelism=1 segment-costs=[28]
 ====
 # Scan cost should be exactly the same as select star without limit.
 select * from income_band limit 1000000000;
+---- QUERYOPTIONS
+MAX_FRAGMENT_INSTANCES_PER_NODE=16
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=4.02MB Threads=2
 Per-Host Resource Estimates: Memory=20MB
@@ -209,6 +217,8 @@ select
 from functional.emptytable
 ) b
 where rk = 1;
+---- QUERYOPTIONS
+MAX_FRAGMENT_INSTANCES_PER_NODE=16
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=48.00MB Threads=4
 Per-Host Resource Estimates: Memory=48MB
@@ -278,3 +288,118 @@ max-parallelism=1 segment-costs=[0, 0]
    tuple-ids=0 row-size=16B cardinality=0 cost=0
    in pipelines: 00(GETNEXT)
 ====
+# IMPALA-14006: Regression test for UNPARTITIONED fragment in probe side.
+# F03 should schedule 2 instances because F01 is only 1 node and 1 instance.
+select
+  timestamp_col,
+  rank
+from
+  functional.alltypessmall alts
+  left outer join (
+    select id, dense_rank() over(order by id) as rank, int_col from 
functional.alltypes
+  ) rank_view on (rank_view.id = alts.id)
+  where rank < 10;
+---- QUERYOPTIONS
+MAX_FRAGMENT_INSTANCES_PER_NODE=2
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=15.98MB Threads=6
+Per-Host Resource Estimates: Memory=49MB
+F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[233] cpu-comparison-result=8 [max(5 (self) 
vs 8 (sum children))]
+PLAN-ROOT SINK
+|  output exprs: timestamp_col, dense_rank()
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=200
+|
+09:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=7N,6N,0 row-size=32B cardinality=100 cost=33
+|  in pipelines: 02(GETNEXT)
+|
+F03:PLAN FRAGMENT [HASH(alts.id)] hosts=3 instances=2 (adjusted from 6)
+Per-Instance Resources: mem-estimate=193.11KB mem-reservation=0B 
thread-reservation=1
+max-parallelism=3 segment-costs=[713]
+05:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
+|  hash-table-id=00
+|  hash predicates: id = alts.id
+|  fk/pk conjuncts: id = alts.id
+|  other predicates: dense_rank() < CAST(10 AS BIGINT)
+|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=7N,6N,0 row-size=32B cardinality=100 cost=205
+|  in pipelines: 02(GETNEXT), 00(OPEN)
+|
+|--F05:PLAN FRAGMENT [HASH(alts.id)] hosts=3 instances=2 (adjusted from 6)
+|  |  Per-Instance Resources: mem-estimate=1.95MB mem-reservation=1.94MB 
thread-reservation=1
+|  |  max-parallelism=3 segment-costs=[120]
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: alts.id
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0 cost=100
+|  |
+|  08:EXCHANGE [HASH(alts.id)]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=0 row-size=20B cardinality=100 cost=20
+|  |  in pipelines: 00(GETNEXT)
+|  |
+|  F02:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 (adjusted from 4)
+|  Per-Instance Resources: mem-estimate=16.19MB mem-reservation=8.00KB 
thread-reservation=1
+|  max-parallelism=3 segment-costs=[409]
+|  00:SCAN HDFS [functional.alltypessmall alts, RANDOM]
+|     HDFS partitions=4/4 files=4 size=6.32KB
+|     stored statistics:
+|       table: rows=100 size=6.32KB
+|       partitions: 4/4 rows=100
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=25
+|     mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=0
+|     tuple-ids=0 row-size=20B cardinality=100 cost=229
+|     in pipelines: 00(GETNEXT)
+|
+07:EXCHANGE [HASH(id)]
+|  mem-estimate=17.11KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=7,6 row-size=12B cardinality=730 cost=148
+|  in pipelines: 02(GETNEXT)
+|
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Instance Resources: mem-estimate=4.19MB mem-reservation=4.00MB 
thread-reservation=1
+max-parallelism=1 segment-costs=[17820]
+04:SELECT
+|  predicates: dense_rank() < CAST(10 AS BIGINT)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  tuple-ids=7,6 row-size=12B cardinality=730 cost=7300
+|  in pipelines: 02(GETNEXT)
+|
+03:ANALYTIC
+|  functions: dense_rank()
+|  order by: id ASC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=7,6 row-size=12B cardinality=7.30K cost=7300
+|  in pipelines: 02(GETNEXT)
+|
+06:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: id ASC
+|  mem-estimate=33.50KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=7 row-size=4B cardinality=7.30K cost=1904
+|  in pipelines: 02(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 (adjusted from 6)
+Per-Instance Resources: mem-estimate=22.00MB mem-reservation=6.03MB 
thread-reservation=1
+max-parallelism=3 segment-costs=[20162, 3059]
+02:SORT
+|  order by: id ASC
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=7 row-size=4B cardinality=7.30K cost=2819
+|  in pipelines: 02(GETNEXT), 01(OPEN)
+|
+01:SCAN HDFS [functional.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   stored statistics:
+     table: rows=7.30K size=478.45KB
+     partitions: 24/24 rows=7.30K
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=310
+   mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=0
+   tuple-ids=1 row-size=4B cardinality=7.30K cost=17343
+   in pipelines: 01(GETNEXT)
+====
diff --git a/testdata/workloads/tpcds/queries/unpartitioned-probe.test 
b/testdata/workloads/tpcds/queries/unpartitioned-probe.test
new file mode 100644
index 000000000..e4b414a0a
--- /dev/null
+++ b/testdata/workloads/tpcds/queries/unpartitioned-probe.test
@@ -0,0 +1,30 @@
+====
+---- QUERY: UNPARTITIONED_PROBE
+# Validate num nodes and num instances from ExecSummary.
+select
+  timestamp_col,
+  rank
+from
+  functional.alltypessmall alts
+  left outer join (
+    select id, dense_rank() over(order by id) as rank, int_col from 
functional.alltypes
+  ) rank_view on (rank_view.id = alts.id)
+  where rank < 10;
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+2009-01-01 00:00:00,1
+2009-01-01 00:01:00,2
+2009-01-01 00:02:00.100000000,3
+2009-01-01 00:03:00.300000000,4
+2009-01-01 00:04:00.600000000,5
+2009-01-01 00:05:00.100000000,6
+2009-01-01 00:06:00.150000000,7
+2009-01-01 00:07:00.210000000,8
+2009-01-01 00:08:00.280000000,9
+---- TYPES
+TIMESTAMP, BIGINT
+---- RUNTIME_PROFILE
+row_regex: F03:EXCHANGE SENDER .* 1 .* 2 .*
+row_regex: 05:HASH JOIN .* 1 .* 2 .*
+row_regex: F01:EXCHANGE SENDER .* 1 .* 1 .*
+row_regex: 04:SELECT .* 1 .* 1 .*
+====
diff --git a/tests/query_test/test_tpcds_queries.py 
b/tests/query_test/test_tpcds_queries.py
index 723d4f49f..3c997e32c 100644
--- a/tests/query_test/test_tpcds_queries.py
+++ b/tests/query_test/test_tpcds_queries.py
@@ -782,6 +782,12 @@ class TestTpcdsQueryWithProcessingCost(TestTpcdsQuery):
     new_vector.get_value('exec_option')['max_fragment_instances_per_node'] = 2
     self.run_test_case(self.get_workload() + '-q67a', new_vector)
 
+  def test_unpartitioned_probe(self, vector):
+    """Set max_fragment_instances_per_node to 2 to contrast against 
num_nodes."""
+    new_vector = deepcopy(vector)
+    new_vector.get_value('exec_option')['max_fragment_instances_per_node'] = 2
+    self.run_test_case('unpartitioned-probe', new_vector)
+
 
 @SkipIfBuildType.dev_build
 @SkipIfDockerizedCluster.insufficient_mem_limit

Reply via email to