This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new d81c4db5e IMPALA-13185: Include runtime filter source in key
d81c4db5e is described below

commit d81c4db5e1e99865cf3a9a96cbc48af6436dc0c0
Author: Michael Smith <[email protected]>
AuthorDate: Tue Aug 27 11:32:40 2024 -0700

    IMPALA-13185: Include runtime filter source in key
    
    Incorporates the build-side PlanNode of a runtime filter in the tuple
    cache key to avoid re-using intermediate results that were generated
    using a runtime filter on the same target but different selection
    criteria (build-side conjuncts).
    
    We currently don't support caching ExchangeNode, but a common scenario
    is a runtime filter produced by a HashJoin, with an Exchange on the
    build side. This change looks through the first ExchangeNode when
    considering the cache key and eligibility of the build-side source for
    a runtime filter.
    
    Testing shows all tests now passing for test_tuple_cache_tpc_queries
    except those that hit "TupleCacheNode does not enforce limits itself and
    cannot have a limit set."
    
    Adds planner tests covering some scenarios where runtime filters are
    expected to match or differ, and custom cluster tests for multi-node
    testing.
    
    Change-Id: I0077964be5acdb588d76251a6a39e57a0f42bb5a
    Reviewed-on: http://gerrit.cloudera.org:8080/21729
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Joe McDonnell <[email protected]>
---
 .../org/apache/impala/planner/JoinBuildSink.java   |   4 +-
 .../java/org/apache/impala/planner/JoinNode.java   |   5 +
 .../java/org/apache/impala/planner/PlanNode.java   |  42 ++++-
 .../impala/planner/RuntimeFilterGenerator.java     |  60 +++++--
 .../org/apache/impala/planner/TupleCacheInfo.java  |  76 ++++++++-
 .../org/apache/impala/planner/TupleCacheTest.java  |  64 ++++++++
 tests/custom_cluster/test_tuple_cache.py           | 177 ++++++++++++++++++++-
 7 files changed, 395 insertions(+), 33 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java 
b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
index 8f6698119..2b13c1879 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.common.ThriftSerializationCtx;
 import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
@@ -90,7 +91,8 @@ public class JoinBuildSink extends DataSink {
       tBuildSink.setHash_seed(joinNode_.getFragment().getHashSeed());
     }
     for (RuntimeFilter filter : runtimeFilters_) {
-      tBuildSink.addToRuntime_filters(filter.toThrift());
+      tBuildSink.addToRuntime_filters(
+          filter.toThrift(new ThriftSerializationCtx(), null));
     }
     tBuildSink.setShare_build(joinNode_.canShareBuild());
     tsink.setJoin_build_sink(tBuildSink);
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index 43dc651f2..38831a42c 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -1016,6 +1016,11 @@ public abstract class JoinNode extends PlanNode {
     }
   }
 
