This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b07bd6dde IMPALA-13469: Deflake test_query_cpu_count_on_insert
b07bd6dde is described below

commit b07bd6ddeb6b1970f65b85ff927726ba3003f8aa
Author: Riza Suminto <[email protected]>
AuthorDate: Mon Oct 21 20:19:57 2024 -0700

    IMPALA-13469: Deflake test_query_cpu_count_on_insert
    
    A new test case from IMPALA-13445 reveals a pre-existing bug where
    cost-based planning may increase expectedNumInputInstance greater than
    inputFragment.getNumInstances(), which leads to precondition violation.
    The following scenario all happened when the Precondition was hit:
    
    1. The environment is either Erasure Coded HDFS or Ozone.
    2. The source table does not have stats nor numRows table property.
    3. There is only one fragment consisting of a ScanNode in the plan tree
       before the addition of DML fragment.
    4. Byte-based cardinality estimation logic kicks in.
    5. Byte-based cardinality causes high scan cost, which leads to
       maxScanThread exceeding inputFragment.getPlanRoot().
    6. expectedNumInputInstance is assigned equal to maxScanThread.
    7. Precondition expectedNumInputInstance < inputFragment.getPlanRoot()
       is violated.
    
    This scenario triggers a special condition that attempts to lower
    expectedNumInputInstance. But instead of lowering
    expectedNumInputInstance, the special logic increases it due to higher
    byte-based cardinality estimation.
    
    There is also a new bug where DistributedPlanner.java mistakenly passes
    root.getInputCardinality() instead of root.getCardinality().
    
    This patch fixes both issues and does minor refactoring to change
    variable names into camel cases. Relaxed validation of the last test
    case of test_query_cpu_count_on_insert to let it pass in Erasure Coded
    HDFS and Ozone setup.
    
    Testing:
    - Make several assertions in test_executor_groups.py more verbose.
    - Pass test_executor_groups.py in Erasure Coded HDFS and Ozone setup.
    - Added new Planner tests with unknown cardinality estimation.
    - Pass core tests in regular setup.
    
    Change-Id: I834eb6bf896752521e733cd6b77a03f746e6a447
    Reviewed-on: http://gerrit.cloudera.org:8080/21966
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/planner/DistributedPlanner.java  | 47 ++++++++-------
 .../org/apache/impala/planner/PlannerTestBase.java |  5 +-
 .../tpcds_cpu_cost/tpcds-ddl-iceberg.test          | 42 ++++++++++++++
 .../tpcds_cpu_cost/tpcds-ddl-parquet.test          | 43 ++++++++++++++
 tests/custom_cluster/test_executor_groups.py       | 67 +++++++++++++++-------
 5 files changed, 161 insertions(+), 43 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index b72144754..0dd8ecb2b 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -201,10 +201,10 @@ public class DistributedPlanner {
       List<PlanFragment> fragments)
       throws ImpalaException {
     boolean isComputeCost = 
analyzer.getQueryOptions().isCompute_processing_cost();
-    boolean enforce_hdfs_writer_limit = dmlStmt.getTargetTable() instanceof 
FeFsTable
+    boolean enforceHdfsWriterLimit = dmlStmt.getTargetTable() instanceof 
FeFsTable
         && (analyzer.getQueryOptions().getMax_fs_writers() > 0 || 
isComputeCost);
 
-    if (dmlStmt.hasNoShuffleHint() && !enforce_hdfs_writer_limit) return 
inputFragment;
+    if (dmlStmt.hasNoShuffleHint() && !enforceHdfsWriterLimit) return 
inputFragment;
 
     List<Expr> partitionExprs = new 
ArrayList<>(dmlStmt.getPartitionKeyExprs());
     // Ignore constants for the sake of partitioning.
@@ -214,10 +214,10 @@ public class DistributedPlanner {
     // Kudu tables here (IMPALA-5254).
     DataPartition inputPartition = inputFragment.getDataPartition();
     if (!partitionExprs.isEmpty()
-        && analyzer.setsHaveValueTransfer(inputPartition.getPartitionExprs(),
-            partitionExprs, true)
+        && analyzer.setsHaveValueTransfer(
+            inputPartition.getPartitionExprs(), partitionExprs, true)
         && !(dmlStmt.getTargetTable() instanceof FeKuduTable)
-        && !enforce_hdfs_writer_limit) {
+        && !enforceHdfsWriterLimit) {
       return inputFragment;
     }
 
@@ -234,7 +234,7 @@ public class DistributedPlanner {
     boolean hasHdfsScanORUnion = !hdfsScanNodes.isEmpty() || 
!unionNodes.isEmpty();
 
     int expectedNumInputInstance = inputFragment.getNumInstances();
-    if (enforce_hdfs_writer_limit && isComputeCost) {
+    if (enforceHdfsWriterLimit && isComputeCost) {
       // Default to minParallelism * numNodes if cardinality or average row 
size is
       // unknown.
       int minInstances = IntMath.saturatedMultiply(
@@ -249,7 +249,7 @@ public class DistributedPlanner {
         // Both cardinality and avg row size is known.
         costBasedMaxWriter = HdfsTableSink.bytesBasedNumWriters(
             inputFragment.getNumNodes(), maxInstances, isPartitioned, 
numPartitions,
-            root.getInputCardinality(), root.getAvgRowSize());
+            root.getCardinality(), root.getAvgRowSize());
       }
 
       if (maxHdfsWriters > 0) {
@@ -260,16 +260,19 @@ public class DistributedPlanner {
         maxHdfsWriters = costBasedMaxWriter;
       }
       LOG.trace("isPartitioned={} numDistinctPartition={} 
costBasedMaxWriter={} "
-              + "maxHdfsWriters={}",
-          isPartitioned, numPartitions, costBasedMaxWriter, maxHdfsWriters);
+              + "maxHdfsWriters={} inputCardinality={}",
+          isPartitioned, numPartitions, costBasedMaxWriter, maxHdfsWriters,
+          root.getCardinality());
       Preconditions.checkState(maxHdfsWriters > 0);
       dmlStmt.setMaxTableSinks(maxHdfsWriters);
       // At this point, parallelism of writer fragment is fixed and will not 
be adjusted
       // by costing phase.
 
       if (!hdfsScanNodes.isEmpty() && fragments.size() == 1) {
-        // If input fragment have HdfsScanNode and input fragment is the only 
fragment in
-        // the plan, check for opportunity to collocate scan nodes and table 
sink.
+        // If input fragment have HdfsScanNode, and input fragment is the only 
fragment in
+        // the plan, and the scan cost is low, expectedNumInputInstance can be 
lowered
+        // down. This can increase chance to  colocate scan nodes and table 
sinks
+        // (case 3 below).
         // Since the actual costing phase only happens later after distributed 
plan
         // created, this code redundantly compute the scan cost ahead of 
costing phase
         // to help estimate the scan parallelism.
@@ -281,9 +284,7 @@ public class DistributedPlanner {
               maxScanThread, 
scanCost.getNumInstanceMax(inputFragment.getNumNodes()));
         }
         maxScanThread = Math.min(maxInstances, maxScanThread);
-        // Override expectedNumInputInstance so that collocation may happen
-        // (case 3 in branch below).
-        expectedNumInputInstance = maxScanThread;
+        expectedNumInputInstance = Math.min(maxScanThread, 
expectedNumInputInstance);
       }
     }
 
@@ -295,7 +296,7 @@ public class DistributedPlanner {
         // TODO: make a more sophisticated decision here for partitioned 
tables and when
         // we have info about tablet locations.
         if (partitionExprs.isEmpty()) return inputFragment;
-      } else if (!enforce_hdfs_writer_limit || !hasHdfsScanORUnion
+      } else if (!enforceHdfsWriterLimit || !hasHdfsScanORUnion
           || (expectedNumInputInstance <= maxHdfsWriters)) {
         // Only consider skipping the addition of an exchange node if
         // 1. The hdfs writer limit does not apply
@@ -306,13 +307,15 @@ public class DistributedPlanner {
         // Basically covering all cases where we don't mind restricting the 
parallelism
         // of their instances.
         Preconditions.checkState(
-            expectedNumInputInstance <= inputFragment.getNumInstances());
-        int input_instances = expectedNumInputInstance;
-        if (enforce_hdfs_writer_limit && !hasHdfsScanORUnion) {
+            expectedNumInputInstance <= inputFragment.getNumInstances(),
+            "expectedNumInputInstance (%s) > inputFragment.getNumInstances() 
(%s)",
+            expectedNumInputInstance, inputFragment.getNumInstances());
+        int inputInstances = expectedNumInputInstance;
+        if (enforceHdfsWriterLimit && !hasHdfsScanORUnion) {
           // For an internal fragment we enforce an upper limit based on the
           // resulting maxHdfsWriters.
           Preconditions.checkState(maxHdfsWriters > 0);
-          input_instances = Math.min(input_instances, maxHdfsWriters);
+          inputInstances = Math.min(inputInstances, maxHdfsWriters);
         }
         // If the existing partition exprs are a subset of the table partition 
exprs,
         // check if it is distributed across all nodes. If so, don't 
repartition.
@@ -324,7 +327,7 @@ public class DistributedPlanner {
         if (Expr.isSubset(inputPartition.getPartitionExprs(), partitionExprs)) 
{
           long numInputPartitions =
               getNumDistinctValues(inputPartition.getPartitionExprs());
-          if (numInputPartitions >= input_instances) { return inputFragment; }
+          if (numInputPartitions >= inputInstances) { return inputFragment; }
         }
 
         // Don't repartition if we know we have fewer partitions than nodes
@@ -335,7 +338,7 @@ public class DistributedPlanner {
         // size in the particular file format of the output table/partition.
         // We should always know on how many nodes our input is running.
         Preconditions.checkState(expectedNumInputInstance != -1);
-        if (numPartitions > 0 && numPartitions <= input_instances) {
+        if (numPartitions > 0 && numPartitions <= inputInstances) {
           return inputFragment;
         }
       }
@@ -347,7 +350,7 @@ public class DistributedPlanner {
     Preconditions.checkState(exchNode.hasValidStats());
     DataPartition partition;
     if (partitionExprs.isEmpty()) {
-      if (enforce_hdfs_writer_limit
+      if (enforceHdfsWriterLimit
           && inputFragment.getDataPartition().getType() == 
TPartitionType.RANDOM) {
         // This ensures the parallelism of the writers is maintained while 
maintaining
         // legacy behavior(when not using MAX_FS_WRITER query option).
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 16b0ee399..06ac6c5e0 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -428,7 +428,10 @@ public class PlannerTestBase extends FrontendTestBase {
         .setMax_fragment_instances_per_node(12)
         .setReplica_preference(TReplicaPreference.REMOTE)
         .setSlot_count_strategy(TSlotCountStrategy.PLANNER_CPU_ASK)
-        .setPlanner_testcase_mode(true);
+        .setPlanner_testcase_mode(true)
+        // Required so that output doesn't vary by whether scanned tables have 
stats &
+        // numRows property or not.
+        .setDisable_hdfs_num_rows_estimate(true);
   }
 
   protected static Set<PlannerTestOption> tpcdsParquetTestOptions() {
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-iceberg.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-iceberg.test
index 31c2af784..dab85fa34 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-iceberg.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-iceberg.test
@@ -739,3 +739,45 @@ WRITE TO HDFS 
[tpcds_partitioned_parquet_snap.store_sales_zero_cardinality, OVER
    tuple-ids=0 row-size=96B cardinality=0 cost=0
    in pipelines: 00(GETNEXT)
 ====
+# Partitioned insert with unavailable stats.
+# tpcds_seq_snap does not have stats collected.
+# Exchange node must exist in query plan.
+create table store_sales_without_stats partitioned by (part_col)
+stored as iceberg as
+select *, ss_item_sk as part_col
+from tpcds_seq_snap.store_sales
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=102.00MB Threads=13
+Per-Host Resource Estimates: Memory=381MB
+F01:PLAN FRAGMENT [HASH(ss_item_sk)] hosts=10 instances=10
+|  Per-Instance Resources: mem-estimate=140.19MB mem-reservation=6.00MB 
thread-reservation=1
+|  max-parallelism=10 segment-costs=[0, 3954235]
+WRITE TO HDFS [tpcds_partitioned_parquet_snap.store_sales_without_stats, 
OVERWRITE=false, PARTITION-KEYS=(ss_item_sk)]
+|  output exprs: ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk, 
ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number, 
ss_quantity, ss_wholesale_cost, ss_list_price, ss_sales_price, 
ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost, 
ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid, ss_net_paid_inc_tax, 
ss_net_profit, ss_sold_date_sk, ss_item_sk
+|  mem-estimate=100.00KB mem-reservation=0B thread-reservation=0 cost=3954235
+|
+02:SORT
+|  order by: ss_item_sk ASC NULLS LAST
+|  mem-estimate=128.00MB mem-reservation=6.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=1 row-size=100B cardinality=unavailable cost=0
+|  in pipelines: 02(GETNEXT), 00(OPEN)
+|
+01:EXCHANGE [HASH(ss_item_sk)]
+|  mem-estimate=12.19MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=100B cardinality=unavailable cost=0
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=10 instances=120
+Per-Instance Resources: mem-estimate=20.06MB mem-reservation=8.00MB 
thread-reservation=1
+max-parallelism=120 segment-costs=[1200000000]
+00:SCAN HDFS [tpcds_seq_snap.store_sales, RANDOM]
+   HDFS partitions=1824/1824 files=1824 size=215.96MB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/1824 rows=unavailable
+     columns missing stats: ss_sold_time_sk, ss_item_sk, ss_customer_sk, 
ss_cdemo_sk, ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, 
ss_ticket_number, ss_quantity, ss_wholesale_cost, ss_list_price, 
ss_sales_price, ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost, 
ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid, ss_net_paid_inc_tax, 
ss_net_profit
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=8.00MB thread-reservation=0
+   tuple-ids=0 row-size=100B cardinality=unavailable cost=1200000000
+   in pipelines: 00(GETNEXT)
+====
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-parquet.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-parquet.test
index 63d48fc93..e790dfb82 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-parquet.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds_cpu_cost/tpcds-ddl-parquet.test
@@ -755,3 +755,46 @@ WRITE TO HDFS 
[tpcds_partitioned_parquet_snap.store_sales_zero_cardinality, OVER
    tuple-ids=0 row-size=96B cardinality=0 cost=0
    in pipelines: 00(GETNEXT)
 ====
+# Partitioned insert with unavailable stats.
+# tpcds_seq_snap does not have stats collected.
+# Exchange node must exist in query plan.
+create table store_sales_without_stats partitioned by (part_col)
+stored as parquet as
+select *, ss_item_sk as part_col
+from tpcds_seq_snap.store_sales
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=102.00MB Threads=13
+Per-Host Resource Estimates: Memory=1.36GB
+F01:PLAN FRAGMENT [HASH(ss_item_sk)] hosts=10 instances=10
+|  Per-Instance Resources: mem-estimate=1.12GB mem-reservation=6.00MB 
thread-reservation=1
+|  max-parallelism=10 segment-costs=[0, 3954235]
+WRITE TO HDFS [tpcds_partitioned_parquet_snap.store_sales_without_stats, 
OVERWRITE=false, PARTITION-KEYS=(ss_item_sk)]
+|  partitions=unavailable
+|  output exprs: ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk, 
ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number, 
ss_quantity, ss_wholesale_cost, ss_list_price, ss_sales_price, 
ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost, 
ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid, ss_net_paid_inc_tax, 
ss_net_profit, ss_sold_date_sk, ss_item_sk
+|  mem-estimate=1.00GB mem-reservation=0B thread-reservation=0 cost=3954235
+|
+02:SORT
+|  order by: ss_item_sk ASC NULLS LAST
+|  mem-estimate=128.00MB mem-reservation=6.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=1 row-size=100B cardinality=unavailable cost=0
+|  in pipelines: 02(GETNEXT), 00(OPEN)
+|
+01:EXCHANGE [HASH(ss_item_sk)]
+|  mem-estimate=12.19MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=100B cardinality=unavailable cost=0
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=10 instances=120
+Per-Instance Resources: mem-estimate=20.06MB mem-reservation=8.00MB 
thread-reservation=1
+max-parallelism=120 segment-costs=[1200000000]
+00:SCAN HDFS [tpcds_seq_snap.store_sales, RANDOM]
+   HDFS partitions=1824/1824 files=1824 size=215.96MB
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/1824 rows=unavailable
+     columns missing stats: ss_sold_time_sk, ss_item_sk, ss_customer_sk, 
ss_cdemo_sk, ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, 
ss_ticket_number, ss_quantity, ss_wholesale_cost, ss_list_price, 
ss_sales_price, ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost, 
ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid, ss_net_paid_inc_tax, 
ss_net_profit
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=8.00MB thread-reservation=0
+   tuple-ids=0 row-size=100B cardinality=unavailable cost=1200000000
+   in pipelines: 00(GETNEXT)
+====
diff --git a/tests/custom_cluster/test_executor_groups.py 
b/tests/custom_cluster/test_executor_groups.py
index 0930ccb76..f67d3988c 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -944,27 +944,51 @@ class TestExecutorGroups(CustomClusterTestSuite):
     nonexistence of 'not_expected_in_profile' in query profile.
     Caller is reponsible to close self.client at the end of test."""
     result = self.execute_query_expect_success(self.client, query)
+    profile = str(result.runtime_profile)
     for expected_profile in expected_strings_in_profile:
-      assert expected_profile in str(result.runtime_profile)
+      assert expected_profile in profile, (
+        "Expect '{0}' IN query profile but can not find it.\n{1}".format(
+          expected_profile, profile
+        )
+      )
     for not_expected in not_expected_in_profile:
-      assert not_expected not in str(result.runtime_profile)
+      assert not_expected not in profile, (
+        "Expect '{0}' NOT IN query profile but found it.\n{1}".format(
+          expected_profile, profile
+        )
+      )
     return result
 
   def __verify_fs_writers(self, result, expected_num_writers,
                           expected_instances_per_host):
     assert 'HDFS WRITER' in result.exec_summary[0]['operator'], 
result.runtime_profile
     num_writers = int(result.exec_summary[0]['num_instances'])
-    assert num_writers == expected_num_writers
+    assert num_writers == expected_num_writers, (
+      "Expect {0} num_writers but got {1}.\n{2}".format(
+        expected_num_writers, num_writers, result.runtime_profile)
+    )
     num_hosts = len(expected_instances_per_host)
-    regex = (r'Per Host Number of Fragment Instances:'
-             + (num_hosts * r'.*?\((.*?)\)') + r'.*?\n')
+    instance_count_key = 'Per Host Number of Fragment Instances:'
+    regex = (instance_count_key + (num_hosts * r'.*?\((.*?)\)') + r'.*?\n')
     matches = re.findall(regex, result.runtime_profile)
-    assert len(matches) == 1
-    assert len(matches[0]) == num_hosts
+    assert len(matches) == 1, (
+      "Expect {0} info string matching '{1}' but got {2}.\n{3}".format(
+        1, instance_count_key, len(matches), result.runtime_profile
+      )
+    )
+    assert len(matches[0]) == num_hosts, (
+      "Expect {0} hosts in '{1}' info string but got {2}.\n{3}".format(
+        num_hosts, instance_count_key, len(matches[0]), result.runtime_profile
+      )
+    )
     num_instances_per_host = [int(i) for i in matches[0]]
     num_instances_per_host.sort()
     expected_instances_per_host.sort()
-    assert num_instances_per_host == expected_instances_per_host
+    assert num_instances_per_host == expected_instances_per_host, (
+      "Expect {0} instance distribution but got {1}.\n{2}".format(
+        expected_instances_per_host, num_instances_per_host, 
result.runtime_profile
+      )
+    )
 
   @UniqueDatabase.parametrize(sync_ddl=True)
   @pytest.mark.execute_serially
@@ -1363,25 +1387,28 @@ class TestExecutorGroups(CustomClusterTestSuite):
       ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
        "Verdict: Match", "CpuAsk: 1", "CpuAskBounded: 1", "|  partitions=6"])
     self.__verify_fs_writers(result, 1, [0, 1])
+    # END testing insert + MAX_FS_WRITER
+
+    self._verify_query_num_for_resource_pool("root.tiny", 1)
+    self._verify_query_num_for_resource_pool("root.small", 2)
+    self._verify_query_num_for_resource_pool("root.large", 1)
+    self._verify_total_admitted_queries("root.tiny", 4)
+    self._verify_total_admitted_queries("root.small", 3)
+    self._verify_total_admitted_queries("root.large", 4)
+
+    # Starting from this point, do not validate request pool assignment and fs 
writers
+    # distribution, because different target file system may come up with 
different
+    # cardinality, cost, and parallelism. Successful query execution is 
sufficient.
 
     # Test partitioned insert overwrite, with unknown partition estimate.
-    result = self._run_query_and_verify_profile(
+    # Cardinality is calculated using byte-based estimation.
+    self._run_query_and_verify_profile(
       ("insert overwrite {0}.{1} ({2}) partition (ss_store_sk) "
        "select {3} from {0}.{4} "
        "where ss_store_sk=1").format(
          unique_database, "test_ctas7", store_sales_no_part_col, 
store_sales_columns,
          "test_ctas4"),
-      ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-       "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9", "|  
partitions=unavailable"])
-    self.__verify_fs_writers(result, 3, [0, 3, 3, 3])
-    # END testing insert + MAX_FS_WRITER
-
-    self._verify_query_num_for_resource_pool("root.tiny", 1)
-    self._verify_query_num_for_resource_pool("root.small", 2)
-    self._verify_query_num_for_resource_pool("root.large", 2)
-    self._verify_total_admitted_queries("root.tiny", 4)
-    self._verify_total_admitted_queries("root.small", 3)
-    self._verify_total_admitted_queries("root.large", 5)
+      ["|  partitions=unavailable"])
 
   @pytest.mark.execute_serially
   def test_query_cpu_count_divisor_two(self):

Reply via email to