This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8f8afec603e3de82f79be4a597435a93d15ed5c0
Author: Joe McDonnell <[email protected]>
AuthorDate: Wed Jun 11 14:36:04 2025 -0700

    IMPALA-13659: Add tuple caching support for union nodes
    
    This makes UnionNodes eligible for tuple caching. This currently
    disables caching immediately above a union node, because the individual
    pieces of a union can cache individually. However, aggregations on
    top of unions are now eligible for caching and this impacts some queries
    in TPC-DS.
    
    Testing:
     - Added test cases to TupleCacheTest
     - Ran test_tuple_cache_tpc_queries.py
    
    Change-Id: I1dd51858a19fd1447bb7d0e477a02b57d3f4de41
    Reviewed-on: http://gerrit.cloudera.org:8080/23011
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Yida Wu <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
---
 .../java/org/apache/impala/planner/UnionNode.java  | 16 ++++++++--
 .../org/apache/impala/planner/TupleCacheTest.java  | 36 ++++++++++++++++++++--
 2 files changed, 47 insertions(+), 5 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index c25570dec..0f2a0d425 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -27,6 +27,7 @@ import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.SortInfo;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
+import org.apache.impala.common.ThriftSerializationCtx;
 import org.apache.impala.thrift.TExecNodePhase;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TExpr;
@@ -351,14 +352,19 @@ public class UnionNode extends PlanNode {
 
   @Override
   protected void toThrift(TPlanNode msg) {
+    Preconditions.checkState(false, "Unexpected use of old toThrift() signature.");
+  }
+
+  @Override
+  protected void toThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) {
     Preconditions.checkState(materializedResultExprLists_.size() == children_.size());
     List<List<TExpr>> texprLists = new ArrayList<>();
     for (List<Expr> exprList: materializedResultExprLists_) {
-      texprLists.add(Expr.treesToThrift(exprList));
+      texprLists.add(Expr.treesToThrift(exprList, serialCtx));
     }
     List<List<TExpr>> constTexprLists = new ArrayList<>();
     for (List<Expr> constTexprList: materializedConstExprLists_) {
-      constTexprLists.add(Expr.treesToThrift(constTexprList));
+      constTexprLists.add(Expr.treesToThrift(constTexprList, serialCtx));
     }
     Preconditions.checkState(firstMaterializedChildIdx_ <= children_.size());
     msg.union_node = new TUnionNode(
@@ -412,4 +418,10 @@ public class UnionNode extends PlanNode {
       }
     }
   }
+
+  @Override
+  public boolean isTupleCachingImplemented() { return true; }
+
+  @Override
+  public boolean omitTupleCache() { return true; }
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
index 3abffe2b9..507b3dbb4 100644
--- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
@@ -169,11 +169,15 @@ public class TupleCacheTest extends PlannerTestBase {
         /* isDistributedPlan */ true);
     // For single node plan, there are no exchanges and everything is eligible.
     verifyAllEligible(innerJoinAgg, /* isDistributedPlan */ false);
-    // Both scans are cached, but aggregate is not because it's above a union. Limit only
-    // applies to aggregate above exchange, which is obviously not cached.
+    // The scans and union are eligible. The aggregate is not because in a single
+    // node plan, the limit attaches to the aggregate. More sophisticated logic could
+    // detect that the limit is not binding as count(*) returns only a single row.
     String unionAgg = "select count(*) from (select * from functional.alltypes " +
         "union all select * from functional.alltypessmall) t limit 10";
-    verifyNIdenticalCacheKeys(unionAgg, unionAgg, 2);
+    verifyNIdenticalCacheKeys(unionAgg, unionAgg, 3, /* isDistributedPlan */ false);
+    // With a distributed plan, the limit attaches to the aggregate finalize, so the
+    // initial aggregate is eligible in addition to the union and two scans.
+    verifyNIdenticalCacheKeys(unionAgg, unionAgg, 4, /* isDistributedPlan */ true);
     // Only scan is cached, as aggregates are above an exchange and TOP-N.
     String groupConcatGroupAgg = "select day, group_concat(distinct string_col) " +
         "from (select * from functional.alltypesagg where id % 100 = day order by id " +
@@ -184,6 +188,25 @@ public class TupleCacheTest extends PlannerTestBase {
     verifyNIdenticalCacheKeys(appxMedianAgg, appxMedianAgg, 1);
   }
 
+  @Test
+  public void testUnions() {
+    String select_alltypes_id = "select id from functional.alltypes";
+    String select_alltypessmall_id = "select id from functional.alltypessmall";
+    verifyAllEligible(select_alltypes_id + " union all " + select_alltypessmall_id,
+        /*isDistributedPlan*/ false);
+    verifyAllEligible("select count(*) from (select * from functional.alltypes " +
+        "union all select * from functional.alltypessmall) t",
+        /*isDistributedPlan*/ false);
+    // A union distinct has an aggregate above the union to deduplicate the results.
+    // The whole thing is supported now.
+    verifyAllEligible(select_alltypes_id + " union distinct " + select_alltypessmall_id,
+        /*isDistributedPlan*/ false);
+    // The order of the union matters to the key above the union.
+    verifyOverlappingCacheKeys(
+        select_alltypes_id + " union all " + select_alltypessmall_id,
+        select_alltypessmall_id + " union all " + select_alltypes_id);
+  }
+
   /**
    * Test cases that rely on masking out unnecessary data to have cache hits.
    */
@@ -330,6 +353,13 @@ public class TupleCacheTest extends PlannerTestBase {
         "select straight_join probe.id from functional.alltypes probe, (select " +
         "build1.id from functional.alltypes build1) build where probe.id = build.id " +
         "and probe.bool_col = true and probe.int_col = 100");
+
+    // This has a union on the build side. This adds an extra filter on the probe side and
+    // changes the slot ids, but that does not change the cache keys for the build-side.
+    String join_union_build = "select straight_join u.id from functional.alltypes t, " +
+      "(select id from functional.alltypessmall union all " +
+      "select id from functional.alltypestiny) u where t.id = u.id";
+    verifyOverlappingCacheKeys(join_union_build, join_union_build + " and t.int_col = 1");
   }
 
   @Test

Reply via email to