This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 8f8afec603e3de82f79be4a597435a93d15ed5c0 Author: Joe McDonnell <[email protected]> AuthorDate: Wed Jun 11 14:36:04 2025 -0700 IMPALA-13659: Add tuple caching support for union nodes This makes UnionNodes eligible for tuple caching. This currently disables caching immediately above a union node, because the individual pieces of a union can cache individually. However, aggregations on top of unions are now eligible for caching and this impacts some queries in TPC-DS. Testing: - Added test cases to TupleCacheTest - Ran test_tuple_cache_tpc_queries.py Change-Id: I1dd51858a19fd1447bb7d0e477a02b57d3f4de41 Reviewed-on: http://gerrit.cloudera.org:8080/23011 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Yida Wu <[email protected]> Reviewed-by: Michael Smith <[email protected]> --- .../java/org/apache/impala/planner/UnionNode.java | 16 ++++++++-- .../org/apache/impala/planner/TupleCacheTest.java | 36 ++++++++++++++++++++-- 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java index c25570dec..0f2a0d425 100644 --- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java +++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java @@ -27,6 +27,7 @@ import org.apache.impala.analysis.SlotRef; import org.apache.impala.analysis.SortInfo; import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.analysis.TupleId; +import org.apache.impala.common.ThriftSerializationCtx; import org.apache.impala.thrift.TExecNodePhase; import org.apache.impala.thrift.TExplainLevel; import org.apache.impala.thrift.TExpr; @@ -351,14 +352,19 @@ public class UnionNode extends PlanNode { @Override protected void toThrift(TPlanNode msg) { + Preconditions.checkState(false, "Unexpected use of old toThrift() signature."); + } + + @Override + protected void toThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) { Preconditions.checkState(materializedResultExprLists_.size() == children_.size()); List<List<TExpr>> texprLists = new ArrayList<>(); for (List<Expr> exprList: materializedResultExprLists_) { - texprLists.add(Expr.treesToThrift(exprList)); + texprLists.add(Expr.treesToThrift(exprList, serialCtx)); } List<List<TExpr>> constTexprLists = new ArrayList<>(); for (List<Expr> constTexprList: materializedConstExprLists_) { - constTexprLists.add(Expr.treesToThrift(constTexprList)); + constTexprLists.add(Expr.treesToThrift(constTexprList, serialCtx)); } Preconditions.checkState(firstMaterializedChildIdx_ <= children_.size()); msg.union_node = new TUnionNode( @@ -412,4 +418,10 @@ public class UnionNode extends PlanNode { } } } + + @Override + public boolean isTupleCachingImplemented() { return true; } + + @Override + public boolean omitTupleCache() { return true; } } diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java index 3abffe2b9..507b3dbb4 100644 --- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java +++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java @@ -169,11 +169,15 @@ public class TupleCacheTest extends PlannerTestBase { /* isDistributedPlan */ true); // For single node plan, there are no exchanges and everything is eligible. verifyAllEligible(innerJoinAgg, /* isDistributedPlan */ false); - // Both scans are cached, but aggregate is not because it's above a union. Limit only - // applies to aggregate above exchange, which is obviously not cached. + // The scans and union are eligible. The aggregate is not because in a single + // node plan, the limit attaches to the aggregate. More sophisticated logic could + // detect that the limit is not binding as count(*) returns only a single row. String unionAgg = "select count(*) from (select * from functional.alltypes " + "union all select * from functional.alltypessmall) t limit 10"; - verifyNIdenticalCacheKeys(unionAgg, unionAgg, 2); + verifyNIdenticalCacheKeys(unionAgg, unionAgg, 3, /* isDistributedPlan */ false); + // With a distributed plan, the limit attaches to the aggregate finalize, so the + // initial aggregate is eligible in addition to the union and two scans. + verifyNIdenticalCacheKeys(unionAgg, unionAgg, 4, /* isDistributedPlan */ true); // Only scan is cached, as aggregates are above an exchange and TOP-N. String groupConcatGroupAgg = "select day, group_concat(distinct string_col) " + "from (select * from functional.alltypesagg where id % 100 = day order by id " + @@ -184,6 +188,25 @@ public class TupleCacheTest extends PlannerTestBase { verifyNIdenticalCacheKeys(appxMedianAgg, appxMedianAgg, 1); } + @Test + public void testUnions() { + String select_alltypes_id = "select id from functional.alltypes"; + String select_alltypessmall_id = "select id from functional.alltypessmall"; + verifyAllEligible(select_alltypes_id + " union all " + select_alltypessmall_id, + /*isDistributedPlan*/ false); + verifyAllEligible("select count(*) from (select * from functional.alltypes " + + "union all select * from functional.alltypessmall) t", + /*isDistributedPlan*/ false); + // A union distinct has an aggregate above the union to deduplicate the results. + // The whole thing is supported now. + verifyAllEligible(select_alltypes_id + " union distinct " + select_alltypessmall_id, + /*isDistributedPlan*/ false); + // The order of the union matters to the key above the union. + verifyOverlappingCacheKeys( + select_alltypes_id + " union all " + select_alltypessmall_id, + select_alltypessmall_id + " union all " + select_alltypes_id); + } + /** * Test cases that rely on masking out unnecessary data to have cache hits. */ @@ -330,6 +353,13 @@ public class TupleCacheTest extends PlannerTestBase { "select straight_join probe.id from functional.alltypes probe, (select " + "build1.id from functional.alltypes build1) build where probe.id = build.id " + "and probe.bool_col = true and probe.int_col = 100"); + + // This has a union on the build side. This adds an extra filter on the probe side and + // changes the slot ids, but that does not change the cache keys for the build-side. + String join_union_build = "select straight_join u.id from functional.alltypes t, " + + "(select id from functional.alltypessmall union all " + + "select id from functional.alltypestiny) u where t.id = u.id"; + verifyOverlappingCacheKeys(join_union_build, join_union_build + " and t.int_col = 1"); } @Test