+  public PlanNode getBuildNode() {
+    Preconditions.checkState(getChildCount() == 2);
+    return getChild(1);
+  }
+
   /**
    * Helper method to compute the resource requirements for the join that can 
be
    * called from the builder or the join node. Returns a pair of the probe
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java 
b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 16c253374..b28b9d085 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -546,7 +546,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     }
     // Serialize any runtime filters
     for (RuntimeFilter filter : runtimeFilters_) {
-      msg.addToRuntime_filters(filter.toThrift());
+      msg.addToRuntime_filters(filter.toThrift(serialCtx, this));
     }
     msg.setDisable_codegen(disableCodegen_);
     msg.pipelines = new ArrayList<>();
@@ -1324,15 +1324,25 @@ abstract public class PlanNode extends 
TreeNode<PlanNode> {
    * the key if the node is eligible to avoid overhead.
    */
   public void computeTupleCacheInfo(DescriptorTable descTbl) {
+    if (tupleCacheInfo_ != null) {
+      // Already computed.
+      LOG.trace("Tuple cache found for {}", this);
+      return;
+    }
+
+    LOG.trace("Computing tuple cache info for {}", this);
     tupleCacheInfo_ = new TupleCacheInfo(descTbl);
     // computing the tuple cache information is a bottom-up tree traversal,
     // so visit and merge the children before processing this node's contents
-    for (int i = 0; i < getChildCount(); i++) {
-      getChild(i).computeTupleCacheInfo(descTbl);
-      tupleCacheInfo_.mergeChild(getChild(i).getTupleCacheInfo());
+    for (PlanNode child : getChildren()) {
+      child.computeTupleCacheInfo(descTbl);
+      if (!tupleCacheInfo_.mergeChild(child.getTupleCacheInfo())) {
+        LOG.trace("{} ineligible for caching due to {}", this, child);
+      }
     }
 
     if (!isTupleCachingImplemented()) {
+      LOG.trace("{} is ineligible for caching", this);
       
tupleCacheInfo_.setIneligible(TupleCacheInfo.IneligibilityReason.NOT_IMPLEMENTED);
     }
 
@@ -1342,6 +1352,29 @@ abstract public class PlanNode extends 
TreeNode<PlanNode> {
       return;
     }
 
+    // Include the build-side of a RuntimeFilter; look past the 1st 
ExchangeNode.
+    // If the build-side is hashable, merge the hash. Otherwise mark this node 
as
+    // ineligible because the RuntimeFilter is too complex to reason about.
+    for (RuntimeFilter filter : runtimeFilters_) {
+      // We want the build side of the join.
+      PlanNode build = filter.getSrc().getBuildNode();
+      Preconditions.checkState(!build.contains(this),
+          "Build-side contains current node, so cache info cannot be 
initialized");
+
+      if (build instanceof ExchangeNode && build.getChildCount() == 1) {
+        // We only look past ExchangeNodes with 1 child. IcebergDeleteNode has 
2.
+        build = build.getChild(0);
+      }
+
+      // Build may not have been visited yet.
+      build.computeTupleCacheInfo(descTbl);
+      if (!tupleCacheInfo_.mergeChildWithScans(build.getTupleCacheInfo())) {
+        LOG.trace("{} on {} ineligible for caching due to {}", filter, this, 
build);
+        tupleCacheInfo_.finalizeHash();
+        return;
+      }
+    }
+
     // Incorporate this node's information
     // TODO: This will also calculate eligibility via initThrift/toThrift.
     // TODO: This will adjust the output of initThrift/toThrift to mask out 
items.
@@ -1351,6 +1384,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> 
{
     toThrift(msg, serialCtx);
     tupleCacheInfo_.hashThrift(msg);
     tupleCacheInfo_.finalizeHash();
+    LOG.trace("Hash for {}: {}", this, tupleCacheInfo_.getHashTrace());
   }
 
   /**
diff --git 
a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java 
b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index d05fe9399..02a70dea4 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -58,6 +58,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.IdGenerator;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.common.ThriftSerializationCtx;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.planner.JoinNode.EqJoinConjunctScanSlots;
 import org.apache.impala.service.BackendConfig;
@@ -285,14 +286,19 @@ public final class RuntimeFilterGenerator {
         this.highValue = highValue;
       }
 
-      public TRuntimeFilterTargetDesc toThrift() {
+      public TRuntimeFilterTargetDesc toThrift(ThriftSerializationCtx 
serialCtx) {
         TRuntimeFilterTargetDesc tFilterTarget = new 
TRuntimeFilterTargetDesc();
-        tFilterTarget.setNode_id(node.getId().asInt());
-        tFilterTarget.setTarget_expr(expr.treeToThrift());
+        if (serialCtx.isTupleCache()) {
+          // Target PlanNodeId is global and not useful for tuple caching.
+          tFilterTarget.setNode_id(0);
+        } else {
+          tFilterTarget.setNode_id(node.getId().asInt());
+        }
+        tFilterTarget.setTarget_expr(expr.treeToThrift(serialCtx));
         List<SlotId> sids = new ArrayList<>();
         expr.getIds(null, sids);
         List<Integer> tSlotIds = Lists.newArrayListWithCapacity(sids.size());
-        for (SlotId sid: sids) tSlotIds.add(sid.asInt());
+        for (SlotId sid: sids) 
tSlotIds.add(serialCtx.translateSlotId(sid).asInt());
         tFilterTarget.setTarget_expr_slotids(tSlotIds);
         
tFilterTarget.setIs_bound_by_partition_columns(isBoundByPartitionColumns);
         tFilterTarget.setIs_local_target(isLocalTarget);
@@ -373,23 +379,45 @@ public final class RuntimeFilterGenerator {
     /**
      * Serializes a runtime filter to Thrift.
      */
-    public TRuntimeFilterDesc toThrift() {
+    public TRuntimeFilterDesc toThrift(ThriftSerializationCtx serialCtx,
+        PlanNode cacheTarget) {
       TRuntimeFilterDesc tFilter = new TRuntimeFilterDesc();
-      tFilter.setFilter_id(id_.asInt());
-      tFilter.setSrc_expr(srcExpr_.treeToThrift());
-      tFilter.setSrc_node_id(src_.getId().asInt());
+      // Omit properties that don't affect tuple caching.
+      if (serialCtx.isTupleCache()) {
+        // Required property; RuntimeFilterId is irrelevant to tuple cache 
results.
+        tFilter.setFilter_id(0);
+        // The target plan is already serialized as part of the calling 
context. targets_
+        // may reference multiple target nodes; the others are irrelevant for 
tuple
+        // caching.
+        tFilter.setTargets(new ArrayList<>());
+        // Target PlanNodeId is global and not useful for tuple caching.
+        tFilter.setPlanid_to_target_ndx(new HashMap<>());
+        // Incorporate the source plan TRuntimeFilterTargetDesc. targets_ may 
include
+        // other targets that are not relevant to the PlanNode we're caching.
+        for (RuntimeFilterTarget target: targets_) {
+          if (target.node == cacheTarget) {
+            tFilter.addToTargets(target.toThrift(serialCtx));
+            break;
+          }
+        }
+      } else {
+        tFilter.setFilter_id(id_.asInt());
+        tFilter.setSrc_node_id(src_.getId().asInt());
+        tFilter.setNdv_estimate(ndvEstimate_);
+        for (int i = 0; i < targets_.size(); ++i) {
+          RuntimeFilterTarget target = targets_.get(i);
+          tFilter.addToTargets(target.toThrift(serialCtx));
+          tFilter.putToPlanid_to_target_ndx(target.node.getId().asInt(), i);
+        }
+      }
+      tFilter.setSrc_expr(srcExpr_.treeToThrift(serialCtx));
       tFilter.setIs_broadcast_join(isBroadcastJoin_);
-      tFilter.setNdv_estimate(ndvEstimate_);
       tFilter.setHas_local_targets(hasLocalTargets_);
       tFilter.setHas_remote_targets(hasRemoteTargets_);
       tFilter.setCompareOp(exprCmpOp_.getThriftOp());
       boolean appliedOnPartitionColumns = true;
-      for (int i = 0; i < targets_.size(); ++i) {
-        RuntimeFilterTarget target = targets_.get(i);
-        tFilter.addToTargets(target.toThrift());
-        tFilter.putToPlanid_to_target_ndx(target.node.getId().asInt(), i);
-        appliedOnPartitionColumns =
-            appliedOnPartitionColumns && target.isBoundByPartitionColumns;
+      for (RuntimeFilterTarget target: targets_) {
+        appliedOnPartitionColumns &= target.isBoundByPartitionColumns;
       }
       tFilter.setApplied_on_partition_columns(appliedOnPartitionColumns);
       tFilter.setType(type_);
@@ -664,7 +692,7 @@ public final class RuntimeFilterGenerator {
     public long getFilterSize() { return filterSizeBytes_; }
     public boolean isTimestampTruncation() { return isTimestampTruncation_; }
     public boolean isBroadcast() { return isBroadcastJoin_; }
-    public PlanNode getSrc() { return src_; }
+    public JoinNode getSrc() { return src_; }
 
     private long getBuildKeyNumRowStats() {
       long minNumRows = src_.getChild(1).getCardinality();
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java 
b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
index fa065fa29..5c8af2b1d 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
@@ -33,6 +33,10 @@ import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
 import org.apache.impala.common.IdGenerator;
 import org.apache.impala.common.ThriftSerializationCtx;
+import org.apache.impala.thrift.TFileSplitGeneratorSpec;
+import org.apache.impala.thrift.TScanRange;
+import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TSlotDescriptor;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TTupleDescriptor;
@@ -176,13 +180,77 @@ public class TupleCacheInfo {
    * Pull in a child's TupleCacheInfo into this TupleCacheInfo. If the child is
    * ineligible, then this is marked ineligible and there is no need to 
calculate
    * a hash. If the child is eligible, it incorporates the child's hash into 
this
-   * hash.
+   * hash. Returns true if the child was merged, false if it was ineligible.
    */
-  public void mergeChild(TupleCacheInfo child) {
+  public boolean mergeChild(TupleCacheInfo child) {
+    if (!mergeChildImpl(child)) {
+      return false;
+    }
+
+    // Merge the child's inputScanNodes_
+    inputScanNodes_.addAll(child.inputScanNodes_);
+    return true;
+  }
+
+  /**
+   * Pull in a child's TupleCacheInfo into this TupleCacheInfo. If the child is
+   * ineligible, then this is marked ineligible and there is no need to 
calculate
+   * a hash. If the child is eligible, it incorporates the child's hash into 
this
+   * hash. Returns true if the child was merged, false if it was ineligible.
+   *
+   * Resolves scan ranges statically for cases where results depend on all 
scan ranges.
+   */
+  public boolean mergeChildWithScans(TupleCacheInfo child) {
+    if (!mergeChildImpl(child)) {
+      return false;
+    }
+
+    // Add all scan range specs to the hash. Copy only the relevant fields, 
primarily:
+    // filename, mtime, size, and offset. Others like partition_id may change 
after
+    // reloading metadata.
+    for (HdfsScanNode scanNode: child.inputScanNodes_) {
+      TScanRangeSpec orig = scanNode.getScanRangeSpecs();
+      TScanRangeSpec spec = new TScanRangeSpec();
+      if (orig.isSetConcrete_ranges()) {
+        for (TScanRangeLocationList origLocList: orig.concrete_ranges) {
+          // We only need the TScanRange, which provides the file segment info.
+          TScanRangeLocationList locList = new TScanRangeLocationList();
+          TScanRange scanRange = origLocList.scan_range.deepCopy();
+          if (scanRange.isSetHdfs_file_split()) {
+            // Zero out partition_id, it's not stable.
+            scanRange.hdfs_file_split.partition_id = 0;
+          }
+          locList.setScan_range(scanRange);
+          spec.addToConcrete_ranges(locList);
+        }
+        // Reloaded partitions may have a different order. Sort for stability.
+        spec.concrete_ranges.sort(null);
+      }
+      if (orig.isSetSplit_specs()) {
+        for (TFileSplitGeneratorSpec origSplitSpec: orig.split_specs) {
+          TFileSplitGeneratorSpec splitSpec = origSplitSpec.deepCopy();
+          // Zero out partition_id, it's not stable.
+          splitSpec.partition_id = 0;
+          spec.addToSplit_specs(splitSpec);
+        }
+        // Reloaded partitions may have a different order. Sort for stability.
+        spec.split_specs.sort(null);
+      }
+      hashThrift(spec);
+    }
+    return true;
+  }
+
+  /**
+   * Pull in a child's TupleCacheInfo that can be exhaustively determined 
during planning.
+   * Public interfaces may add additional info that is more dynamic, such as 
scan ranges.
+   */
+  private boolean mergeChildImpl(TupleCacheInfo child) {
     Preconditions.checkState(!finalized_,
         "TupleCacheInfo is finalized and can't be modified");
     if (!child.isEligible()) {
       ineligibilityReasons_.add(IneligibilityReason.CHILDREN_INELIGIBLE);
+      return false;
     } else {
       // The child is eligible, so incorporate its hash into our hasher.
       hasher_.putBytes(child.getHashString().getBytes());
@@ -192,9 +260,6 @@ public class TupleCacheInfo {
       // and each contribution would be clear.
       hashTraceBuilder_.append(child.getHashTrace());
 
-      // Merge the child's inputScanNodes_
-      inputScanNodes_.addAll(child.inputScanNodes_);
-
       // Incorporate the child's tuple references. This is creating a new 
translation
       // of TupleIds, because it will be incorporating multiple children.
       for (TupleId id : child.tupleTranslationMap_.keySet()) {
@@ -203,6 +268,7 @@ public class TupleCacheInfo {
         // id translation maps.
         registerTupleHelper(id, false);
       }
+      return true;
     }
   }
 
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java 
b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
index 1bce808b6..c43bb18a9 100644
--- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
@@ -65,6 +65,47 @@ public class TupleCacheTest extends PlannerTestBase {
         "select id from functional.alltypes where id = 2");
   }
 
+  @Test
+  public void testRuntimeFilterCacheKeys() {
+    String basicJoinTmpl = "select straight_join probe.id from 
functional.alltypes " +
+        "probe, functional.alltypestiny build where %s";
+    verifyIdenticalCacheKeys(
+        String.format(basicJoinTmpl, "probe.id = build.id"),
+        String.format(basicJoinTmpl, "probe.id = build.id"));
+    // Trivial rewrite produces same plan
+    verifyIdenticalCacheKeys(
+        String.format(basicJoinTmpl, "probe.id = build.id"),
+        String.format(basicJoinTmpl, "build.id = probe.id"));
+    // Larger join with same subquery. Cache keys match because cache is 
disabled when
+    // the build side is too complex.
+    verifyIdenticalCacheKeys(
+        String.format(basicJoinTmpl, "probe.id = build.id"),
+        "select straight_join p.id from functional.alltypes p, (" +
+        String.format(basicJoinTmpl, "probe.id = build.id") + ") b where p.id 
= b.id");
+    // Different filter slot
+    verifyOverlappingCacheKeys(
+        String.format(basicJoinTmpl, "probe.id = build.id"),
+        String.format(basicJoinTmpl, "probe.id + 1 = build.id"));
+    // Different target expression
+    verifyOverlappingCacheKeys(
+        String.format(basicJoinTmpl, "probe.id + 1 = build.id"),
+        String.format(basicJoinTmpl, "probe.id + 2 = build.id"));
+    // Larger join with similar subquery and simpler plan tree.
+    verifyOverlappingCacheKeys(
+        String.format(basicJoinTmpl, "probe.id = build.id"),
+        "select straight_join a.id from functional.alltypes a, 
functional.alltypes b, " +
+        "functional.alltypestiny c where a.id = b.id and b.id = c.id");
+    // Different build-side source table
+    verifyDifferentCacheKeys(
+        String.format(basicJoinTmpl, "probe.id = build.id"),
+        "select straight_join a.id from functional.alltypes a, 
functional.alltypes b " +
+        "where a.id = b.id");
+    // Different build-side predicates
+    verifyDifferentCacheKeys(
+        String.format(basicJoinTmpl, "probe.id = build.id"),
+        String.format(basicJoinTmpl, "probe.id = build.id and build.id < 
100"));
+  }
+
   /**
    * Test cases that rely on masking out unnecessary data to have cache hits.
    */
@@ -87,6 +128,10 @@ public class TupleCacheTest extends PlannerTestBase {
     verifyCacheIneligible("select id from functional_kudu.alltypes");
     verifyCacheIneligible("select id from functional_hbase.alltypes");
 
+    // Runtime filter produced by Kudu table is not implemented
+    verifyCacheIneligible("select a.id from functional.alltypes a, " +
+        "functional_kudu.alltypes b where a.id = b.id");
+
     // TODO: Random functions should make a location ineligible
     // rand()/random()/uuid()
     // verifyCacheIneligible(
@@ -149,6 +194,17 @@ public class TupleCacheTest extends PlannerTestBase {
         "where probe1.id = probe2.id and probe2.id = build.id",
         "select straight_join probe1.id from functional.alltypes probe1, " +
         "functional.alltypes build where probe1.id = build.id");
+
+    // This query has a tuple cache location for the build-side scan that 
feeds into the
+    // join on the build side. If we add a bunch of additional filters on the 
probe side
+    // table, that changes the slot ids, but we should still get identical 
keys for the
+    // build-side caching location.
+    verifyOverlappingCacheKeys(
+        "select straight_join probe.id from functional.alltypes probe, (select 
" +
+        "build1.id from functional.alltypes build1) build where probe.id = 
build.id",
+        "select straight_join probe.id from functional.alltypes probe, (select 
" +
+        "build1.id from functional.alltypes build1) build where probe.id = 
build.id " +
+        "and probe.bool_col = true and probe.int_col = 100");
   }
 
   @Test
@@ -282,6 +338,14 @@ public class TupleCacheTest extends PlannerTestBase {
       printQueryCacheEligibleNodes(query2, cacheEligibleNodes2, errorLog);
       fail(errorLog.toString());
     }
+
+    if (cacheKeys1.equals(cacheKeys2) || 
cacheHashTraces1.equals(cacheHashTraces2)) {
+      StringBuilder errorLog = new StringBuilder();
+      errorLog.append("Expected some cache keys to differ. Instead found:\n");
+      printQueryCacheEligibleNodes(query1, cacheEligibleNodes1, errorLog);
+      printQueryCacheEligibleNodes(query2, cacheEligibleNodes2, errorLog);
+      fail(errorLog.toString());
+    }
   }
 
   protected void verifyDifferentCacheKeys(String query1, String query2) {
diff --git a/tests/custom_cluster/test_tuple_cache.py 
b/tests/custom_cluster/test_tuple_cache.py
index af30201f7..6a40331ba 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -19,6 +19,7 @@ from __future__ import absolute_import, division, 
print_function
 
 import pytest
 import random
+import re
 import string
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
@@ -27,6 +28,11 @@ from tests.common.test_dimensions import (
 
 TABLE_LAYOUT = 'name STRING, age INT, address STRING'
 CACHE_START_ARGS = "--tuple_cache_dir=/tmp --log_level=2"
+NUM_HITS = 'NumTupleCacheHits'
+NUM_HALTED = 'NumTupleCacheHalted'
+NUM_SKIPPED = 'NumTupleCacheSkipped'
+# Indentation used for TUPLE_CACHE_NODE in specific fragments (not averaged 
fragment).
+NODE_INDENT = '           - '
 
 
 # Generates a random table entry of at least 15 bytes.
@@ -39,19 +45,40 @@ def table_value(seed):
   return '"{0}", {1}, "{2}"'.format(name, age, address)
 
 
-def assertCounters(profile, num_hits, num_halted, num_skipped):
-  assert "NumTupleCacheHits: {0} ".format(num_hits) in profile
-  assert "NumTupleCacheHalted: {0} ".format(num_halted) in profile
-  assert "NumTupleCacheSkipped: {0} ".format(num_skipped) in profile
+def assertCounter(profile, key, val, num_matches):
+  if not isinstance(num_matches, list):
+    num_matches = [num_matches]
+  assert profile.count("{0}{1}: {2} ".format(NODE_INDENT, key, val)) in 
num_matches, \
+      re.findall(r"{0}{1}: .*".format(NODE_INDENT, key), profile)
+
+
+def assertCounters(profile, num_hits, num_halted, num_skipped, num_matches=1):
+  assertCounter(profile, NUM_HITS, num_hits, num_matches)
+  assertCounter(profile, NUM_HALTED, num_halted, num_matches)
+  assertCounter(profile, NUM_SKIPPED, num_skipped, num_matches)
 
 
 def get_cache_keys(profile):
-  cache_keys = []
+  cache_keys = {}
+  last_node_id = -1
+  matcher = re.compile(r'TUPLE_CACHE_NODE \(id=([0-9]*)\)')
   for line in profile.splitlines():
     if "Combined Key:" in line:
       key = line.split(":")[1].strip()
-      cache_keys.append(key)
-  return cache_keys
+      cache_keys[last_node_id].append(key)
+      continue
+
+    match = matcher.search(line)
+    if match:
+      last_node_id = int(match.group(1))
+      if last_node_id not in cache_keys:
+        cache_keys[last_node_id] = []
+
+  # Sort cache keys: with multiple nodes, order in the profile may change.
+  for _, val in cache_keys.items():
+    val.sort()
+
+  return next(iter(cache_keys.values())) if len(cache_keys) == 1 else 
cache_keys
 
 
 def assert_deterministic_scan(vector, profile):
@@ -205,6 +232,142 @@ class TestTupleCache(TestTupleCacheBase):
 
     assert hit_error
 
+  @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
+  @pytest.mark.execute_serially
+  def test_runtime_filters(self, vector, unique_database):
+    """
+    This tests that adding files to a table results in different runtime 
filter keys.
+    """
+    self.client.set_configuration(vector.get_value('exec_option'))
+    fq_table = "{0}.runtime_filters".format(unique_database)
+    # A query containing multiple runtime filters
+    # - scan of A receives runtime filters from B and C, so it depends on 
contents of B/C
+    # - scan of B receives runtime filter from C, so it depends on contents of 
C
+    query = "select straight_join a.id from functional.alltypes a, 
functional.alltypes" \
+        " b, {0} c where a.id = b.id and a.id = c.age order by 
a.id".format(fq_table)
+    query_a_id = 10
+    query_b_id = 11
+    query_c_id = 12
+
+    # Create an empty table
+    self.create_table(fq_table, scale=0)
+
+    # Establish a baseline
+    empty_result = self.execute_query(query)
+    empty_cache_keys = get_cache_keys(empty_result.runtime_profile)
+    # Tables a and b have multiple files, so they are distributed across all 3 
nodes.
+    # Table c has one file, so it has a single entry.
+    assert len(empty_cache_keys) == 3
+    assert len(empty_cache_keys[query_c_id]) == 1
+    empty_c_compile_key, empty_c_finst_key = 
empty_cache_keys[query_c_id][0].split("_")
+    assert empty_c_finst_key == "0"
+    assert len(empty_result.data) == 0
+
+    # Insert a row, which creates a file / scan range
+    self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table, 
table_value(0)))
+
+    # Now, there is a scan range, so the fragment instance key should be 
non-zero.
+    one_file_result = self.execute_query(query)
+    one_cache_keys = get_cache_keys(one_file_result.runtime_profile)
+    assert len(one_cache_keys) == 3
+    assert len(empty_cache_keys[query_c_id]) == 1
+    one_c_compile_key, one_c_finst_key = 
one_cache_keys[query_c_id][0].split("_")
+    assert one_c_finst_key != "0"
+    # This should be a cache miss
+    assertCounters(one_file_result.runtime_profile, 0, 0, 0, 7)
+    assert len(one_file_result.data) == 1
+
+    # The new scan range did not change the compile-time key, but did change 
the runtime
+    # filter keys.
+    for id in [query_a_id, query_b_id]:
+      assert len(empty_cache_keys[id]) == len(one_cache_keys[id])
+      for empty, one in zip(empty_cache_keys[id], one_cache_keys[id]):
+        assert empty != one
+    assert empty_c_compile_key == one_c_compile_key
+
+    # Insert another row, which creates a file / scan range
+    self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table, 
table_value(1)))
+
+    # There is a second scan range, so the fragment instance key should change 
again
+    two_files_result = self.execute_query(query)
+    two_cache_keys = get_cache_keys(two_files_result.runtime_profile)
+    assert len(two_cache_keys) == 3
+    assert len(two_cache_keys[query_c_id]) == 2
+    two_c1_compile_key, two_c1_finst_key = 
two_cache_keys[query_c_id][0].split("_")
+    two_c2_compile_key, two_c2_finst_key = 
two_cache_keys[query_c_id][1].split("_")
+    assert two_c1_finst_key != "0"
+    assert two_c2_finst_key != "0"
+    # There may be a cache hit for the prior "c" scan range (if scheduled to 
the same
+    # instance), and the rest cache misses.
+    assertCounter(two_files_result.runtime_profile, NUM_HITS, 0, 
num_matches=[7, 8])
+    assertCounter(two_files_result.runtime_profile, NUM_HITS, 1, 
num_matches=[0, 1])
+    assertCounter(two_files_result.runtime_profile, NUM_HALTED, 0, 
num_matches=8)
+    assertCounter(two_files_result.runtime_profile, NUM_SKIPPED, 0, 
num_matches=8)
+    assert len(two_files_result.data) == 2
+    # Ordering can vary by environment. Ensure one matches and one differs.
+    assert one_c_finst_key == two_c1_finst_key or one_c_finst_key == 
two_c2_finst_key
+    assert one_c_finst_key != two_c1_finst_key or one_c_finst_key != 
two_c2_finst_key
+    overlapping_rows = 
set(one_file_result.data).intersection(set(two_files_result.data))
+    assert len(overlapping_rows) == 1
+
+    # The new scan range did not change the compile-time key, but did change 
the runtime
+    # filter keys.
+    for id in [query_a_id, query_b_id]:
+      assert len(empty_cache_keys[id]) == len(one_cache_keys[id])
+      for empty, one in zip(empty_cache_keys[id], one_cache_keys[id]):
+        assert empty != one
+    assert one_c_compile_key == two_c1_compile_key
+    assert one_c_compile_key == two_c2_compile_key
+
+    # Invalidate metadata and rerun the last query. The keys should stay the 
same.
+    self.execute_query("invalidate metadata")
+    rerun_two_files_result = self.execute_query(query)
+    # Verify that this is a cache hit
+    assertCounters(rerun_two_files_result.runtime_profile, 1, 0, 0, 
num_matches=8)
+    rerun_cache_keys = get_cache_keys(rerun_two_files_result.runtime_profile)
+    assert rerun_cache_keys == two_cache_keys
+    assert rerun_two_files_result.data == two_files_result.data
+
+  @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
+  @pytest.mark.execute_serially
+  def test_runtime_filter_reload(self, vector, unique_database):
+    """
+    This tests that reloading files to a table results in matching runtime 
filter keys.
+    """
+    self.client.set_configuration(vector.get_value('exec_option'))
+    fq_table = "{0}.runtime_filter_genspec".format(unique_database)
+    # Query where fq_table generates a runtime filter.
+    query = "select straight_join a.id from functional.alltypes a, {0} b " \
+        "where a.id = b.age order by a.id".format(fq_table)
+
+    # Create a partitioned table with 3 partitions
+    self.execute_query("CREATE EXTERNAL TABLE {0} (name STRING) "
+                       "PARTITIONED BY (age INT)".format(fq_table))
+    self.execute_query(
+        "INSERT INTO {0} PARTITION(age=4) VALUES 
(\"Vanessa\")".format(fq_table))
+    self.execute_query(
+        "INSERT INTO {0} PARTITION(age=5) VALUES (\"Carl\")".format(fq_table))
+    self.execute_query(
+        "INSERT INTO {0} PARTITION(age=6) VALUES 
(\"Cleopatra\")".format(fq_table))
+
+    # Prime the cache
+    base_result = self.execute_query(query)
+    base_cache_keys = get_cache_keys(base_result.runtime_profile)
+    assert len(base_cache_keys) == 2
+
+    # Drop and reload the table
+    self.execute_query("DROP TABLE {0}".format(fq_table))
+    self.execute_query("CREATE EXTERNAL TABLE {0} (name STRING, address 
STRING) "
+                       "PARTITIONED BY (age INT)".format(fq_table))
+    self.execute_query("ALTER TABLE {0} RECOVER PARTITIONS".format(fq_table))
+
+    # Verify we reuse the cache
+    reload_result = self.execute_query(query)
+    reload_cache_keys = get_cache_keys(reload_result.runtime_profile)
+    assert base_result.data == reload_result.data
+    assert base_cache_keys == reload_cache_keys
+    # Skips verifying cache hits as fragments may not be assigned to the same 
nodes.
+
 
 class TestTupleCacheRuntimeKeysBasic(TestTupleCacheBase):
   """Simpler tests that run on a single node with mt_dop=0 or mt_dop=1."""

Reply via email to