This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch branch-4.4.0
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f97042384e0312cfd3943426e048581d9678891d
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Mar 26 18:51:52 2024 -0700

    IMPALA-12980: Translate CpuAsk into admission control slots
    
    Impala has a concept of "admission control slots" - the amount of
    parallelism that should be allowed on an Impala daemon. This defaults to
    the number of processors per executor and can be overridden with
    -–admission_control_slots flag.
    
    Admission control slot accounting is described in IMPALA-8998. It
    computes 'slots_to_use' for each backend based on the maximum number of
    instances of any fragment on that backend. This can lead to slot
    underestimation and query overadmission. For example, assume an executor
    node with 48 CPU cores and configured with -–admission_control_slots=48.
    It is assigned 4 non-blocking query fragments, each has 12 instances
    scheduled in this executor. IMPALA-8998 algorithm will request the max
    instance (12) slots rather than the sum of all non-blocking fragment
    instances (48). With the 36 remaining slots free, the executor can still
    admit another fragment from a different query but will potentially have
    CPU contention with the one that is currently running.
    
    When COMPUTE_PROCESSING_COST is enabled, Planner will generate a CpuAsk
    number that represents the cpu requirement of that query over a
    particular executor group set. This number is an estimation of the
    largest number of query fragment instances that can run in parallel
    without waiting, given by the blocking operator analysis. Therefore, the
    fragment trace that sums into that CpuAsk number can be translated into
    'slots_to_use' as well, which will be a closer resemblance of maximum
    parallel execution of fragment instances.
    
    This patch adds a new query option called SLOT_COUNT_STRATEGY to control
    which admission control slot accounting to use. There are two possible
    values:
    - LARGEST_FRAGMENT, which is the original algorithm from IMPALA-8998.
      This is still the default value for the SLOT_COUNT_STRATEGY option.
    - PLANNER_CPU_ASK, which will follow the fragment trace that contributes
      towards CpuAsk number. This strategy will schedule more or equal
      admission control slots than the LARGEST_FRAGMENT strategy.
    
    To do the PLANNER_CPU_ASK strategy, the Planner will mark fragments that
    contribute to CpuAsk as dominant fragments. It also passes
    max_slot_per_executor information that it knows about the executor group
    set to the scheduler.
    
    AvgAdmissionSlotsPerExecutor counter is added to describe what Planner
    thinks the average 'slots_to_use' per backend will be, which follows
    this formula:
    
      AvgAdmissionSlotsPerExecutor = ceil(CpuAsk / num_executors)
    
    Actual 'slots_to_use' in each backend may differ than
    AvgAdmissionSlotsPerExecutor, depending on what is scheduled on that
    backend. 'slots_to_use' will be shown as 'AdmissionSlots' counter under
    each executor profile node.
    
    Testing:
    - Update test_executors.py with AvgAdmissionSlotsPerExecutor assertion.
    - Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost.
    - Add EE test test_processing_cost.py.
    - Add FE test PlannerTest#testProcessingCostPlanAdmissionSlots.
    
    Change-Id: I338ca96555bfe8d07afce0320b3688a0861663f2
    Reviewed-on: http://gerrit.cloudera.org:8080/21257
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/scheduling/admission-controller-test.cc     |   3 +-
 be/src/scheduling/admission-controller.cc          |   4 +-
 be/src/scheduling/scheduler.cc                     |  53 +-
 be/src/service/query-options.cc                    |   7 +
 be/src/service/query-options.h                     |   3 +-
 common/thrift/ImpalaService.thrift                 |   5 +
 common/thrift/Planner.thrift                       |   5 +
 common/thrift/Query.thrift                         |  33 +-
 fe/src/main/java/org/apache/impala/common/Id.java  |   1 -
 .../java/org/apache/impala/planner/CoreCount.java  |  77 +-
 .../org/apache/impala/planner/CostingSegment.java  |  12 +-
 .../org/apache/impala/planner/PlanFragment.java    |   7 +
 .../java/org/apache/impala/planner/Planner.java    |  24 +-
 .../java/org/apache/impala/service/Frontend.java   |  23 +
 .../org/apache/impala/planner/PlannerTest.java     |  11 +
 .../org/apache/impala/planner/PlannerTestBase.java |   2 +
 .../processing-cost-plan-admission-slots.test      | 976 +++++++++++++++++++++
 .../QueryTest/processing-cost-admission-slots.test | 110 +++
 tests/custom_cluster/test_executor_groups.py       | 208 +++--
 tests/query_test/test_processing_cost.py           |  42 +
 tests/query_test/test_tpcds_queries.py             |   8 +-
 21 files changed, 1504 insertions(+), 110 deletions(-)

diff --git a/be/src/scheduling/admission-controller-test.cc 
b/be/src/scheduling/admission-controller-test.cc
index 9111a5367..babc848fe 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -637,7 +637,8 @@ TEST_F(AdmissionControllerTest, QueryRejection) {
   EXPECT_STR_CONTAINS(rejected_slots_reason,
       "number of admission control slots needed "
       "(16) on backend 'host1:25000' is greater than total slots available 4. 
Reduce "
-      "mt_dop to less than 4 to ensure that the query can execute.");
+      "MT_DOP or MAX_FRAGMENT_INSTANCES_PER_NODE to less than 4 to ensure that 
the "
+      "query can execute.");
   rejected_slots_reason = "";
   // Reduce mt_dop to ensure it can execute.
   SetHostsInScheduleState(
diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index 6c614fc8e..af14a7010 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -212,8 +212,8 @@ const string REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION =
     "profile for more information about the per-node memory requirements.";
 const string REASON_NOT_ENOUGH_SLOTS_ON_BACKEND =
     "number of admission control slots needed ($0) on backend '$1' is greater 
than total "
-    "slots available $2. Reduce mt_dop to less than $2 to ensure that the 
query can "
-    "execute.";
+    "slots available $2. Reduce MT_DOP or MAX_FRAGMENT_INSTANCES_PER_NODE to 
less than "
+    "$2 to ensure that the query can execute.";
 const string REASON_MIN_RESERVATION_OVER_POOL_MEM =
     "minimum memory reservation needed is greater than pool max mem resources. 
Pool "
     "max mem resources: $0. Cluster-wide memory reservation needed: $1. 
Increase the "
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 1809dce55..11ed058b7 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -1165,23 +1165,64 @@ void Scheduler::ComputeBackendExecParams(
   }
 
   // Compute 'slots_to_use' for each backend based on the max # of instances of
-  // any fragment on that backend.
+  // any fragment on that backend. If 'compute_processing_cost' is on, and 
Planner
+  // set 'max_slot_per_executor', pick the min between 
'dominant_instance_count' and
+  // 'max_slot_per_executor'.
+  bool cap_slots = state->query_options().compute_processing_cost
+      && state->query_options().slot_count_strategy == 
TSlotCountStrategy::PLANNER_CPU_ASK
+      && state->request().__isset.max_slot_per_executor
+      && state->request().max_slot_per_executor > 0;
   for (auto& backend : state->per_backend_schedule_states()) {
-    int be_max_instances = 0;
+    int be_max_instances = 0; // max # of instances of any fragment.
+    int dominant_instance_count = 0; // sum of all dominant fragment instances.
+
     // Instances for a fragment are clustered together because of how the 
vector is
-    // constructed above. So we can compute the max # of instances of any 
fragment
-    // with a single pass over the vector.
+    // constructed above. For example, 3 fragments with 2 instances each will 
be in this
+    // order inside exec_params->instance_params() vector.
+    //   [F00_a, F00_b, F01_c, F01_d, F02_e, F02_f]
+    // So we can compute the max # of instances of any fragment with a single 
pass over
+    // the vector.
     int curr_fragment_idx = -1;
     int curr_instance_count = 0; // Number of instances of the current 
fragment seen.
+    bool is_dominant = false;
     for (auto& finstance : backend.second.exec_params->instance_params()) {
       if (curr_fragment_idx == -1 || curr_fragment_idx != 
finstance.fragment_idx()) {
+        // We arrived at new fragment group. Update 'be_max_instances'.
+        be_max_instances = max(be_max_instances, curr_instance_count);
+        // Reset 'curr_fragment_idx' and other counting related variables.
         curr_fragment_idx = finstance.fragment_idx();
         curr_instance_count = 0;
+        is_dominant =
+            
state->GetFragmentScheduleState(curr_fragment_idx)->fragment.is_dominant;
       }
       ++curr_instance_count;
-      be_max_instances = max(be_max_instances, curr_instance_count);
+      if (is_dominant) ++dominant_instance_count;
+    }
+    // Update 'be_max_instances' one last time.
+    be_max_instances = max(be_max_instances, curr_instance_count);
+
+    // Default slots to use number from IMPALA-8998.
+    // For fragment with largest num of instances running in this backend, it
+    // ensures allocation of 1 slot for each instance of that fragment.
+    int slots_to_use = be_max_instances;
+
+    // Done looping exec_params->instance_params(). Derived 'slots_to_use' 
based on
+    // finalized 'be_max_instances' and 'dominant_instance_count'.
+    if (cap_slots) {
+      if (dominant_instance_count >= be_max_instances) {
+        // One case where it is possible to have 'dominant_instance_count' <
+        // 'be_max_instances' is with dedicated coordinator setup. The 
schedule would
+        // only assign one coordinator fragment instance to the coordinator 
node,
+        // but 'dominant_instance_count' can be 0 if fragment.is_dominant == 
false.
+        // In that case, ignore 'dominant_instance_count' and continue with
+        // 'be_max_instances'.
+        // However, if 'dominant_instance_count' >= 'be_max_instances',
+        // continue with 'dominant_instance_count'.
+        slots_to_use = dominant_instance_count;
+      }
+      slots_to_use = min(slots_to_use, state->request().max_slot_per_executor);
     }
-    backend.second.exec_params->set_slots_to_use(be_max_instances);
+    backend.second.exec_params->set_slots_to_use(slots_to_use);
   }
 
   // This also ensures an entry always exists for the coordinator backend.
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index d04aab5dc..10b4f3fbc 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1278,6 +1278,13 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
         query_options->__set_runtime_filter_ids_to_skip(filter_ids);
         break;
       }
+      case TImpalaQueryOptions::SLOT_COUNT_STRATEGY: {
+        TSlotCountStrategy::type enum_type;
+        RETURN_IF_ERROR(GetThriftEnum(value, "Slot count strategy",
+            _TSlotCountStrategy_VALUES_TO_NAMES, &enum_type));
+        query_options->__set_slot_count_strategy(enum_type);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << 
key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 0738a10b7..127f1ddb2 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -52,7 +52,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                       
          \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                       
          \
-      TImpalaQueryOptions::RUNTIME_FILTER_IDS_TO_SKIP + 1);                    
          \
+      TImpalaQueryOptions::SLOT_COUNT_STRATEGY + 1);                           
          \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)     
          \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)   
          \
@@ -330,6 +330,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
       ICEBERG_DISABLE_COUNT_STAR_OPTIMIZATION, TQueryOptionLevel::ADVANCED)    
          \
   QUERY_OPT_FN(runtime_filter_ids_to_skip,                                     
          \
       RUNTIME_FILTER_IDS_TO_SKIP, TQueryOptionLevel::DEVELOPMENT)              
          \
