This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 3a5a6f612 IMPALA-14638: Schedule union of iceberg metadata scanner to
coordinator
3a5a6f612 is described below
commit 3a5a6f612a332fc509cfdc73c4566356a00ac730
Author: Michael Smith <[email protected]>
AuthorDate: Thu Dec 18 14:42:48 2025 -0800
IMPALA-14638: Schedule union of iceberg metadata scanner to coordinator
On clusters with dedicated coordinators and executors, the Iceberg
metadata scanner fragment(s) must be scheduled to coordinators.
IMPALA-12809 ensured this for most plans, but if the Iceberg metadata
scanner is part of a union of unpartitioned fragments a new fragment is
created for the union that subsumes existing fragments and loses the
coordinatorOnly flag.
Fixes cases where a multi-fragment plan includes a union of Iceberg
metadata scans by setting coordinatorOnly on the new union fragment.
Adds new planner and runtime tests for this case.
Change-Id: If2f19945037b4a7a6433cd9c6e7e2b352fae7356
Reviewed-on: http://gerrit.cloudera.org:8080/23803
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../apache/impala/planner/DistributedPlanner.java | 5 +-
...g-metadata-table-joined-with-regular-table.test | 98 ++++++++++++++++++++++
tests/custom_cluster/test_coordinators.py | 8 ++
3 files changed, 110 insertions(+), 1 deletion(-)
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 1de95c29c..1e97b5f17 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -865,8 +865,11 @@ public class DistributedPlanner {
// If all child fragments are unpartitioned, return a single unpartitioned
fragment
// with a UnionNode that merges all child fragments.
if (numUnpartitionedChildFragments == childFragments.size()) {
+ // Propagate coordinator-only property to the new union fragment.
+ boolean coordinatorOnlyChild =
+ childFragments.stream().anyMatch(PlanFragment::coordinatorOnly);
PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(),
- unionNode, DataPartition.UNPARTITIONED);
+ unionNode, DataPartition.UNPARTITIONED, coordinatorOnlyChild);
// Absorb the plan trees of all childFragments into unionNode
// and fix up the fragment tree in the process.
for (int i = 0; i < childFragments.size(); ++i) {
diff --git
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-joined-with-regular-table.test
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-joined-with-regular-table.test
index 771bef40e..743b1afaf 100644
---
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-joined-with-regular-table.test
+++
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-joined-with-regular-table.test
@@ -101,3 +101,101 @@ Per-Host Resources: mem-estimate=4.14GB
mem-reservation=104.01MB thread-reservat
tuple-ids=2 row-size=1B cardinality=11.96K
in pipelines: 02(GETNEXT)
====
+select count(DISTINCT a.parent_id, a.is_current_ancestor)
+from (select * from functional_parquet.iceberg_query_metadata.history
+ union select * from functional_parquet.iceberg_query_metadata.history) a
+join functional_parquet.alltypestiny c on a.is_current_ancestor = c.bool_col
+---- DISTRIBUTEDPLAN
+F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B
thread-reservation=1
+PLAN-ROOT SINK
+| output exprs: count(if(a.parent_id IS NULL, NULL, a.is_current_ancestor))
+| mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+12:AGGREGATE [FINALIZE]
+| output: count:merge(if(a.parent_id IS NULL, NULL, a.is_current_ancestor))
+| mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB
thread-reservation=0
+| tuple-ids=6 row-size=8B cardinality=1
+| in pipelines: 12(GETNEXT), 07(OPEN)
+|
+11:EXCHANGE [UNPARTITIONED]
+| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| tuple-ids=6 row-size=8B cardinality=1
+| in pipelines: 07(GETNEXT)
+|
+F04:PLAN FRAGMENT [HASH(a.parent_id,a.is_current_ancestor)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=128.03MB mem-reservation=34.00MB
thread-reservation=1
+07:AGGREGATE
+| output: count(if(a.parent_id IS NULL, NULL, a.is_current_ancestor))
+| mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB
thread-reservation=0
+| tuple-ids=6 row-size=8B cardinality=1
+| in pipelines: 07(GETNEXT), 10(OPEN)
+|
+10:AGGREGATE
+| group by: a.parent_id, a.is_current_ancestor
+| mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB
thread-reservation=0
+| tuple-ids=5 row-size=9B cardinality=758
+| in pipelines: 10(GETNEXT), 04(OPEN)
+|
+09:EXCHANGE [HASH(a.parent_id,a.is_current_ancestor)]
+| mem-estimate=31.09KB mem-reservation=0B thread-reservation=0
+| tuple-ids=5 row-size=9B cardinality=758
+| in pipelines: 04(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=2.15GB mem-reservation=69.01MB
thread-reservation=2 runtime-filters-memory=1.00MB
+06:AGGREGATE [STREAMING]
+| group by: parent_id, is_current_ancestor
+| mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB
thread-reservation=0
+| tuple-ids=5 row-size=9B cardinality=758
+| in pipelines: 04(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+| hash predicates: c.bool_col = is_current_ancestor
+| fk/pk conjuncts: assumed fk/pk
+| runtime filters: RF000[bloom] <- is_current_ancestor
+| mem-estimate=2.00GB mem-reservation=34.00MB spill-buffer=2.00MB
thread-reservation=0
+| tuple-ids=4,2 row-size=34B cardinality=758
+| in pipelines: 04(GETNEXT), 03(OPEN)
+|
+|--08:EXCHANGE [BROADCAST]
+| | mem-estimate=10.04MB mem-reservation=0B thread-reservation=0
+| | tuple-ids=2 row-size=33B cardinality=unavailable
+| | in pipelines: 03(GETNEXT)
+| |
+| F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| Per-Host Resources: mem-estimate=128.14MB mem-reservation=34.00MB
thread-reservation=1
+| 03:AGGREGATE [FINALIZE]
+| | group by: made_current_at, snapshot_id, parent_id, is_current_ancestor
+| | mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB
thread-reservation=0
+| | tuple-ids=2 row-size=33B cardinality=unavailable
+| | in pipelines: 03(GETNEXT), 01(OPEN), 02(OPEN)
+| |
+| 00:UNION
+| | pass-through-operands: all
+| | mem-estimate=0B mem-reservation=0B thread-reservation=0
+| | tuple-ids=2 row-size=33B cardinality=unavailable
+| | in pipelines: 01(GETNEXT), 02(GETNEXT)
+| |
+| |--02:SCAN ICEBERG METADATA
[functional_parquet.iceberg_query_metadata.HISTORY]
+| | mem-estimate=0B mem-reservation=0B thread-reservation=0
+| | tuple-ids=1 row-size=33B cardinality=unavailable
+| | in pipelines: 02(GETNEXT)
+| |
+| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_query_metadata.HISTORY]
+| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| tuple-ids=0 row-size=33B cardinality=unavailable
+| in pipelines: 01(GETNEXT)
+|
+04:SCAN HDFS [functional_parquet.alltypestiny c, RANDOM]
+ HDFS partitions=4/4 files=4 size=11.92KB
+ runtime filters: RF000[bloom] -> c.bool_col
+ stored statistics:
+ table: rows=unavailable size=unavailable
+ partitions: 0/4 rows=758
+ columns: unavailable
+ extrapolated-rows=disabled max-scan-range-rows=unavailable
+ mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=1
+ tuple-ids=4 row-size=1B cardinality=758
+ in pipelines: 04(GETNEXT)
+====
diff --git a/tests/custom_cluster/test_coordinators.py
b/tests/custom_cluster/test_coordinators.py
index 3e0f4d754..edc6ed5c6 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -315,6 +315,14 @@ class TestCoordinators(CustomClusterTestSuite):
on a.is_current_ancestor = c.bool_col"""
self.execute_query_expect_success(self.client, q2)
+ # A union of metadata tables joined with a regular table.
+ q3 = """select count(DISTINCT a.parent_id, a.is_current_ancestor)
+ from (select * from functional_parquet.iceberg_query_metadata.history
+ union select * from
functional_parquet.iceberg_query_metadata.history) a
+ join functional_parquet.alltypestiny c
+ on a.is_current_ancestor = c.bool_col"""
+ self.execute_query_expect_success(self.client, q3)
+
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args="--queue_wait_timeout_ms=2000",
cluster_size=1,
num_exclusive_coordinators=1)