This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a5a6f612 IMPALA-14638: Schedule union of Iceberg metadata scanner to 
coordinator
3a5a6f612 is described below

commit 3a5a6f612a332fc509cfdc73c4566356a00ac730
Author: Michael Smith <[email protected]>
AuthorDate: Thu Dec 18 14:42:48 2025 -0800

    IMPALA-14638: Schedule union of Iceberg metadata scanner to coordinator
    
    On clusters with dedicated coordinators and executors the Iceberg
    metadata scanner fragment(s) must be scheduled to coordinators.
    IMPALA-12809 ensured this for most plans, but if the Iceberg metadata
    scanner is part of a union of unpartitioned fragments, a new fragment is
    created for the union that subsumes the existing fragments and loses the
    coordinatorOnly flag.
    
    Fixes cases where a multi-fragment plan includes a union of Iceberg
    metadata scans by setting coordinatorOnly on the new union fragment.
    Adds new planner and runtime tests for this case.
    
    Change-Id: If2f19945037b4a7a6433cd9c6e7e2b352fae7356
    Reviewed-on: http://gerrit.cloudera.org:8080/23803
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/planner/DistributedPlanner.java  |  5 +-
 ...g-metadata-table-joined-with-regular-table.test | 98 ++++++++++++++++++++++
 tests/custom_cluster/test_coordinators.py          |  8 ++
 3 files changed, 110 insertions(+), 1 deletion(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 1de95c29c..1e97b5f17 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -865,8 +865,11 @@ public class DistributedPlanner {
     // If all child fragments are unpartitioned, return a single unpartitioned 
fragment
     // with a UnionNode that merges all child fragments.
     if (numUnpartitionedChildFragments == childFragments.size()) {
+      // Propagate coordinator-only property to the new union fragment.
+      boolean coordinatorOnlyChild =
+          childFragments.stream().anyMatch(PlanFragment::coordinatorOnly);
       PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(),
-          unionNode, DataPartition.UNPARTITIONED);
+          unionNode, DataPartition.UNPARTITIONED, coordinatorOnlyChild);
       // Absorb the plan trees of all childFragments into unionNode
       // and fix up the fragment tree in the process.
       for (int i = 0; i < childFragments.size(); ++i) {
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-joined-with-regular-table.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-joined-with-regular-table.test
index 771bef40e..743b1afaf 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-joined-with-regular-table.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-joined-with-regular-table.test
@@ -101,3 +101,101 @@ Per-Host Resources: mem-estimate=4.14GB 
mem-reservation=104.01MB thread-reservat
    tuple-ids=2 row-size=1B cardinality=11.96K
    in pipelines: 02(GETNEXT)
 ====
+select count(DISTINCT a.parent_id, a.is_current_ancestor)
+from (select * from functional_parquet.iceberg_query_metadata.history
+      union select * from functional_parquet.iceberg_query_metadata.history) a
+join functional_parquet.alltypestiny c on a.is_current_ancestor = c.bool_col
+---- DISTRIBUTEDPLAN
+F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B 
thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: count(if(a.parent_id IS NULL, NULL, a.is_current_ancestor))
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+12:AGGREGATE [FINALIZE]
+|  output: count:merge(if(a.parent_id IS NULL, NULL, a.is_current_ancestor))
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=6 row-size=8B cardinality=1
+|  in pipelines: 12(GETNEXT), 07(OPEN)
+|
+11:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=6 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT)
+|
+F04:PLAN FRAGMENT [HASH(a.parent_id,a.is_current_ancestor)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=128.03MB mem-reservation=34.00MB 
thread-reservation=1
+07:AGGREGATE
+|  output: count(if(a.parent_id IS NULL, NULL, a.is_current_ancestor))
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=6 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT), 10(OPEN)
+|
+10:AGGREGATE
+|  group by: a.parent_id, a.is_current_ancestor
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=5 row-size=9B cardinality=758
+|  in pipelines: 10(GETNEXT), 04(OPEN)
+|
+09:EXCHANGE [HASH(a.parent_id,a.is_current_ancestor)]
+|  mem-estimate=31.09KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=5 row-size=9B cardinality=758
+|  in pipelines: 04(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=2.15GB mem-reservation=69.01MB 
thread-reservation=2 runtime-filters-memory=1.00MB
+06:AGGREGATE [STREAMING]
+|  group by: parent_id, is_current_ancestor
+|  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=5 row-size=9B cardinality=758
+|  in pipelines: 04(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.bool_col = is_current_ancestor
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[bloom] <- is_current_ancestor
+|  mem-estimate=2.00GB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  tuple-ids=4,2 row-size=34B cardinality=758
+|  in pipelines: 04(GETNEXT), 03(OPEN)
+|
+|--08:EXCHANGE [BROADCAST]
+|  |  mem-estimate=10.04MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2 row-size=33B cardinality=unavailable
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=128.14MB mem-reservation=34.00MB 
thread-reservation=1
+|  03:AGGREGATE [FINALIZE]
+|  |  group by: made_current_at, snapshot_id, parent_id, is_current_ancestor
+|  |  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB 
thread-reservation=0
+|  |  tuple-ids=2 row-size=33B cardinality=unavailable
+|  |  in pipelines: 03(GETNEXT), 01(OPEN), 02(OPEN)
+|  |
+|  00:UNION
+|  |  pass-through-operands: all
+|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2 row-size=33B cardinality=unavailable
+|  |  in pipelines: 01(GETNEXT), 02(GETNEXT)
+|  |
+|  |--02:SCAN ICEBERG METADATA 
[functional_parquet.iceberg_query_metadata.HISTORY]
+|  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |     tuple-ids=1 row-size=33B cardinality=unavailable
+|  |     in pipelines: 02(GETNEXT)
+|  |
+|  01:SCAN ICEBERG METADATA [functional_parquet.iceberg_query_metadata.HISTORY]
+|     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|     tuple-ids=0 row-size=33B cardinality=unavailable
+|     in pipelines: 01(GETNEXT)
+|
+04:SCAN HDFS [functional_parquet.alltypestiny c, RANDOM]
+   HDFS partitions=4/4 files=4 size=11.92KB
+   runtime filters: RF000[bloom] -> c.bool_col
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/4 rows=758
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=1
+   tuple-ids=4 row-size=1B cardinality=758
+   in pipelines: 04(GETNEXT)
+====
diff --git a/tests/custom_cluster/test_coordinators.py 
b/tests/custom_cluster/test_coordinators.py
index 3e0f4d754..edc6ed5c6 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -315,6 +315,14 @@ class TestCoordinators(CustomClusterTestSuite):
         on a.is_current_ancestor = c.bool_col"""
     self.execute_query_expect_success(self.client, q2)
 
+    # A union of metadata tables joined with a regular table.
+    q3 = """select count(DISTINCT a.parent_id, a.is_current_ancestor)
+        from (select * from functional_parquet.iceberg_query_metadata.history
+              union select * from 
functional_parquet.iceberg_query_metadata.history) a
+        join functional_parquet.alltypestiny c
+        on a.is_current_ancestor = c.bool_col"""
+    self.execute_query_expect_success(self.client, q3)
+
   @pytest.mark.execute_serially
   
@CustomClusterTestSuite.with_args(impalad_args="--queue_wait_timeout_ms=2000",
                                     cluster_size=1, 
num_exclusive_coordinators=1)

Reply via email to