+  QUERY_OPT_FN(slot_count_strategy, SLOT_COUNT_STRATEGY, 
TQueryOptionLevel::ADVANCED)    \
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query 
state.
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index 61f5dbd0a..d1a5ebad0 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -930,6 +930,11 @@ enum TImpalaQueryOptions {
   //   RUNTIME_FILTER_IDS_TO_SKIP="1,2,3"
   // If using impala-shell client, double quote is not required.
   RUNTIME_FILTER_IDS_TO_SKIP = 176
+
+  // Decide what strategy to use to compute number of slot per node to run a 
query.
+  // Default to number of instances of largest query fragment 
(LARGEST_FRAGMENT).
+  // See TSlotCountStrategy in Query.thrift for documentation of its possible 
values.
+  SLOT_COUNT_STRATEGY = 177
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index 01249f8f2..36622f97f 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -94,6 +94,11 @@ struct TPlanFragment {
   // If true, the fragment must be scheduled on the coordinator. In this case 
'partition'
   // must be UNPARTITIONED.
   15: required bool is_coordinator_only
+
+  // Marker on whether this is a dominant fragment or not. Only possible to be 
true if
+  // COMPUTE_PROCESSING_COST=true. Otherwise, always false.
+  // See PlanFragment.java for definition of dominant fragment.
+  16: optional bool is_dominant = false
 }
 
 // location information for a single scan range
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 01ff6ebbc..e0ab68ddb 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -117,6 +117,24 @@ enum TCodeGenOptLevel {
   O3
 }
 
+// Option to decide how to compute slots_to_use for a query.
+// See Scheduler::ComputeBackendExecParams.
+enum TSlotCountStrategy {
+  // Compute slots to use for each backend based on the max number of 
instances of any
+  // fragment on that backend. This is the default and only strategy available 
if
+  // COMPUTE_PROCESSING_COST option is disabled. See IMPALA-8998.
+  LARGEST_FRAGMENT = 0,
+
+  // Compute slots to use for each backend based on CpuAsk counter from 
Planner.
+  // The CpuAsk is the largest sum of fragments instances subset that can run 
in-parallel
+  // without waiting for each other. This strategy relies on blocking operator 
analysis
+  // that is only available if COMPUTE_PROCESSING_COST option is enabled, and 
will
+  // schedule more or equal admission control slots than the LARGEST_FRAGMENT 
strategy.
+  // The scheduler will silently ignore this choice and fallback to 
LARGEST_FRAGMENT if
+  // COMPUTE_PROCESSING_COST is disabled.
+  PLANNER_CPU_ASK = 1
+}
+
 // constants for TQueryOptions.num_nodes
 const i32 NUM_NODES_ALL = 0
 const i32 NUM_NODES_ALL_RACKS = -1
@@ -705,6 +723,10 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   177: optional set<i32> runtime_filter_ids_to_skip
+
+  // See comment in ImpalaService.thrift
+  178: optional TSlotCountStrategy slot_count_strategy =
+    TSlotCountStrategy.LARGEST_FRAGMENT
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and 
external
@@ -997,10 +1019,9 @@ struct TQueryExecRequest {
   // Indicate whether the request is a trivial query. Used by admission 
control.
   13: optional bool is_trivial_query
 
-  // CPU core count required to run the query. Used by admission control to 
decide which
-  // executor group to run the query. Non-positive value means no specific CPU 
core count
-  // is required.
-  14: optional i32 cores_required;
+  // CPU core count required to run the query. Used by Frontend to decide which
+  // executor group to run the query. Should either unset or set with positive 
value.
+  14: optional i32 cores_required
 
   // Estimated per-host memory. The planner generates this value which may or 
may not be
   // overridden to come up with a final per-host memory estimate.
@@ -1008,5 +1029,9 @@ struct TQueryExecRequest {
 
   // Used for system tables that need to run on all nodes.
   16: optional bool include_all_coordinators
+
+  // Maximum admission control slot to use per executor backend.
+  // Only set if COMPUTE_PROCESSING_COST option is True.
+  17: optional i32 max_slot_per_executor
 }
 
diff --git a/fe/src/main/java/org/apache/impala/common/Id.java 
b/fe/src/main/java/org/apache/impala/common/Id.java
index a5ce52e20..6c15a76a5 100644
--- a/fe/src/main/java/org/apache/impala/common/Id.java
+++ b/fe/src/main/java/org/apache/impala/common/Id.java
@@ -19,7 +19,6 @@ package org.apache.impala.common;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
diff --git a/fe/src/main/java/org/apache/impala/planner/CoreCount.java 
b/fe/src/main/java/org/apache/impala/planner/CoreCount.java
index e8fc87101..721ee3b03 100644
--- a/fe/src/main/java/org/apache/impala/planner/CoreCount.java
+++ b/fe/src/main/java/org/apache/impala/planner/CoreCount.java
@@ -19,10 +19,10 @@ package org.apache.impala.planner;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.apache.impala.common.Id;
 
-import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -39,28 +39,59 @@ public class CoreCount {
   // List of CPU core count contributing to this CoreCount.
   private final ImmutableList<Integer> counts_;
 
+  // Set of unique fragment that contributes toward this CoreCount.
+  private final ImmutableSet<PlanFragmentId> uniqueFragmentIds_;
+
+  // True if this CoreCount include a plan root sink.
+  private final boolean hasPlanRootSink_;
+
   // Sum of all elements in count_.
-  // Cached after the first call of total().
-  private int total_ = -1;
+  private final int total_;
+
+  public CoreCount(PlanFragment fragment, int count) {
+    Preconditions.checkArgument(count >= 0, "Core count must be a non-negative 
number");
+    ids_ = ImmutableList.of(fragment.getId());
+    counts_ = ImmutableList.of(count);
+    uniqueFragmentIds_ = ImmutableSet.of(fragment.getId());
+    hasPlanRootSink_ = (fragment.getSink() instanceof PlanRootSink);
+    total_ = counts_.stream().mapToInt(v -> v).sum();
+  }
 
-  public CoreCount(Id id, int count) {
+  public CoreCount(PlanNode node, int count) {
     Preconditions.checkArgument(count >= 0, "Core count must be a non-negative 
number");
-    ids_ = ImmutableList.of(id);
+    ids_ = ImmutableList.of(node.getId());
     counts_ = ImmutableList.of(count);
+    PlanFragment fragment = node.getFragment();
+    uniqueFragmentIds_ = ImmutableSet.of(fragment.getId());
+    hasPlanRootSink_ = false;
+    total_ = counts_.stream().mapToInt(v -> v).sum();
   }
 
-  private CoreCount(ImmutableList<Id> ids, ImmutableList<Integer> counts) {
+  private CoreCount(ImmutableList<Id> ids, ImmutableList<Integer> counts,
+      ImmutableSet<PlanFragmentId> uniqueFragments, boolean hasPlanRootSink) {
     Preconditions.checkArgument(
         ids.size() == counts.size(), "ids and counts must have same size!");
     ids_ = ids;
     counts_ = counts;
+    uniqueFragmentIds_ = uniqueFragments;
+    hasPlanRootSink_ = hasPlanRootSink;
+    total_ = counts_.stream().mapToInt(v -> v).sum();
   }
 
-  public int total() {
-    if (total_ < 0) {
-      total_ = counts_.stream().mapToInt(v -> v).sum();
-    }
-    return total_;
+  public int total() { return total_; }
+  public boolean hasCoordinator() { return hasPlanRootSink_; }
+
+  /**
+   * If this CoreCount has coordinator fragment in it, return total() - 1.
+   * Otherwise, return the same value as total().
+   */
+  public int totalWithoutCoordinator() { return total_ - (hasPlanRootSink_ ? 1 
: 0); }
+
+  /**
+   * Return a set of PlanFragmentId that contribute toward this CoreCount.
+   */
+  public ImmutableSet<PlanFragmentId> getUniqueFragmentIds() {
+    return uniqueFragmentIds_;
   }
 
   @Override
@@ -85,26 +116,28 @@ public class CoreCount {
   protected static CoreCount sum(List<CoreCount> cores) {
     ImmutableList.Builder<Id> idBuilder = new ImmutableList.Builder<Id>();
     ImmutableList.Builder<Integer> countBuilder = new 
ImmutableList.Builder<Integer>();
+    ImmutableSet.Builder<PlanFragmentId> fragmentIdBuilder =
+        new ImmutableSet.Builder<PlanFragmentId>();
+    boolean hasPlanRootSink = false;
     for (CoreCount coreRequirement : cores) {
       idBuilder.addAll(coreRequirement.ids_);
       countBuilder.addAll(coreRequirement.counts_);
+      fragmentIdBuilder.addAll(coreRequirement.uniqueFragmentIds_);
+      hasPlanRootSink |= coreRequirement.hasPlanRootSink_;
     }
-    return new CoreCount(idBuilder.build(), countBuilder.build());
+    return new CoreCount(idBuilder.build(), countBuilder.build(),
+        fragmentIdBuilder.build(), hasPlanRootSink);
   }
 
   protected static CoreCount sum(CoreCount core1, CoreCount core2) {
-    ImmutableList.Builder<Id> idBuilder = new ImmutableList.Builder<Id>();
-    ImmutableList.Builder<Integer> countBuilder = new 
ImmutableList.Builder<Integer>();
-
-    idBuilder.addAll(core1.ids_);
-    idBuilder.addAll(core2.ids_);
-    countBuilder.addAll(core1.counts_);
-    countBuilder.addAll(core2.counts_);
-
-    return new CoreCount(idBuilder.build(), countBuilder.build());
+    return sum(ImmutableList.of(core1, core2));
   }
 
   protected static CoreCount max(CoreCount core1, CoreCount core2) {
-    return (core1.total() < core2.total()) ? core2 : core1;
+    if (core1.totalWithoutCoordinator() < core2.totalWithoutCoordinator()) {
+      return core2;
+    } else {
+      return core1;
+    }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/CostingSegment.java 
b/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
index fed37bf29..49c87cfb6 100644
--- a/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
+++ b/fe/src/main/java/org/apache/impala/planner/CostingSegment.java
@@ -77,6 +77,16 @@ public class CostingSegment extends TreeNode<CostingSegment> 
{
     }
   }
 
+  private CoreCount createCoreCount() {
+    if (isOutputSegment()) {
+      return new CoreCount(sink_.getFragment(), 
cost_.getNumInstancesExpected());
+    } else {
+      Preconditions.checkState(!nodes_.isEmpty());
+      PlanNode topNode = nodes_.get(nodes_.size() - 1);
+      return new CoreCount(topNode, cost_.getNumInstancesExpected());
+    }
+  }
+
   private void appendCost(ProcessingCost additionalCost) {
     Preconditions.checkArgument(additionalCost.isValid());
     ProcessingCost newTotalCost = ProcessingCost.sumCost(additionalCost, 
cost_);
@@ -106,7 +116,7 @@ public class CostingSegment extends 
TreeNode<CostingSegment> {
   protected CoreCount traverseBlockingAwareCores(
       Map<PlanFragmentId, Pair<CoreCount, List<CoreCount>>> fragmentCoreState,
       ImmutableList.Builder<CoreCount> subtreeCoreBuilder) {
-    CoreCount segmentCore = new CoreCount(getRootId(), 
cost_.getNumInstancesExpected());
+    CoreCount segmentCore = createCoreCount();
     // If not in input segment, gather cost of children first.
     for (CostingSegment childSegment : getChildren()) {
       CoreCount childSegmentCores =
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java 
b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index e59df00cd..07ad46d1a 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -173,6 +173,11 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   private int thisTreeCpuCore_ = -1;
   private int subtreeCpuCore_ = -1;
 
+  // Determine whether this fragment is the dominant one in the plan tree 
based on
+  // calculation initiated by Planner.computeBlockingAwareCores().
+  // A fragment is dominant if it contribute towards the final CoreCount.
+  private boolean isDominantFragment_ = false;
+
   public long getProducedRuntimeFiltersMemReservationBytes() {
     return producedRuntimeFiltersMemReservationBytes_;
   }
@@ -636,6 +641,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     
result.setThread_reservation(perInstanceResourceProfile_.getThreadReservation());
     result.setEffective_instance_count(getAdjustedInstanceCount());
     result.setIs_coordinator_only(coordinatorOnly_);
+    result.setIs_dominant(isDominantFragment_);
     return result;
   }
 
@@ -826,6 +832,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     planRoot_ = root;
     setFragmentInPlanTree(planRoot_);
   }
+  protected void markDominant() { isDominantFragment_ = true; }
 
   /**
    * Set the destination node of this fragment's sink, i.e. an ExchangeNode or 
a JoinNode.
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java 
b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 0bd71a40a..e2e45397b 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -19,8 +19,10 @@ package org.apache.impala.planner;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Stack;
 import java.util.stream.Collectors;
 
@@ -54,7 +56,6 @@ import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryExecRequest;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterMode;
-import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.KuduUtil;
@@ -566,10 +567,25 @@ public class Planner {
 
     computeEffectiveParallelism(postOrderFragments,
         rootAnalyzer.getMinParallelismPerNode(), 
rootAnalyzer.getMaxParallelismPerNode());
-    CoreCount effectiveCores = computeBlockingAwareCores(postOrderFragments);
-    request.setCores_required(effectiveCores.total());
 
-    LOG.info("CoreCount=" + effectiveCores);
+    // Count bounded core count. This is taken from final instance count from 
previous
+    // step.
+    CoreCount boundedCores = computeBlockingAwareCores(postOrderFragments);
+    Set<PlanFragmentId> dominantFragmentIds =
+        new HashSet<>(boundedCores.getUniqueFragmentIds());
+    int coresRequired = Math.max(1, boundedCores.totalWithoutCoordinator());
+    if (boundedCores.hasCoordinator()) {
+      // exclude coordinator fragment from dominantFragmentIds.
+      dominantFragmentIds.remove(rootFragment.getId());
+    }
+    request.setCores_required(coresRequired);
+    LOG.info("CoreCount=" + boundedCores + ", coresRequired=" + coresRequired);
+
+    // Mark dominant fragment. This will be used by scheduler in scheduler.cc 
to count
+    // admission slot requirement.
+    for (PlanFragment fragment : postOrderFragments) {
+      if (dominantFragmentIds.contains(fragment.getId())) 
fragment.markDominant();
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 7bb3d0efe..815ef6337 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -204,6 +204,7 @@ import org.apache.impala.thrift.TResultSetMetadata;
 import org.apache.impala.thrift.TRuntimeProfileNode;
 import org.apache.impala.thrift.TShowFilesParams;
 import org.apache.impala.thrift.TShowStatsOp;
+import org.apache.impala.thrift.TSlotCountStrategy;
 import org.apache.impala.thrift.TStmtType;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TTruncateParams;
@@ -266,6 +267,8 @@ public class Frontend {
   private static final String MEMORY_ASK = "MemoryAsk";
   private static final String CPU_MAX = "CpuMax";
   private static final String CPU_ASK = "CpuAsk";
+  private static final String AVG_ADMISSION_SLOTS_PER_EXECUTOR =
+      "AvgAdmissionSlotsPerExecutor";
 
   /**
    * Plan-time context that allows capturing various artifacts created
@@ -2329,6 +2332,17 @@ public class Frontend {
 
       if (matchFound) {
         setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, 
group_set);
+        if (isComputeCost && req.query_exec_request != null
+            && queryOptions.slot_count_strategy == 
TSlotCountStrategy.PLANNER_CPU_ASK) {
+          // Use 'cores_requirement' instead of 'scaled_cores_requirement' 
since the
+          // former is derived from the real number of fragment instances.
+          int avgSlotsUsePerBackend =
+              getAvgSlotsUsePerBackend(req, cores_requirement, group_set);
+          FrontendProfile.getCurrent().setToCounter(
+              AVG_ADMISSION_SLOTS_PER_EXECUTOR, TUnit.UNIT, 
avgSlotsUsePerBackend);
+          req.query_exec_request.setMax_slot_per_executor(
+              group_set.getNum_cores_per_executor());
+        }
         break;
       }
 
@@ -2379,6 +2393,15 @@ public class Frontend {
     return req;
   }
 
+  private static int getAvgSlotsUsePerBackend(
+      TExecRequest req, int cores_requirement, TExecutorGroupSet group_set) {
+    int numExecutors = expectedNumExecutor(group_set);
+    Preconditions.checkState(cores_requirement > 0);
+    Preconditions.checkState(numExecutors > 0);
+    int idealSlot = (int) Math.ceil((double) cores_requirement / numExecutors);
+    return Math.max(1, Math.min(idealSlot, 
group_set.getNum_cores_per_executor()));
+  }
+
   private static void setGroupNamePrefix(boolean default_executor_group,
       boolean clientSetRequestPool, TExecRequest req, TExecutorGroupSet 
group_set) {
     // Set the group name prefix in both the returned query options and
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 1553be896..e3a09c236 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1447,6 +1447,17 @@ public class PlannerTest extends PlannerTestBase {
         tpcdsParquetTestOptions());
   }
 
+  /**
+   * Test that shows query plan for the same test cases at EE test
+   * test_processing_cost.py::TestProcessingCost::test_admission_slots.
+   */
+  @Test
+  public void testProcessingCostPlanAdmissionSlots() {
+    TQueryOptions options = tpcdsParquetQueryOptions();
+    runPlannerTestFile("processing-cost-plan-admission-slots",
+        "tpcds_partitioned_parquet_snap", options, tpcdsParquetTestOptions());
+  }
+
   /**
    * Test SELECTIVITY hints
    */
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index eee094dea..bf0750f66 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -70,6 +70,7 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TReplicaPreference;
 import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TScanRangeSpec;
+import org.apache.impala.thrift.TSlotCountStrategy;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTupleDescriptor;
@@ -426,6 +427,7 @@ public class PlannerTestBase extends FrontendTestBase {
         .setCompute_processing_cost(true)
         .setMax_fragment_instances_per_node(12)
         .setReplica_preference(TReplicaPreference.REMOTE)
+        .setSlot_count_strategy(TSlotCountStrategy.PLANNER_CPU_ASK)
         .setPlanner_testcase_mode(true);
   }
 
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/processing-cost-plan-admission-slots.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/processing-cost-plan-admission-slots.test
new file mode 100644
index 000000000..7d30603f8
--- /dev/null
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/processing-cost-plan-admission-slots.test
@@ -0,0 +1,976 @@
+# This FE tests should match test cases in
+# functional-query/queries/QueryTest/processing-cost-admission-slots.test
+# Any modification here should be applied there as well.
+#
+# QUERY: TPCDS-Q1-CPC-PLANNER-CPU-ASK
+# Expect a total of 16 admission slots given to this query if using 
PLANNER_CPU_ASK strategy.
+with customer_total_return as (
+  select sr_customer_sk as ctr_customer_sk,
+    sr_store_sk as ctr_store_sk,
+    sum(SR_RETURN_AMT) as ctr_total_return
+  from tpcds_partitioned_parquet_snap.store_returns,
+    tpcds_partitioned_parquet_snap.date_dim
+  where sr_returned_date_sk = d_date_sk
+  and d_year = 2000
+  group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+  tpcds_partitioned_parquet_snap.store,
+  tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+  select avg(ctr_total_return) * 1.2
+  from customer_total_return ctr2
+  where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+---- QUERYOPTIONS
+COMPUTE_PROCESSING_COST=True
+SLOT_COUNT_STRATEGY=PLANNER_CPU_ASK
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=42.58MB Threads=16
+Per-Host Resource Estimates: Memory=189MB
+F10:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[104] cpu-comparison-result=16 [max(1 
(self) vs 16 (sum children))]
+PLAN-ROOT SINK
+|  output exprs: c_customer_id
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=100
+|
+27:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: c_customer_id ASC
+|  limit: 100
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=13 row-size=28B cardinality=100 cost=4
+|  in pipelines: 14(GETNEXT)
+|
+F04:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3 (adjusted from 
384)
+Per-Instance Resources: mem-estimate=504.82KB mem-reservation=0B 
thread-reservation=1
+max-parallelism=3 segment-costs=[215624, 4] cpu-comparison-result=16 [max(3 
(self) vs 16 (sum children))]
+14:TOP-N [LIMIT=100]
+|  order by: c_customer_id ASC
+|  mem-estimate=2.73KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=13 row-size=28B cardinality=100 cost=100
+|  in pipelines: 14(GETNEXT), 17(OPEN)
+|
+13:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+|  hash-table-id=00
+|  hash predicates: sr_store_sk = ctr2.ctr_store_sk
+|  other join predicates: sum(SR_RETURN_AMT) > avg(ctr_total_return) * 
CAST(1.2 AS DECIMAL(2,1))
+|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=2,5,4 row-size=74B cardinality=53.52K cost=107030
+|  in pipelines: 17(GETNEXT), 25(OPEN)
+|
+|--F11:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+|  |  Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  |  max-parallelism=3 segment-costs=[9] cpu-comparison-result=4 [max(3 
(self) vs 4 (sum children))]
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: ctr2.ctr_store_sk
+|  |  runtime filters: RF000[bloom] <- ctr2.ctr_store_sk, RF001[min_max] <- 
ctr2.ctr_store_sk
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0 cost=6
+|  |
+|  26:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=11 row-size=20B cardinality=6 cost=3
+|  |  in pipelines: 25(GETNEXT)
+|  |
+|  F09:PLAN FRAGMENT [HASH(ctr2.ctr_store_sk)] hosts=3 instances=3 (adjusted 
from 384)
+|  Per-Instance Resources: mem-estimate=10.09MB mem-reservation=1.94MB 
thread-reservation=1
+|  max-parallelism=3 segment-costs=[13, 1] cpu-comparison-result=4 [max(3 
(self) vs 4 (sum children))]
+|  25:AGGREGATE [FINALIZE]
+|  |  output: avg:merge(ctr_total_return)
+|  |  group by: ctr2.ctr_store_sk
+|  |  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=11 row-size=20B cardinality=6 cost=12
+|  |  in pipelines: 25(GETNEXT), 23(OPEN)
+|  |
+|  24:EXCHANGE [HASH(ctr2.ctr_store_sk)]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=10 row-size=20B cardinality=6 cost=1
+|  |  in pipelines: 23(GETNEXT)
+|  |
+|  F08:PLAN FRAGMENT [HASH(sr_customer_sk,sr_store_sk)] hosts=3 instances=3 
(adjusted from 384)
+|  Per-Instance Resources: mem-estimate=20.49MB mem-reservation=3.94MB 
thread-reservation=1
+|  max-parallelism=3 segment-costs=[162009, 107030, 1] cpu-comparison-result=4 
[max(3 (self) vs 4 (sum children))]
+|  10:AGGREGATE [STREAMING]
+|  |  output: avg(sum(SR_RETURN_AMT))
+|  |  group by: sr_store_sk
+|  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=10 row-size=20B cardinality=6 cost=107030
+|  |  in pipelines: 23(GETNEXT)
+|  |
+|  23:AGGREGATE [FINALIZE]
+|  |  output: sum:merge(SR_RETURN_AMT)
+|  |  group by: sr_customer_sk, sr_store_sk
+|  |  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=8 row-size=24B cardinality=53.52K cost=160545
+|  |  in pipelines: 23(GETNEXT), 06(OPEN)
+|  |
+|  22:EXCHANGE [HASH(sr_customer_sk,sr_store_sk)]
+|  |  mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=8 row-size=24B cardinality=53.52K cost=1464
+|  |  in pipelines: 06(GETNEXT)
+|  |
+|  F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 (adjusted from 384)
+|  Per-Host Shared Resources: mem-estimate=1.00MB mem-reservation=1.00MB 
thread-reservation=0 runtime-filters-memory=1.00MB
+|  Per-Instance Resources: mem-estimate=26.33MB mem-reservation=2.12MB 
thread-reservation=1
+|  max-parallelism=3 segment-costs=[18864896, 1464] cpu-comparison-result=4 
[max(3 (self) vs 4 (sum children))]
+|  09:AGGREGATE [STREAMING]
+|  |  output: sum(SR_RETURN_AMT)
+|  |  group by: sr_customer_sk, sr_store_sk
+|  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=8 row-size=24B cardinality=53.52K cost=160545
+|  |  in pipelines: 06(GETNEXT)
+|  |
+|  08:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash-table-id=01
+|  |  hash predicates: sr_returned_date_sk = d_date_sk
+|  |  fk/pk conjuncts: sr_returned_date_sk = d_date_sk
+|  |  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=6,7 row-size=24B cardinality=53.52K cost=53515
+|  |  in pipelines: 06(GETNEXT), 07(OPEN)
+|  |
+|  |--F12:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  |  |  Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  |  |  max-parallelism=3 segment-costs=[388]
+|  |  JOIN BUILD
+|  |  |  join-table-id=01 plan-id=02 cohort-id=02
+|  |  |  build expressions: d_date_sk
+|  |  |  runtime filters: RF008[bloom] <- d_date_sk
+|  |  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0 cost=373
+|  |  |
+|  |  21:EXCHANGE [BROADCAST]
+|  |  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  |  tuple-ids=7 row-size=8B cardinality=373 cost=15
+|  |  |  in pipelines: 07(GETNEXT)
+|  |  |
+|  |  F07:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  |  Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB 
thread-reservation=1
+|  |  max-parallelism=1 segment-costs=[123625]
+|  |  07:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
+|  |     HDFS partitions=1/1 files=1 size=2.17MB
+|  |     predicates: d_year = CAST(2000 AS INT)
+|  |     stored statistics:
+|  |       table: rows=73.05K size=2.17MB
+|  |       columns: all
+|  |     extrapolated-rows=disabled max-scan-range-rows=73.05K
+|  |     parquet statistics predicates: d_year = CAST(2000 AS INT)
+|  |     parquet dictionary predicates: d_year = CAST(2000 AS INT)
+|  |     mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+|  |     tuple-ids=7 row-size=8B cardinality=373 cost=123620
+|  |     in pipelines: 07(GETNEXT)
+|  |
+|  06:SCAN HDFS [tpcds_partitioned_parquet_snap.store_returns, RANDOM]
+|     HDFS partitions=2004/2004 files=2004 size=33.63MB
+|     runtime filters: RF008[bloom] -> sr_returned_date_sk
+|     stored statistics:
+|       table: rows=287.51K size=33.63MB
+|       partitions: 2004/2004 rows=287.51K
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=10.01K 
est-scan-range=373(filtered from 2004)
+|     mem-estimate=16.00MB mem-reservation=128.00KB thread-reservation=0
+|     tuple-ids=6 row-size=16B cardinality=53.52K(filtered from 287.51K) 
cost=18650836
+|     in pipelines: 06(GETNEXT)
+|
+12:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash-table-id=02
+|  hash predicates: sr_store_sk = s_store_sk
+|  fk/pk conjuncts: sr_store_sk = s_store_sk
+|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=2,5,4 row-size=74B cardinality=53.52K cost=53515
+|  in pipelines: 17(GETNEXT), 04(OPEN)
+|
+|--F13:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+|  |  Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  |  max-parallelism=3 segment-costs=[15]
+|  JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: s_store_sk
+|  |  runtime filters: RF002[bloom] <- s_store_sk, RF003[min_max] <- s_store_sk
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0 cost=12
+|  |
+|  20:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=18B cardinality=12 cost=3
+|  |  in pipelines: 04(GETNEXT)
+|  |
+|  F05:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Host Shared Resources: mem-estimate=1.00MB mem-reservation=1.00MB 
thread-reservation=0 runtime-filters-memory=1.00MB
+|  Per-Instance Resources: mem-estimate=16.09MB mem-reservation=16.00KB 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[50014]
+|  04:SCAN HDFS [tpcds_partitioned_parquet_snap.store, RANDOM]
+|     HDFS partitions=1/1 files=1 size=9.81KB
+|     predicates: s_state = 'TN'
+|     runtime filters: RF001[min_max] -> 
tpcds_partitioned_parquet_snap.store.s_store_sk, RF000[bloom] -> 
tpcds_partitioned_parquet_snap.store.s_store_sk
+|     stored statistics:
+|       table: rows=12 size=9.81KB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=12
+|     parquet statistics predicates: s_state = 'TN'
+|     parquet dictionary predicates: s_state = 'TN'
+|     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+|     tuple-ids=4 row-size=18B cardinality=12 cost=50013
+|     in pipelines: 04(GETNEXT)
+|
+11:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash-table-id=03
+|  hash predicates: sr_customer_sk = c_customer_sk
+|  fk/pk conjuncts: none
+|  mem-estimate=0B mem-reservation=0B spill-buffer=256.00KB 
thread-reservation=0
+|  tuple-ids=2,5 row-size=56B cardinality=53.52K cost=53515
+|  in pipelines: 17(GETNEXT), 05(OPEN)
+|
+|--F14:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3 (adjusted from 
384)
+|  |  Per-Instance Resources: mem-estimate=8.84MB mem-reservation=5.75MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  |  max-parallelism=3 segment-costs=[103516]
+|  JOIN BUILD
+|  |  join-table-id=03 plan-id=04 cohort-id=01
+|  |  build expressions: c_customer_sk
+|  |  runtime filters: RF004[bloom] <- c_customer_sk, RF005[min_max] <- 
c_customer_sk
+|  |  mem-estimate=4.75MB mem-reservation=4.75MB spill-buffer=256.00KB 
thread-reservation=0 cost=100000
+|  |
+|  19:EXCHANGE [HASH(c_customer_sk)]
+|  |  mem-estimate=3.09MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=5 row-size=32B cardinality=100.00K cost=3516
+|  |  in pipelines: 05(GETNEXT)
+|  |
+|  F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=16.42MB mem-reservation=2.00MB 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[56641]
+|  05:SCAN HDFS [tpcds_partitioned_parquet_snap.customer, RANDOM]
+|     HDFS partitions=1/1 files=1 size=5.49MB
+|     stored statistics:
+|       table: rows=100.00K size=5.49MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=100.00K
+|     mem-estimate=16.00MB mem-reservation=2.00MB thread-reservation=0
+|     tuple-ids=5 row-size=32B cardinality=100.00K cost=53125
+|     in pipelines: 05(GETNEXT)
+|
+18:EXCHANGE [HASH(sr_customer_sk)]
+|  mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=24B cardinality=53.52K cost=1464
+|  in pipelines: 17(GETNEXT)
+|
+F02:PLAN FRAGMENT [HASH(sr_customer_sk,sr_store_sk)] hosts=3 instances=3 
(adjusted from 384)
+Per-Instance Resources: mem-estimate=10.49MB mem-reservation=1.94MB 
thread-reservation=1
+max-parallelism=3 segment-costs=[162009, 1464] cpu-comparison-result=4 [max(3 
(self) vs 4 (sum children))]
+17:AGGREGATE [FINALIZE]
+|  output: sum:merge(SR_RETURN_AMT)
+|  group by: sr_customer_sk, sr_store_sk
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  tuple-ids=2 row-size=24B cardinality=53.52K cost=160545
+|  in pipelines: 17(GETNEXT), 00(OPEN)
+|
+16:EXCHANGE [HASH(sr_customer_sk,sr_store_sk)]
+|  mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=24B cardinality=53.52K cost=1464
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 (adjusted from 384)
+Per-Host Shared Resources: mem-estimate=4.00MB mem-reservation=4.00MB 
thread-reservation=0 runtime-filters-memory=4.00MB
+Per-Instance Resources: mem-estimate=26.33MB mem-reservation=2.12MB 
thread-reservation=1
+max-parallelism=3 segment-costs=[18864896, 1464] cpu-comparison-result=4 
[max(3 (self) vs 4 (sum children))]
+03:AGGREGATE [STREAMING]
+|  output: sum(SR_RETURN_AMT)
+|  group by: sr_customer_sk, sr_store_sk
+|  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB 
thread-reservation=0
+|  tuple-ids=2 row-size=24B cardinality=53.52K cost=160545
+|  in pipelines: 00(GETNEXT)
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash-table-id=04
+|  hash predicates: sr_returned_date_sk = d_date_sk
+|  fk/pk conjuncts: sr_returned_date_sk = d_date_sk
+|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=24B cardinality=53.52K cost=53515
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--F15:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  |  Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  |  max-parallelism=3 segment-costs=[388]
+|  JOIN BUILD
+|  |  join-table-id=04 plan-id=05 cohort-id=01
+|  |  build expressions: d_date_sk
+|  |  runtime filters: RF006[bloom] <- d_date_sk
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0 cost=373
+|  |
+|  15:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=1 row-size=8B cardinality=373 cost=15
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[123625]
+|  01:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
+|     HDFS partitions=1/1 files=1 size=2.17MB
+|     predicates: d_year = CAST(2000 AS INT)
+|     stored statistics:
+|       table: rows=73.05K size=2.17MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=73.05K
+|     parquet statistics predicates: d_year = CAST(2000 AS INT)
+|     parquet dictionary predicates: d_year = CAST(2000 AS INT)
+|     mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+|     tuple-ids=1 row-size=8B cardinality=373 cost=123620
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpcds_partitioned_parquet_snap.store_returns, RANDOM]
+   HDFS partitions=2004/2004 files=2004 size=33.63MB
+   runtime filters: RF001[min_max] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF003[min_max] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF005[min_max] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_customer_sk, RF000[bloom] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF002[bloom] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF004[bloom] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_customer_sk, RF006[bloom] ->  
[...]
+   stored statistics:
+     table: rows=287.51K size=33.63MB
+     partitions: 2004/2004 rows=287.51K
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=10.01K 
est-scan-range=373(filtered from 2004)
+   mem-estimate=16.00MB mem-reservation=128.00KB thread-reservation=0
+   tuple-ids=0 row-size=16B cardinality=53.52K(filtered from 287.51K) 
cost=18650836
+   in pipelines: 00(GETNEXT)
+====
+# QUERY: TPCDS-Q1-CPC-LARGEST-FRAGMENT
+# Expect a total of 3 admission slots given to this query if using 
LARGEST_FRAGMENT strategy.
+with customer_total_return as (
+  select sr_customer_sk as ctr_customer_sk,
+    sr_store_sk as ctr_store_sk,
+    sum(SR_RETURN_AMT) as ctr_total_return
+  from tpcds_partitioned_parquet_snap.store_returns,
+    tpcds_partitioned_parquet_snap.date_dim
+  where sr_returned_date_sk = d_date_sk
+  and d_year = 2000
+  group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+  tpcds_partitioned_parquet_snap.store,
+  tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+  select avg(ctr_total_return) * 1.2
+  from customer_total_return ctr2
+  where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+---- QUERYOPTIONS
+COMPUTE_PROCESSING_COST=True
+SLOT_COUNT_STRATEGY=LARGEST_FRAGMENT
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=42.58MB Threads=16
+Per-Host Resource Estimates: Memory=189MB
+F10:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[104] cpu-comparison-result=16 [max(1 
(self) vs 16 (sum children))]
+PLAN-ROOT SINK
+|  output exprs: c_customer_id
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0 cost=100
+|
+27:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: c_customer_id ASC
+|  limit: 100
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=13 row-size=28B cardinality=100 cost=4
+|  in pipelines: 14(GETNEXT)
+|
+F04:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3 (adjusted from 
384)
+Per-Instance Resources: mem-estimate=504.82KB mem-reservation=0B 
thread-reservation=1
+max-parallelism=3 segment-costs=[215624, 4] cpu-comparison-result=16 [max(3 
(self) vs 16 (sum children))]
+14:TOP-N [LIMIT=100]
+|  order by: c_customer_id ASC
+|  mem-estimate=2.73KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=13 row-size=28B cardinality=100 cost=100
+|  in pipelines: 14(GETNEXT), 17(OPEN)
+|
+13:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+|  hash-table-id=00
+|  hash predicates: sr_store_sk = ctr2.ctr_store_sk
+|  other join predicates: sum(SR_RETURN_AMT) > avg(ctr_total_return) * 
CAST(1.2 AS DECIMAL(2,1))
+|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=2,5,4 row-size=74B cardinality=53.52K cost=107030
+|  in pipelines: 17(GETNEXT), 25(OPEN)
+|
+|--F11:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+|  |  Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  |  max-parallelism=3 segment-costs=[9] cpu-comparison-result=4 [max(3 
(self) vs 4 (sum children))]
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: ctr2.ctr_store_sk
+|  |  runtime filters: RF000[bloom] <- ctr2.ctr_store_sk, RF001[min_max] <- 
ctr2.ctr_store_sk
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0 cost=6
+|  |
+|  26:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=11 row-size=20B cardinality=6 cost=3
+|  |  in pipelines: 25(GETNEXT)
+|  |
+|  F09:PLAN FRAGMENT [HASH(ctr2.ctr_store_sk)] hosts=3 instances=3 (adjusted 
from 384)
+|  Per-Instance Resources: mem-estimate=10.09MB mem-reservation=1.94MB 
thread-reservation=1
+|  max-parallelism=3 segment-costs=[13, 1] cpu-comparison-result=4 [max(3 
(self) vs 4 (sum children))]
+|  25:AGGREGATE [FINALIZE]
+|  |  output: avg:merge(ctr_total_return)
+|  |  group by: ctr2.ctr_store_sk
+|  |  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=11 row-size=20B cardinality=6 cost=12
+|  |  in pipelines: 25(GETNEXT), 23(OPEN)
+|  |
+|  24:EXCHANGE [HASH(ctr2.ctr_store_sk)]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=10 row-size=20B cardinality=6 cost=1
+|  |  in pipelines: 23(GETNEXT)
+|  |
+|  F08:PLAN FRAGMENT [HASH(sr_customer_sk,sr_store_sk)] hosts=3 instances=3 
(adjusted from 384)
+|  Per-Instance Resources: mem-estimate=20.49MB mem-reservation=3.94MB 
thread-reservation=1
+|  max-parallelism=3 segment-costs=[162009, 107030, 1] cpu-comparison-result=4 
[max(3 (self) vs 4 (sum children))]
+|  10:AGGREGATE [STREAMING]
+|  |  output: avg(sum(SR_RETURN_AMT))
+|  |  group by: sr_store_sk
+|  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=10 row-size=20B cardinality=6 cost=107030
+|  |  in pipelines: 23(GETNEXT)
+|  |
+|  23:AGGREGATE [FINALIZE]
+|  |  output: sum:merge(SR_RETURN_AMT)
+|  |  group by: sr_customer_sk, sr_store_sk
+|  |  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=8 row-size=24B cardinality=53.52K cost=160545
+|  |  in pipelines: 23(GETNEXT), 06(OPEN)
+|  |
+|  22:EXCHANGE [HASH(sr_customer_sk,sr_store_sk)]
+|  |  mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=8 row-size=24B cardinality=53.52K cost=1464
+|  |  in pipelines: 06(GETNEXT)
+|  |
+|  F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 (adjusted from 384)
+|  Per-Host Shared Resources: mem-estimate=1.00MB mem-reservation=1.00MB 
thread-reservation=0 runtime-filters-memory=1.00MB
+|  Per-Instance Resources: mem-estimate=26.33MB mem-reservation=2.12MB 
thread-reservation=1
+|  max-parallelism=3 segment-costs=[18864896, 1464] cpu-comparison-result=4 
[max(3 (self) vs 4 (sum children))]
+|  09:AGGREGATE [STREAMING]
+|  |  output: sum(SR_RETURN_AMT)
+|  |  group by: sr_customer_sk, sr_store_sk
+|  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=8 row-size=24B cardinality=53.52K cost=160545
+|  |  in pipelines: 06(GETNEXT)
+|  |
+|  08:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash-table-id=01
+|  |  hash predicates: sr_returned_date_sk = d_date_sk
+|  |  fk/pk conjuncts: sr_returned_date_sk = d_date_sk
+|  |  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=6,7 row-size=24B cardinality=53.52K cost=53515
+|  |  in pipelines: 06(GETNEXT), 07(OPEN)
+|  |
+|  |--F12:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  |  |  Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  |  |  max-parallelism=3 segment-costs=[388]
+|  |  JOIN BUILD
+|  |  |  join-table-id=01 plan-id=02 cohort-id=02
+|  |  |  build expressions: d_date_sk
+|  |  |  runtime filters: RF008[bloom] <- d_date_sk
+|  |  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0 cost=373
+|  |  |
+|  |  21:EXCHANGE [BROADCAST]
+|  |  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  |  tuple-ids=7 row-size=8B cardinality=373 cost=15
+|  |  |  in pipelines: 07(GETNEXT)
+|  |  |
+|  |  F07:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  |  Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB 
thread-reservation=1
+|  |  max-parallelism=1 segment-costs=[123625]
+|  |  07:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
+|  |     HDFS partitions=1/1 files=1 size=2.17MB
+|  |     predicates: d_year = CAST(2000 AS INT)
+|  |     stored statistics:
+|  |       table: rows=73.05K size=2.17MB
+|  |       columns: all
+|  |     extrapolated-rows=disabled max-scan-range-rows=73.05K
+|  |     parquet statistics predicates: d_year = CAST(2000 AS INT)
+|  |     parquet dictionary predicates: d_year = CAST(2000 AS INT)
+|  |     mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+|  |     tuple-ids=7 row-size=8B cardinality=373 cost=123620
+|  |     in pipelines: 07(GETNEXT)
+|  |
+|  06:SCAN HDFS [tpcds_partitioned_parquet_snap.store_returns, RANDOM]
+|     HDFS partitions=2004/2004 files=2004 size=33.63MB
+|     runtime filters: RF008[bloom] -> sr_returned_date_sk
+|     stored statistics:
+|       table: rows=287.51K size=33.63MB
+|       partitions: 2004/2004 rows=287.51K
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=10.01K 
est-scan-range=373(filtered from 2004)
+|     mem-estimate=16.00MB mem-reservation=128.00KB thread-reservation=0
+|     tuple-ids=6 row-size=16B cardinality=53.52K(filtered from 287.51K) 
cost=18650836
+|     in pipelines: 06(GETNEXT)
+|
+12:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash-table-id=02
+|  hash predicates: sr_store_sk = s_store_sk
+|  fk/pk conjuncts: sr_store_sk = s_store_sk
+|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=2,5,4 row-size=74B cardinality=53.52K cost=53515
+|  in pipelines: 17(GETNEXT), 04(OPEN)
+|
+|--F13:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+|  |  Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  |  max-parallelism=3 segment-costs=[15]
+|  JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: s_store_sk
+|  |  runtime filters: RF002[bloom] <- s_store_sk, RF003[min_max] <- s_store_sk
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0 cost=12
+|  |
+|  20:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=18B cardinality=12 cost=3
+|  |  in pipelines: 04(GETNEXT)
+|  |
+|  F05:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Host Shared Resources: mem-estimate=1.00MB mem-reservation=1.00MB 
thread-reservation=0 runtime-filters-memory=1.00MB
+|  Per-Instance Resources: mem-estimate=16.09MB mem-reservation=16.00KB 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[50014]
+|  04:SCAN HDFS [tpcds_partitioned_parquet_snap.store, RANDOM]
+|     HDFS partitions=1/1 files=1 size=9.81KB
+|     predicates: s_state = 'TN'
+|     runtime filters: RF001[min_max] -> 
tpcds_partitioned_parquet_snap.store.s_store_sk, RF000[bloom] -> 
tpcds_partitioned_parquet_snap.store.s_store_sk
+|     stored statistics:
+|       table: rows=12 size=9.81KB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=12
+|     parquet statistics predicates: s_state = 'TN'
+|     parquet dictionary predicates: s_state = 'TN'
+|     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+|     tuple-ids=4 row-size=18B cardinality=12 cost=50013
+|     in pipelines: 04(GETNEXT)
+|
+11:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash-table-id=03
+|  hash predicates: sr_customer_sk = c_customer_sk
+|  fk/pk conjuncts: none
+|  mem-estimate=0B mem-reservation=0B spill-buffer=256.00KB 
thread-reservation=0
+|  tuple-ids=2,5 row-size=56B cardinality=53.52K cost=53515
+|  in pipelines: 17(GETNEXT), 05(OPEN)
+|
+|--F14:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3 (adjusted from 
384)
+|  |  Per-Instance Resources: mem-estimate=8.84MB mem-reservation=5.75MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  |  max-parallelism=3 segment-costs=[103516]
+|  JOIN BUILD
+|  |  join-table-id=03 plan-id=04 cohort-id=01
+|  |  build expressions: c_customer_sk
+|  |  runtime filters: RF004[bloom] <- c_customer_sk, RF005[min_max] <- 
c_customer_sk
+|  |  mem-estimate=4.75MB mem-reservation=4.75MB spill-buffer=256.00KB 
thread-reservation=0 cost=100000
+|  |
+|  19:EXCHANGE [HASH(c_customer_sk)]
+|  |  mem-estimate=3.09MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=5 row-size=32B cardinality=100.00K cost=3516
+|  |  in pipelines: 05(GETNEXT)
+|  |
+|  F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=16.42MB mem-reservation=2.00MB 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[56641]
+|  05:SCAN HDFS [tpcds_partitioned_parquet_snap.customer, RANDOM]
+|     HDFS partitions=1/1 files=1 size=5.49MB
+|     stored statistics:
+|       table: rows=100.00K size=5.49MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=100.00K
+|     mem-estimate=16.00MB mem-reservation=2.00MB thread-reservation=0
+|     tuple-ids=5 row-size=32B cardinality=100.00K cost=53125
+|     in pipelines: 05(GETNEXT)
+|
+18:EXCHANGE [HASH(sr_customer_sk)]
+|  mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=24B cardinality=53.52K cost=1464
+|  in pipelines: 17(GETNEXT)
+|
+F02:PLAN FRAGMENT [HASH(sr_customer_sk,sr_store_sk)] hosts=3 instances=3 
(adjusted from 384)
+Per-Instance Resources: mem-estimate=10.49MB mem-reservation=1.94MB 
thread-reservation=1
+max-parallelism=3 segment-costs=[162009, 1464] cpu-comparison-result=4 [max(3 
(self) vs 4 (sum children))]
+17:AGGREGATE [FINALIZE]
+|  output: sum:merge(SR_RETURN_AMT)
+|  group by: sr_customer_sk, sr_store_sk
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  tuple-ids=2 row-size=24B cardinality=53.52K cost=160545
+|  in pipelines: 17(GETNEXT), 00(OPEN)
+|
+16:EXCHANGE [HASH(sr_customer_sk,sr_store_sk)]
+|  mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=24B cardinality=53.52K cost=1464
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 (adjusted from 384)
+Per-Host Shared Resources: mem-estimate=4.00MB mem-reservation=4.00MB 
thread-reservation=0 runtime-filters-memory=4.00MB
+Per-Instance Resources: mem-estimate=26.33MB mem-reservation=2.12MB 
thread-reservation=1
+max-parallelism=3 segment-costs=[18864896, 1464] cpu-comparison-result=4 
[max(3 (self) vs 4 (sum children))]
+03:AGGREGATE [STREAMING]
+|  output: sum(SR_RETURN_AMT)
+|  group by: sr_customer_sk, sr_store_sk
+|  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB 
thread-reservation=0
+|  tuple-ids=2 row-size=24B cardinality=53.52K cost=160545
+|  in pipelines: 00(GETNEXT)
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash-table-id=04
+|  hash predicates: sr_returned_date_sk = d_date_sk
+|  fk/pk conjuncts: sr_returned_date_sk = d_date_sk
+|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=24B cardinality=53.52K cost=53515
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--F15:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  |  Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  |  max-parallelism=3 segment-costs=[388]
+|  JOIN BUILD
+|  |  join-table-id=04 plan-id=05 cohort-id=01
+|  |  build expressions: d_date_sk
+|  |  runtime filters: RF006[bloom] <- d_date_sk
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0 cost=373
+|  |
+|  15:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=1 row-size=8B cardinality=373 cost=15
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB 
thread-reservation=1
+|  max-parallelism=1 segment-costs=[123625]
+|  01:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
+|     HDFS partitions=1/1 files=1 size=2.17MB
+|     predicates: d_year = CAST(2000 AS INT)
+|     stored statistics:
+|       table: rows=73.05K size=2.17MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=73.05K
+|     parquet statistics predicates: d_year = CAST(2000 AS INT)
+|     parquet dictionary predicates: d_year = CAST(2000 AS INT)
+|     mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+|     tuple-ids=1 row-size=8B cardinality=373 cost=123620
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpcds_partitioned_parquet_snap.store_returns, RANDOM]
+   HDFS partitions=2004/2004 files=2004 size=33.63MB
+   runtime filters: RF001[min_max] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF003[min_max] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF005[min_max] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_customer_sk, RF000[bloom] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF002[bloom] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF004[bloom] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_customer_sk, RF006[bloom] ->  
[...]
+   stored statistics:
+     table: rows=287.51K size=33.63MB
+     partitions: 2004/2004 rows=287.51K
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=10.01K 
est-scan-range=373(filtered from 2004)
+   mem-estimate=16.00MB mem-reservation=128.00KB thread-reservation=0
+   tuple-ids=0 row-size=16B cardinality=53.52K(filtered from 287.51K) 
cost=18650836
+   in pipelines: 00(GETNEXT)
+====
+# QUERY: TPCDS-Q1-NO-CPC
+# Expect a total of 3 admission slots given to this query if 
COMPUTE_PROCESSING_COST is disabled.
+# Set MT_DOP=1 to keep query compiled in multi thread parallelism mode.
+with customer_total_return as (
+  select sr_customer_sk as ctr_customer_sk,
+    sr_store_sk as ctr_store_sk,
+    sum(SR_RETURN_AMT) as ctr_total_return
+  from tpcds_partitioned_parquet_snap.store_returns,
+    tpcds_partitioned_parquet_snap.date_dim
+  where sr_returned_date_sk = d_date_sk
+  and d_year = 2000
+  group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+  tpcds_partitioned_parquet_snap.store,
+  tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+  select avg(ctr_total_return) * 1.2
+  from customer_total_return ctr2
+  where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+---- QUERYOPTIONS
+COMPUTE_PROCESSING_COST=False
+MT_DOP=1
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=42.58MB Threads=16
+Per-Host Resource Estimates: Memory=189MB
+F10:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB 
thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: c_customer_id
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB 
thread-reservation=0
+|
+27:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: c_customer_id ASC
+|  limit: 100
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=13 row-size=28B cardinality=100
+|  in pipelines: 14(GETNEXT)
+|
+F04:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+Per-Instance Resources: mem-estimate=504.82KB mem-reservation=0B 
thread-reservation=1
+14:TOP-N [LIMIT=100]
+|  order by: c_customer_id ASC
+|  mem-estimate=2.73KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=13 row-size=28B cardinality=100
+|  in pipelines: 14(GETNEXT), 17(OPEN)
+|
+13:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+|  hash-table-id=00
+|  hash predicates: sr_store_sk = ctr2.ctr_store_sk
+|  other join predicates: sum(SR_RETURN_AMT) > avg(ctr_total_return) * 
CAST(1.2 AS DECIMAL(2,1))
+|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=2,5,4 row-size=74B cardinality=53.52K
+|  in pipelines: 17(GETNEXT), 25(OPEN)
+|
+|--F11:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+|  |  Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: ctr2.ctr_store_sk
+|  |  runtime filters: RF000[bloom] <- ctr2.ctr_store_sk, RF001[min_max] <- 
ctr2.ctr_store_sk
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  |
+|  26:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=11 row-size=20B cardinality=6
+|  |  in pipelines: 25(GETNEXT)
+|  |
+|  F09:PLAN FRAGMENT [HASH(ctr2.ctr_store_sk)] hosts=3 instances=3
+|  Per-Instance Resources: mem-estimate=10.09MB mem-reservation=1.94MB 
thread-reservation=1
+|  25:AGGREGATE [FINALIZE]
+|  |  output: avg:merge(ctr_total_return)
+|  |  group by: ctr2.ctr_store_sk
+|  |  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=11 row-size=20B cardinality=6
+|  |  in pipelines: 25(GETNEXT), 23(OPEN)
+|  |
+|  24:EXCHANGE [HASH(ctr2.ctr_store_sk)]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=10 row-size=20B cardinality=6
+|  |  in pipelines: 23(GETNEXT)
+|  |
+|  F08:PLAN FRAGMENT [HASH(sr_customer_sk,sr_store_sk)] hosts=3 instances=3
+|  Per-Instance Resources: mem-estimate=20.49MB mem-reservation=3.94MB 
thread-reservation=1
+|  10:AGGREGATE [STREAMING]
+|  |  output: avg(sum(SR_RETURN_AMT))
+|  |  group by: sr_store_sk
+|  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=10 row-size=20B cardinality=6
+|  |  in pipelines: 23(GETNEXT)
+|  |
+|  23:AGGREGATE [FINALIZE]
+|  |  output: sum:merge(SR_RETURN_AMT)
+|  |  group by: sr_customer_sk, sr_store_sk
+|  |  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=8 row-size=24B cardinality=53.52K
+|  |  in pipelines: 23(GETNEXT), 06(OPEN)
+|  |
+|  22:EXCHANGE [HASH(sr_customer_sk,sr_store_sk)]
+|  |  mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=8 row-size=24B cardinality=53.52K
+|  |  in pipelines: 06(GETNEXT)
+|  |
+|  F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  Per-Host Shared Resources: mem-estimate=1.00MB mem-reservation=1.00MB 
thread-reservation=0 runtime-filters-memory=1.00MB
+|  Per-Instance Resources: mem-estimate=26.33MB mem-reservation=2.12MB 
thread-reservation=1
+|  09:AGGREGATE [STREAMING]
+|  |  output: sum(SR_RETURN_AMT)
+|  |  group by: sr_customer_sk, sr_store_sk
+|  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=8 row-size=24B cardinality=53.52K
+|  |  in pipelines: 06(GETNEXT)
+|  |
+|  08:HASH JOIN [INNER JOIN, BROADCAST]
+|  |  hash-table-id=01
+|  |  hash predicates: sr_returned_date_sk = d_date_sk
+|  |  fk/pk conjuncts: sr_returned_date_sk = d_date_sk
+|  |  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB 
thread-reservation=0
+|  |  tuple-ids=6,7 row-size=24B cardinality=53.52K
+|  |  in pipelines: 06(GETNEXT), 07(OPEN)
+|  |
+|  |--F12:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  |  |  Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  |  JOIN BUILD
+|  |  |  join-table-id=01 plan-id=02 cohort-id=02
+|  |  |  build expressions: d_date_sk
+|  |  |  runtime filters: RF008[bloom] <- d_date_sk
+|  |  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  |  |
+|  |  21:EXCHANGE [BROADCAST]
+|  |  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  |  tuple-ids=7 row-size=8B cardinality=373
+|  |  |  in pipelines: 07(GETNEXT)
+|  |  |
+|  |  F07:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  |  Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB 
thread-reservation=1
+|  |  07:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
+|  |     HDFS partitions=1/1 files=1 size=2.17MB
+|  |     predicates: d_year = CAST(2000 AS INT)
+|  |     stored statistics:
+|  |       table: rows=73.05K size=2.17MB
+|  |       columns: all
+|  |     extrapolated-rows=disabled max-scan-range-rows=73.05K
+|  |     parquet statistics predicates: d_year = CAST(2000 AS INT)
+|  |     parquet dictionary predicates: d_year = CAST(2000 AS INT)
+|  |     mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+|  |     tuple-ids=7 row-size=8B cardinality=373
+|  |     in pipelines: 07(GETNEXT)
+|  |
+|  06:SCAN HDFS [tpcds_partitioned_parquet_snap.store_returns, RANDOM]
+|     HDFS partitions=2004/2004 files=2004 size=33.63MB
+|     runtime filters: RF008[bloom] -> sr_returned_date_sk
+|     stored statistics:
+|       table: rows=287.51K size=33.63MB
+|       partitions: 2004/2004 rows=287.51K
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=10.01K 
est-scan-range=373(filtered from 2004)
+|     mem-estimate=16.00MB mem-reservation=128.00KB thread-reservation=0
+|     tuple-ids=6 row-size=16B cardinality=53.52K(filtered from 287.51K)
+|     in pipelines: 06(GETNEXT)
+|
+12:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash-table-id=02
+|  hash predicates: sr_store_sk = s_store_sk
+|  fk/pk conjuncts: sr_store_sk = s_store_sk
+|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=2,5,4 row-size=74B cardinality=53.52K
+|  in pipelines: 17(GETNEXT), 04(OPEN)
+|
+|--F13:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+|  |  Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  JOIN BUILD
+|  |  join-table-id=02 plan-id=03 cohort-id=01
+|  |  build expressions: s_store_sk
+|  |  runtime filters: RF002[bloom] <- s_store_sk, RF003[min_max] <- s_store_sk
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  |
+|  20:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=18B cardinality=12
+|  |  in pipelines: 04(GETNEXT)
+|  |
+|  F05:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Host Shared Resources: mem-estimate=1.00MB mem-reservation=1.00MB 
thread-reservation=0 runtime-filters-memory=1.00MB
+|  Per-Instance Resources: mem-estimate=16.09MB mem-reservation=16.00KB 
thread-reservation=1
+|  04:SCAN HDFS [tpcds_partitioned_parquet_snap.store, RANDOM]
+|     HDFS partitions=1/1 files=1 size=9.81KB
+|     predicates: s_state = 'TN'
+|     runtime filters: RF001[min_max] -> 
tpcds_partitioned_parquet_snap.store.s_store_sk, RF000[bloom] -> 
tpcds_partitioned_parquet_snap.store.s_store_sk
+|     stored statistics:
+|       table: rows=12 size=9.81KB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=12
+|     parquet statistics predicates: s_state = 'TN'
+|     parquet dictionary predicates: s_state = 'TN'
+|     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
+|     tuple-ids=4 row-size=18B cardinality=12
+|     in pipelines: 04(GETNEXT)
+|
+11:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash-table-id=03
+|  hash predicates: sr_customer_sk = c_customer_sk
+|  fk/pk conjuncts: none
+|  mem-estimate=0B mem-reservation=0B spill-buffer=256.00KB 
thread-reservation=0
+|  tuple-ids=2,5 row-size=56B cardinality=53.52K
+|  in pipelines: 17(GETNEXT), 05(OPEN)
+|
+|--F14:PLAN FRAGMENT [HASH(sr_customer_sk)] hosts=3 instances=3
+|  |  Per-Instance Resources: mem-estimate=8.84MB mem-reservation=5.75MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  JOIN BUILD
+|  |  join-table-id=03 plan-id=04 cohort-id=01
+|  |  build expressions: c_customer_sk
+|  |  runtime filters: RF004[bloom] <- c_customer_sk, RF005[min_max] <- 
c_customer_sk
+|  |  mem-estimate=4.75MB mem-reservation=4.75MB spill-buffer=256.00KB 
thread-reservation=0
+|  |
+|  19:EXCHANGE [HASH(c_customer_sk)]
+|  |  mem-estimate=3.09MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=5 row-size=32B cardinality=100.00K
+|  |  in pipelines: 05(GETNEXT)
+|  |
+|  F03:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=16.42MB mem-reservation=2.00MB 
thread-reservation=1
+|  05:SCAN HDFS [tpcds_partitioned_parquet_snap.customer, RANDOM]
+|     HDFS partitions=1/1 files=1 size=5.49MB
+|     stored statistics:
+|       table: rows=100.00K size=5.49MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=100.00K
+|     mem-estimate=16.00MB mem-reservation=2.00MB thread-reservation=0
+|     tuple-ids=5 row-size=32B cardinality=100.00K
+|     in pipelines: 05(GETNEXT)
+|
+18:EXCHANGE [HASH(sr_customer_sk)]
+|  mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=24B cardinality=53.52K
+|  in pipelines: 17(GETNEXT)
+|
+F02:PLAN FRAGMENT [HASH(sr_customer_sk,sr_store_sk)] hosts=3 instances=3
+Per-Instance Resources: mem-estimate=10.49MB mem-reservation=1.94MB 
thread-reservation=1
+17:AGGREGATE [FINALIZE]
+|  output: sum:merge(SR_RETURN_AMT)
+|  group by: sr_customer_sk, sr_store_sk
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  tuple-ids=2 row-size=24B cardinality=53.52K
+|  in pipelines: 17(GETNEXT), 00(OPEN)
+|
+16:EXCHANGE [HASH(sr_customer_sk,sr_store_sk)]
+|  mem-estimate=502.09KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=2 row-size=24B cardinality=53.52K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Shared Resources: mem-estimate=4.00MB mem-reservation=4.00MB 
thread-reservation=0 runtime-filters-memory=4.00MB
+Per-Instance Resources: mem-estimate=26.33MB mem-reservation=2.12MB 
thread-reservation=1
+03:AGGREGATE [STREAMING]
+|  output: sum(SR_RETURN_AMT)
+|  group by: sr_customer_sk, sr_store_sk
+|  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB 
thread-reservation=0
+|  tuple-ids=2 row-size=24B cardinality=53.52K
+|  in pipelines: 00(GETNEXT)
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash-table-id=04
+|  hash predicates: sr_returned_date_sk = d_date_sk
+|  fk/pk conjuncts: sr_returned_date_sk = d_date_sk
+|  mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0,1 row-size=24B cardinality=53.52K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--F15:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  |  Per-Instance Resources: mem-estimate=2.95MB mem-reservation=2.94MB 
thread-reservation=1 runtime-filters-memory=1.00MB
+|  JOIN BUILD
+|  |  join-table-id=04 plan-id=05 cohort-id=01
+|  |  build expressions: d_date_sk
+|  |  runtime filters: RF006[bloom] <- d_date_sk
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB 
thread-reservation=0
+|  |
+|  15:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=1 row-size=8B cardinality=373
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Instance Resources: mem-estimate=16.05MB mem-reservation=512.00KB 
thread-reservation=1
+|  01:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
+|     HDFS partitions=1/1 files=1 size=2.17MB
+|     predicates: d_year = CAST(2000 AS INT)
+|     stored statistics:
+|       table: rows=73.05K size=2.17MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=73.05K
+|     parquet statistics predicates: d_year = CAST(2000 AS INT)
+|     parquet dictionary predicates: d_year = CAST(2000 AS INT)
+|     mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
+|     tuple-ids=1 row-size=8B cardinality=373
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpcds_partitioned_parquet_snap.store_returns, RANDOM]
+   HDFS partitions=2004/2004 files=2004 size=33.63MB
+   runtime filters: RF001[min_max] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF003[min_max] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF005[min_max] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_customer_sk, RF000[bloom] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF002[bloom] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_store_sk, RF004[bloom] -> 
tpcds_partitioned_parquet_snap.store_returns.sr_customer_sk, RF006[bloom] ->  
[...]
+   stored statistics:
+     table: rows=287.51K size=33.63MB
+     partitions: 2004/2004 rows=287.51K
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=10.01K 
est-scan-range=373(filtered from 2004)
+   mem-estimate=16.00MB mem-reservation=128.00KB thread-reservation=0
+   tuple-ids=0 row-size=16B cardinality=53.52K(filtered from 287.51K)
+   in pipelines: 00(GETNEXT)
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/processing-cost-admission-slots.test
 
b/testdata/workloads/functional-query/queries/QueryTest/processing-cost-admission-slots.test
new file mode 100644
index 000000000..118df2588
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/processing-cost-admission-slots.test
@@ -0,0 +1,110 @@
+====
+---- QUERY: TPCDS-Q1-CPC-PLANNER-CPU-ASK
+-- This EE tests should match test cases in
+-- 
functional-planner/queries/PlannerTest/processing-cost-plan-admission-slots.test
+-- Any modification here should be applied there as well.
+--
+-- Expect a total of 16 admission slots given to this query if using 
PLANNER_CPU_ASK strategy.
+-- TODO: Figure out how to enable QUERYOPTIONS section for EE test. Currently, 
it is only
+-- work for FE Planner tests via TestFileParser.java.
+SET COMPUTE_PROCESSING_COST=True;
+SET SLOT_COUNT_STRATEGY=PLANNER_CPU_ASK;
+with customer_total_return as (
+  select sr_customer_sk as ctr_customer_sk,
+    sr_store_sk as ctr_store_sk,
+    sum(SR_RETURN_AMT) as ctr_total_return
+  from tpcds_partitioned_parquet_snap.store_returns,
+    tpcds_partitioned_parquet_snap.date_dim
+  where sr_returned_date_sk = d_date_sk
+  and d_year = 2000
+  group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+  tpcds_partitioned_parquet_snap.store,
+  tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+  select avg(ctr_total_return) * 1.2
+  from customer_total_return ctr2
+  where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+---- RUNTIME_PROFILE
+aggregation(SUM, AdmissionSlots): 16
+row_regex: .* AvgAdmissionSlotsPerExecutor: 6 .*
+row_regex: .* Executor group 2 \(large\):.*
+row_regex: .* CpuAsk: 16 .*
+row_regex: .* EffectiveParallelism: 16 .*
+====
+---- QUERY: TPCDS-Q1-CPC-LARGEST-FRAGMENT
+-- Expect a total of 3 admission slots given to this query if using 
LARGEST_FRAGMENT strategy.
+-- AvgAdmissionSlotsPerExecutor counter should not exist in query profile.
+SET COMPUTE_PROCESSING_COST=True;
+SET SLOT_COUNT_STRATEGY=LARGEST_FRAGMENT;
+with customer_total_return as (
+  select sr_customer_sk as ctr_customer_sk,
+    sr_store_sk as ctr_store_sk,
+    sum(SR_RETURN_AMT) as ctr_total_return
+  from tpcds_partitioned_parquet_snap.store_returns,
+    tpcds_partitioned_parquet_snap.date_dim
+  where sr_returned_date_sk = d_date_sk
+  and d_year = 2000
+  group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+  tpcds_partitioned_parquet_snap.store,
+  tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+  select avg(ctr_total_return) * 1.2
+  from customer_total_return ctr2
+  where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+---- RUNTIME_PROFILE
+aggregation(SUM, AdmissionSlots): 3
+!row_regex: .* AvgAdmissionSlotsPerExecutor: .*
+row_regex: .* Executor group 2 \(large\):.*
+row_regex: .* CpuAsk: 16 .*
+row_regex: .* EffectiveParallelism: 16 .*
+====
+---- QUERY: TPCDS-Q1-NO-CPC
+-- Expect a total of 3 admission slots given to this query if 
COMPUTE_PROCESSING_COST is disabled.
+-- Set MT_DOP=1 to keep query compiled in multi thread parallelism mode.
+-- Counters related to COMPUTE_PROCESSING_COST options should not exist in 
profile.
+-- It should still go to large executor group due to min memory requirement.
+SET COMPUTE_PROCESSING_COST=False;
+SET MT_DOP=1;
+with customer_total_return as (
+  select sr_customer_sk as ctr_customer_sk,
+    sr_store_sk as ctr_store_sk,
+    sum(SR_RETURN_AMT) as ctr_total_return
+  from tpcds_partitioned_parquet_snap.store_returns,
+    tpcds_partitioned_parquet_snap.date_dim
+  where sr_returned_date_sk = d_date_sk
+  and d_year = 2000
+  group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+  tpcds_partitioned_parquet_snap.store,
+  tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+  select avg(ctr_total_return) * 1.2
+  from customer_total_return ctr2
+  where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100;
+---- RUNTIME_PROFILE
+aggregation(SUM, AdmissionSlots): 3
+!row_regex: .* AvgAdmissionSlotsPerExecutor: .*
+row_regex: .* Executor group 2 \(large\):.*
+!row_regex: .* CpuAsk: .*
+!row_regex: .* EffectiveParallelism: .*
+====
diff --git a/tests/custom_cluster/test_executor_groups.py 
b/tests/custom_cluster/test_executor_groups.py
index e7a9e382c..ce12e0322 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -42,6 +42,32 @@ CPU_TEST_QUERY = "select * from tpcds_parquet.store_sales 
where ss_item_sk = 1 l
 GROUPING_TEST_QUERY = ("select ss_item_sk from tpcds_parquet.store_sales"
     " group by (ss_item_sk) order by ss_item_sk limit 10")
 
+# TPC-DS Q1 to test slightly more complex query.
+TPCDS_Q1 = """
+with customer_total_return as (
+  select sr_customer_sk as ctr_customer_sk,
+    sr_store_sk as ctr_store_sk,
+    sum(SR_RETURN_AMT) as ctr_total_return
+  from tpcds_partitioned_parquet_snap.store_returns,
+    tpcds_partitioned_parquet_snap.date_dim
+  where sr_returned_date_sk = d_date_sk
+  and d_year = 2000
+  group by sr_customer_sk, sr_store_sk
+) select c_customer_id
+from customer_total_return ctr1,
+  tpcds_partitioned_parquet_snap.store,
+  tpcds_partitioned_parquet_snap.customer
+where ctr1.ctr_total_return > (
+  select avg(ctr_total_return) * 1.2
+  from customer_total_return ctr2
+  where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
+and s_store_sk = ctr1.ctr_store_sk
+and s_state = 'TN'
+and ctr1.ctr_customer_sk = c_customer_sk
+order by c_customer_id
+limit 100
+"""
+
 DEFAULT_RESOURCE_POOL = "default-pool"
 
 
@@ -851,14 +877,14 @@ class TestExecutorGroups(CustomClusterTestSuite):
 
     # Create fresh client
     self.create_impala_clients()
-    # Add an exec group with 8 admission slots and 1 executors.
-    self._add_executor_group("group", 1, admission_control_slots=8,
+    # Add an exec group with 4 admission slots and 1 executors.
+    self._add_executor_group("group", 1, admission_control_slots=4,
                              resource_pool="root.tiny", 
extra_args="-mem_limit=2g")
     # Add an exec group with 8 admission slots and 2 executors.
     self._add_executor_group("group", 2, admission_control_slots=8,
                              resource_pool="root.small", 
extra_args="-mem_limit=2g")
-    # Add another exec group with 8 admission slots and 3 executors.
-    self._add_executor_group("group", 3, admission_control_slots=8,
+    # Add another exec group with 64 admission slots and 3 executors.
+    self._add_executor_group("group", 3, admission_control_slots=64,
                              resource_pool="root.large", 
extra_args="-mem_limit=2g")
     assert self._get_num_executor_groups(only_healthy=True) == 3
     assert self._get_num_executor_groups(only_healthy=True,
@@ -909,32 +935,38 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._setup_three_exec_group_cluster(coordinator_test_args)
     self.client.clear_configuration()
 
+    # The default query options for this test.
+    # Some test case will change these options along the test, but should 
eventually
+    # restored to this default values.
+    self._set_query_options({
+      'COMPUTE_PROCESSING_COST': 'true',
+      'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
+
     # Expect to run the query on the small group by default.
-    self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
-        ["Executor Group: root.small-group", "EffectiveParallelism: 11",
-         "ExecutorGroupsConsidered: 2"])
+        ["Executor Group: root.small-group", "EffectiveParallelism: 10",
+         "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
 
     # Test disabling COMPUTE_PROCESING_COST. This will produce non-MT plan.
     self._set_query_options({'COMPUTE_PROCESSING_COST': 'false'})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
         ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
          "Verdict: Match"],
-        ["EffectiveParallelism:", "CpuAsk:"])
+        ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
 
     # Test COMPUTE_PROCESING_COST=false and MT_DOP=2.
     self._set_query_options({'MT_DOP': '2'})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
         ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
          "Verdict: Match"],
-        ["EffectiveParallelism:", "CpuAsk:"])
+        ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
 
     # Test COMPUTE_PROCESING_COST=true and MT_DOP=2.
     # COMPUTE_PROCESING_COST should override MT_DOP.
     self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
-        ["Executor Group: root.small-group", "EffectiveParallelism: 11",
-         "ExecutorGroupsConsidered: 2"])
+        ["Executor Group: root.small-group", "EffectiveParallelism: 10",
+         "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
 
     # Unset MT_DOP
     self._set_query_options({'MT_DOP': '0'})
@@ -947,7 +979,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "from tpcds_parquet.store_sales where ss_sold_date_sk < 2452184"
        ).format(unique_database, "store_sales_subset"),
       ["Executor Group: root.small", "ExecutorGroupsConsidered: 2",
-       "Verdict: Match", "CpuAsk: 10"])
+       "Verdict: Match", "CpuAsk: 10", "AvgAdmissionSlotsPerExecutor: 5"])
 
     compute_stats_query = ("compute stats {0}.{1}").format(
         unique_database, "store_sales_subset")
@@ -961,6 +993,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ["ExecutorGroupsConsidered: 1",
          "Verdict: Assign to first group because query is not auto-scalable"],
         ["Query Options (set by configuration): REQUEST_POOL=",
+         "EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:",
          "Executor Group:"])
     self._verify_total_admitted_queries("root.small", 4)
     self._verify_total_admitted_queries("root.large", 2)
@@ -972,7 +1005,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ["Query Options (set by configuration): REQUEST_POOL=root.small",
          "ExecutorGroupsConsidered: 1",
          "Verdict: Assign to first group because query is not auto-scalable"],
-        ["Executor Group:"])
+        ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
     self._verify_total_admitted_queries("root.small", 6)
     self.client.clear_configuration()
 
@@ -983,7 +1016,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ["Query Options (set by configuration): REQUEST_POOL=root.large",
          "ExecutorGroupsConsidered: 1",
          "Verdict: Assign to first group because query is not auto-scalable"],
-        ["Executor Group:"])
+        ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
     self._verify_total_admitted_queries("root.large", 4)
 
     # Test that REQUEST_POOL will override executor group selection
@@ -992,7 +1025,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
          "Executor Group: root.large-group",
          ("Verdict: query option REQUEST_POOL=root.large is set. "
           "Memory and cpu limit checking is skipped."),
-         "EffectiveParallelism: 13", "ExecutorGroupsConsidered: 1"])
+         "EffectiveParallelism: 12", "ExecutorGroupsConsidered: 1",
+         "AvgAdmissionSlotsPerExecutor: 4"])
 
     # Test setting REQUEST_POOL=root.large and disabling 
COMPUTE_PROCESSING_COST
     self._set_query_options({'COMPUTE_PROCESSING_COST': 'false'})
@@ -1002,7 +1036,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
          ("Verdict: query option REQUEST_POOL=root.large is set. "
           "Memory and cpu limit checking is skipped."),
          "ExecutorGroupsConsidered: 1"],
-        ["EffectiveParallelism:", "CpuAsk:"])
+        ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
 
     # Unset REQUEST_POOL and restore COMPUTE_PROCESSING_COST.
     self._set_query_options({
@@ -1010,23 +1044,22 @@ class TestExecutorGroups(CustomClusterTestSuite):
       'COMPUTE_PROCESSING_COST': 'true'})
 
     # Test that empty REQUEST_POOL should have no impact.
-    self.client.set_configuration({'REQUEST_POOL': ''})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
-        ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
-         "Verdict: Match"],
+        ["Executor Group: root.small-group", "EffectiveParallelism: 10",
+         "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"],
         ["Query Options (set by configuration): REQUEST_POOL="])
-    self.client.clear_configuration()
 
     # Test that GROUPING_TEST_QUERY will get assigned to the large group.
     self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
         ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-          "Verdict: Match", "CpuAsk: 12"])
+          "Verdict: Match", "CpuAsk: 12", "AvgAdmissionSlotsPerExecutor: 4"])
 
     # ENABLE_REPLAN=false should force query to run in first group (tiny).
     self._set_query_options({'ENABLE_REPLAN': 'false'})
     self._run_query_and_verify_profile(TEST_QUERY,
         ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-         "Verdict: Assign to first group because query option 
ENABLE_REPLAN=false"])
+         "Verdict: Assign to first group because query option 
ENABLE_REPLAN=false"],
+        ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
     # Unset ENABLE_REPLAN.
     self._set_query_options({'ENABLE_REPLAN': ''})
 
@@ -1035,37 +1068,38 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._run_query_and_verify_profile("SELECT 1",
         ["Executor Group: empty group (using coordinator only)",
          "ExecutorGroupsConsidered: 1",
-         "Verdict: Assign to first group because the number of nodes is 1"])
+         "Verdict: Assign to first group because the number of nodes is 1"],
+        ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
 
     # CREATE/DROP database should work and assigned to tiny group.
     self._run_query_and_verify_profile(
         "CREATE DATABASE test_non_scalable_query;",
         ["ExecutorGroupsConsidered: 1",
          "Verdict: Assign to first group because query is not auto-scalable"],
-        ["Executor Group:"])
+        ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
     self._run_query_and_verify_profile(
         "DROP DATABASE test_non_scalable_query;",
         ["ExecutorGroupsConsidered: 1",
          "Verdict: Assign to first group because query is not auto-scalable"],
-        ["Executor Group:"])
+        ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
 
     # Test combination of PROCESSING_COST_MIN_THREADS and 
MAX_FRAGMENT_INSTANCES_PER_NODE.
     self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '3'})
     self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
         ["Executor Group: root.large-group", "EffectiveParallelism: 9",
-         "ExecutorGroupsConsidered: 3"])
+         "ExecutorGroupsConsidered: 3", "AvgAdmissionSlotsPerExecutor: 3"])
     self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '4'})
     self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
         ["Executor Group: root.large-group", "EffectiveParallelism: 12",
-         "ExecutorGroupsConsidered: 3"])
+         "ExecutorGroupsConsidered: 3", "AvgAdmissionSlotsPerExecutor: 4"])
     self._set_query_options({'PROCESSING_COST_MIN_THREADS': '2'})
     self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
         ["Executor Group: root.large-group", "EffectiveParallelism: 12",
-         "ExecutorGroupsConsidered: 3"])
+         "ExecutorGroupsConsidered: 3", "AvgAdmissionSlotsPerExecutor: 4"])
     self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '2'})
     self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
         ["Executor Group: root.small-group", "EffectiveParallelism: 4",
-         "ExecutorGroupsConsidered: 2"])
+         "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 2"])
     self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
     result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
     status = (r"PROCESSING_COST_MIN_THREADS \(2\) can not be larger than "
@@ -1081,13 +1115,13 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._run_query_and_verify_profile(
         "SELECT count(*) FROM tpcds_parquet.store_sales",
         ["Executor Group: root.small-group", "EffectiveParallelism: 10",
-         "ExecutorGroupsConsidered: 2"])
+         "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
 
     # Test optimized count star query with 383 scan ranges assign to tiny 
group.
     self._run_query_and_verify_profile(
        "SELECT count(*) FROM tpcds_parquet.store_sales WHERE ss_sold_date_sk < 
2451200",
        ["Executor Group: root.tiny-group", "EffectiveParallelism: 2",
-        "ExecutorGroupsConsidered: 1"])
+        "ExecutorGroupsConsidered: 1", "AvgAdmissionSlotsPerExecutor: 2"])
 
     # Test optimized count star query with 1 scan range detected as trivial 
query
     # and assign to tiny group.
@@ -1095,20 +1129,21 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "SELECT count(*) FROM tpcds_parquet.date_dim",
        ["Executor Group: empty group (using coordinator only)",
         "ExecutorGroupsConsidered: 1",
-        "Verdict: Assign to first group because the number of nodes is 1"])
+        "Verdict: Assign to first group because the number of nodes is 1"],
+       ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
 
     # Test unoptimized count star query assign to small group.
     self._run_query_and_verify_profile(
       ("SELECT count(*) FROM tpcds_parquet.store_sales "
        "WHERE ss_ext_discount_amt != 0.3857"),
       ["Executor Group: root.small-group", "EffectiveParallelism: 10",
-       "ExecutorGroupsConsidered: 2"])
+       "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
 
     # Test zero slot scan query assign to small group.
     self._run_query_and_verify_profile(
       "SELECT count(ss_sold_date_sk) FROM tpcds_parquet.store_sales",
       ["Executor Group: root.small-group", "EffectiveParallelism: 10",
-       "ExecutorGroupsConsidered: 2"])
+       "ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
     # END testing count queries
 
     # BEGIN testing insert + MAX_FS_WRITER
@@ -1119,7 +1154,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "select id, year from functional_parquet.alltypes"
        ).format(unique_database, "test_ctas1"),
       ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-       "Verdict: Match", "CpuAsk: 1"])
+       "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"])
     self.__verify_fs_writers(result, 1, [0, 1])
 
     # Test unpartitioned insert, small scan, no MAX_FS_WRITER, with limit.
@@ -1129,7 +1164,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "select id, year from functional_parquet.alltypes limit 100000"
        ).format(unique_database, "test_ctas2"),
       ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-       "Verdict: Match", "CpuAsk: 2"])
+       "Verdict: Match", "CpuAsk: 2", "AvgAdmissionSlotsPerExecutor: 2"])
     self.__verify_fs_writers(result, 1, [0, 2])
 
     # Test partitioned insert, small scan, no MAX_FS_WRITER.
@@ -1139,7 +1174,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "select id, year from functional_parquet.alltypes"
        ).format(unique_database, "test_ctas3"),
       ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-       "Verdict: Match", "CpuAsk: 1"])
+       "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"])
     self.__verify_fs_writers(result, 1, [0, 1])
 
     # Test unpartitioned insert, large scan, no MAX_FS_WRITER.
@@ -1148,7 +1183,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "select ss_item_sk, ss_ticket_number, ss_store_sk "
        "from tpcds_parquet.store_sales").format(unique_database, "test_ctas4"),
       ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-       "Verdict: Match", "CpuAsk: 13"])
+       "Verdict: Match", "CpuAsk: 13", "AvgAdmissionSlotsPerExecutor: 5"])
     self.__verify_fs_writers(result, 1, [0, 4, 4, 5])
 
     # Test partitioned insert, large scan, no MAX_FS_WRITER.
@@ -1157,7 +1192,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "select ss_item_sk, ss_ticket_number, ss_store_sk "
        "from tpcds_parquet.store_sales").format(unique_database, "test_ctas5"),
       ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-       "Verdict: Match", "CpuAsk: 15"])
+       "Verdict: Match", "CpuAsk: 15", "AvgAdmissionSlotsPerExecutor: 5"])
     self.__verify_fs_writers(result, 3, [0, 5, 5, 5])
 
     # Test partitioned insert, large scan, high MAX_FS_WRITER.
@@ -1167,7 +1202,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "select ss_item_sk, ss_ticket_number, ss_store_sk "
        "from tpcds_parquet.store_sales").format(unique_database, "test_ctas6"),
       ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-       "Verdict: Match", "CpuAsk: 15"])
+       "Verdict: Match", "CpuAsk: 15", "AvgAdmissionSlotsPerExecutor: 5"])
     self.__verify_fs_writers(result, 3, [0, 5, 5, 5])
 
     # Test partitioned insert, large scan, low MAX_FS_WRITER.
@@ -1177,7 +1212,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "select ss_item_sk, ss_ticket_number, ss_store_sk "
        "from tpcds_parquet.store_sales").format(unique_database, "test_ctas7"),
       ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-       "Verdict: Match", "CpuAsk: 14"])
+       "Verdict: Match", "CpuAsk: 14", "AvgAdmissionSlotsPerExecutor: 5"])
     self.__verify_fs_writers(result, 2, [0, 4, 5, 5])
 
     # Test that non-CTAS unpartitioned insert works. MAX_FS_WRITER=2.
@@ -1186,7 +1221,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "select ss_item_sk, ss_ticket_number, ss_store_sk "
        "from tpcds_parquet.store_sales").format(unique_database, "test_ctas4"),
       ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-       "Verdict: Match", "CpuAsk: 13"])
+       "Verdict: Match", "CpuAsk: 13", "AvgAdmissionSlotsPerExecutor: 5"])
     self.__verify_fs_writers(result, 1, [0, 4, 4, 5])
 
     # Test that non-CTAS partitioned insert works. MAX_FS_WRITER=2.
@@ -1196,20 +1231,48 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "select ss_item_sk, ss_ticket_number, ss_store_sk "
        "from tpcds_parquet.store_sales").format(unique_database, "test_ctas7"),
       ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-       "Verdict: Match", "CpuAsk: 14"])
+       "Verdict: Match", "CpuAsk: 14", "AvgAdmissionSlotsPerExecutor: 5"])
     self.__verify_fs_writers(result, 2, [0, 4, 5, 5])
 
     # Unset MAX_FS_WRITERS.
     self._set_query_options({'MAX_FS_WRITERS': ''})
     # END testing insert + MAX_FS_WRITER
 
+    # BEGIN test slot count strategy
+    # Unset SLOT_COUNT_STRATEGY to use default strategy, which is max # of 
instances
+    # of any fragment on that backend.
+    # TPCDS_Q1 at root.large_group will have following CoreCount trace:
+    #   CoreCount={total=16 
trace=F15:3+F01:1+F14:3+F03:1+F13:3+F05:1+F12:3+F07:1},
+    #   coresRequired=16
+    self._set_query_options({'SLOT_COUNT_STRATEGY': ''})
+    result = self._run_query_and_verify_profile(TPCDS_Q1,
+      ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
+       "Verdict: Match", "CpuAsk: 16",
+       "AdmissionSlots: 1"  # coordinator and executors all have 1 slot
+       ],
+      ["AvgAdmissionSlotsPerExecutor:", "AdmissionSlots: 6"])
+
+    # Test with SLOT_COUNT_STRATEGY='PLANNER_CPU_ASK'.
+    self._set_query_options({'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
+    result = self._run_query_and_verify_profile(TPCDS_Q1,
+      ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
+       "Verdict: Match", "CpuAsk: 16", "AvgAdmissionSlotsPerExecutor: 6",
+       # coordinator has 1 slot
+       "AdmissionSlots: 1",
+       # 1 executor has F15:1+F01:1+F14:1+F03:1+F13:1+F05:1+F12:1+F07:1 = 8 
slots
+       "AdmissionSlots: 8",
+       # 2 executors have F15:1+F14:1+F13:1+F12:1 = 4 slots
+       "AdmissionSlots: 4"
+       ])
+    # END test slot count strategy
+
     # Check resource pools on the Web queries site and admission site
     self._verify_query_num_for_resource_pool("root.small", 10)
     self._verify_query_num_for_resource_pool("root.tiny", 5)
-    self._verify_query_num_for_resource_pool("root.large", 12)
+    self._verify_query_num_for_resource_pool("root.large", 14)
     self._verify_total_admitted_queries("root.small", 11)
     self._verify_total_admitted_queries("root.tiny", 8)
-    self._verify_total_admitted_queries("root.large", 16)
+    self._verify_total_admitted_queries("root.large", 18)
 
   @pytest.mark.execute_serially
   def test_query_cpu_count_divisor_two(self):
@@ -1217,29 +1280,35 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # But the CpuAsk is around half of EffectiveParallelism.
     coordinator_test_args = "-query_cpu_count_divisor=2 "
     self._setup_three_exec_group_cluster(coordinator_test_args)
-    self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
+    self._set_query_options({
+      'COMPUTE_PROCESSING_COST': 'true',
+      'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
         ["Executor Group: root.small-group",
-         "CpuAsk: 6", "EffectiveParallelism: 11",
-         "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2"])
+         "CpuAsk: 5", "EffectiveParallelism: 10",
+         "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2",
+         "AvgAdmissionSlotsPerExecutor: 5"])
 
     # Test that QUERY_CPU_COUNT_DIVISOR option can override
     # query_cpu_count_divisor flag.
     self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '1.0'})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
         ["Executor Group: root.small-group",
-         "CpuAsk: 11", "EffectiveParallelism: 11",
-         "CpuCountDivisor: 1", "ExecutorGroupsConsidered: 2"])
+         "CpuAsk: 10", "EffectiveParallelism: 10",
+         "CpuCountDivisor: 1", "ExecutorGroupsConsidered: 2",
+         "AvgAdmissionSlotsPerExecutor: 5"])
     self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '0.5'})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
         ["Executor Group: root.large-group",
-         "CpuAsk: 22", "EffectiveParallelism: 11",
-         "CpuCountDivisor: 0.5", "ExecutorGroupsConsidered: 3"])
+         "CpuAsk: 24", "EffectiveParallelism: 10",
+         "CpuCountDivisor: 0.5", "ExecutorGroupsConsidered: 3",
+         "AvgAdmissionSlotsPerExecutor: 4"])
     self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '2.0'})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
         ["Executor Group: root.small-group",
-         "CpuAsk: 6", "EffectiveParallelism: 11",
-         "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2"])
+         "CpuAsk: 5", "EffectiveParallelism: 10",
+         "CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2",
+         "AvgAdmissionSlotsPerExecutor: 5"])
 
     # Check resource pools on the Web queries site and admission site
     self._verify_query_num_for_resource_pool("root.small", 3)
@@ -1255,11 +1324,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._setup_three_exec_group_cluster(coordinator_test_args)
     self._set_query_options({
       'COMPUTE_PROCESSING_COST': 'true',
+      'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK',
       'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
-        ["Executor Group: root.large-group", "EffectiveParallelism: 4",
-         "ExecutorGroupsConsidered: 3", "CpuAsk: 134",
-         "Verdict: Match"])
+        ["Executor Group: root.large-group", "EffectiveParallelism: 3",
+         "ExecutorGroupsConsidered: 3", "CpuAsk: 100",
+         "Verdict: Match", "AvgAdmissionSlotsPerExecutor: 1"])
 
     # Unset MAX_FRAGMENT_INSTANCES_PER_NODE.
     self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': ''})
@@ -1267,9 +1337,10 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # Expect that a query still admitted to last group even if
     # its resource requirement exceed the limit on that last executor group.
     self._run_query_and_verify_profile(CPU_TEST_QUERY,
-        ["Executor Group: root.large-group", "EffectiveParallelism: 16",
+        ["Executor Group: root.large-group", "EffectiveParallelism: 15",
          "ExecutorGroupsConsidered: 3", "CpuAsk: 534",
-         "Verdict: no executor group set fit. Admit to last executor group 
set."])
+         "Verdict: no executor group set fit. Admit to last executor group 
set.",
+         "AvgAdmissionSlotsPerExecutor: 5"])
 
     # Check resource pools on the Web queries site and admission site
     self._verify_query_num_for_resource_pool("root.large", 2)
@@ -1282,10 +1353,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
     coordinator_test_args = ("-query_cpu_count_divisor=0.03 "
         "-skip_resource_checking_on_last_executor_group_set=false ")
     self._setup_three_exec_group_cluster(coordinator_test_args)
-    self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
+    self._set_query_options({
+      'COMPUTE_PROCESSING_COST': 'true',
+      'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
     result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
     assert ("AnalysisException: The query does not fit largest executor group 
sets. "
-        "Reason: not enough cpu cores (require=434, max=192).") in str(result)
+        "Reason: not enough cpu cores (require=400, max=192).") in str(result)
 
   @pytest.mark.execute_serially
   def test_min_processing_per_thread_small(self):
@@ -1294,24 +1367,29 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._setup_three_exec_group_cluster(coordinator_test_args)
 
     # Test that GROUPING_TEST_QUERY will get assigned to the large group.
-    self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
+    self._set_query_options({
+      'COMPUTE_PROCESSING_COST': 'true',
+      'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
     self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
         ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-          "Verdict: Match", "CpuAsk: 15"])
+          "Verdict: Match", "CpuAsk: 15",
+          "AvgAdmissionSlotsPerExecutor: 5"])
 
     # Test that high_scan_cost_query will get assigned to the large group.
     high_scan_cost_query = ("SELECT ss_item_sk FROM tpcds_parquet.store_sales "
         "WHERE ss_item_sk < 1000000 GROUP BY ss_item_sk LIMIT 10")
     self._run_query_and_verify_profile(high_scan_cost_query,
         ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-          "Verdict: Match", "CpuAsk: 18"])
+          "Verdict: Match", "CpuAsk: 18",
+          "AvgAdmissionSlotsPerExecutor: 6"])
 
     # Test that high_scan_cost_query will get assigned to the small group
     # if MAX_FRAGMENT_INSTANCES_PER_NODE is limited to 1.
     self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
     self._run_query_and_verify_profile(high_scan_cost_query,
         ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-          "Verdict: Match", "CpuAsk: 1"])
+          "Verdict: Match", "CpuAsk: 1",
+          "AvgAdmissionSlotsPerExecutor: 1"])
 
     # Check resource pools on the Web queries site and admission site
     self._verify_query_num_for_resource_pool("root.tiny", 1)
diff --git a/tests/query_test/test_processing_cost.py 
b/tests/query_test/test_processing_cost.py
new file mode 100644
index 000000000..d2e942a79
--- /dev/null
+++ b/tests/query_test/test_processing_cost.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Functional tests running the TPCH workload.
+from __future__ import absolute_import, division, print_function
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import (
+  create_parquet_dimension,
+  create_single_exec_option_dimension
+)
+
+
+class TestProcessingCost(ImpalaTestSuite):
+  """Test processing cost in non-dedicated coordinator environment."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestProcessingCost, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    
cls.ImpalaTestMatrix.add_dimension(create_parquet_dimension(cls.get_workload()))
+
+  def test_admission_slots(self, vector):
+    self.run_test_case('QueryTest/processing-cost-admission-slots', vector)
diff --git a/tests/query_test/test_tpcds_queries.py 
b/tests/query_test/test_tpcds_queries.py
index c8496f35f..2dbb896e6 100644
--- a/tests/query_test/test_tpcds_queries.py
+++ b/tests/query_test/test_tpcds_queries.py
@@ -24,6 +24,7 @@ from copy import deepcopy
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfDockerizedCluster
 from tests.common.test_dimensions import (
+    add_mandatory_exec_option,
     create_single_exec_option_dimension,
     is_supported_insert_format)
 
@@ -39,7 +40,7 @@ class TestTpcdsQuery(ImpalaTestSuite):
         v.get_value('table_format').file_format not in ['rc', 'hbase', 'kudu'] 
and
         v.get_value('table_format').compression_codec in ['none', 'snap'] and
         v.get_value('table_format').compression_type != 'record')
-    cls.ImpalaTestMatrix.add_mandatory_exec_option('decimal_v2', 0)
+    add_mandatory_exec_option(cls, 'decimal_v2', 0)
 
     if cls.exploration_strategy() != 'exhaustive':
       # Cut down on the execution time for these tests in core by running only
@@ -754,8 +755,9 @@ class TestTpcdsQueryWithProcessingCost(TestTpcdsQuery):
   @classmethod
   def add_test_dimensions(cls):
     super(TestTpcdsQueryWithProcessingCost, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_mandatory_exec_option('compute_processing_cost', 
1)
-    
cls.ImpalaTestMatrix.add_mandatory_exec_option('max_fragment_instances_per_node',
 4)
+    add_mandatory_exec_option(cls, 'compute_processing_cost', 1)
+    add_mandatory_exec_option(cls, 'max_fragment_instances_per_node', 4)
+    add_mandatory_exec_option(cls, 'slot_count_strategy', 'planner_cpu_ask')
 
   def test_tpcds_q51a(self, vector):
     """Reduce max_fragment_instances_per_node to lower memory requirement."""

Reply via email to