This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 825900fa6 IMPALA-13057: Incorporate tuple/slot information into tuple
cache key
825900fa6 is described below
commit 825900fa6c3a51941b7b90edb8af6f7dba5e5fe8
Author: Joe McDonnell <[email protected]>
AuthorDate: Sat May 4 16:37:25 2024 -0700
IMPALA-13057: Incorporate tuple/slot information into tuple cache key
The tuple cache keys currently do not include information about
the tuples or slots, as that information is stored outside
the PlanNode thrift structures. The tuple/slot information is
critical to determining which columns are referenced and what
data layout the result tuple has. This adds code to incorporate
the TupleDescriptors and SlotDescriptors into the cache key.
Since the tuple and slot ids are indexes into a global structure
(the descriptor table), they hinder cache key matches across
different queries. If a query has an extra filter, it can shift
all the slot ids. If the query has an extra join, it can
shift all the tuple ids. To eliminate this effect, this adds the
ability to translate tuple and slot ids from global indices to
local indices. The translation only contains information from the
subtree below that point, so it is not influenced by unrelated
parts of the query.
When the code registers a tuple with the TupleCacheInfo, it also
registers a translation from the global index to a local index.
Any code that puts SlotIds or TupleIds into a Thrift data structure
can use the translateTupleId() and translateSlotId() functions to
get the local index. These are exposed on ThriftSerializationCtx
by functions of the same name, but those functions apply the
translation only when working for the tuple cache.
This passes the ThriftSerializationCtx into Exprs that have
TupleIds or SlotIds and applies the translation. It also passes
the ThriftSerializationCtx into PlanNode::toThrift(), which is
used to translate TupleIds in HdfsScanNode.
This also adds a way to register a table with the tuple cache
and incorporate information about it. This allows us to mask
out additional fields in PlanNode and enable a test case that
relies on matching with different table aliases.
Testing:
- This fixes some commented out test cases in TupleCacheTest
(specifically telling columns apart)
- This adds new test cases that match due to id translation
(extra filters, extra joins)
- This adds a unit test for the id translation to
TupleCacheInfoTest
Change-Id: I7f5278e9dbb976cbebdc6a21a6e66bc90ce06c6c
Reviewed-on: http://gerrit.cloudera.org:8080/21398
Reviewed-by: Joe McDonnell <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../java/org/apache/impala/analysis/CastExpr.java | 7 +-
.../apache/impala/analysis/DescriptorTable.java | 6 +-
.../main/java/org/apache/impala/analysis/Expr.java | 36 +++--
.../org/apache/impala/analysis/SlotDescriptor.java | 8 +-
.../java/org/apache/impala/analysis/SlotRef.java | 8 +-
.../apache/impala/analysis/TupleDescriptor.java | 6 +-
.../impala/analysis/TupleIsNullPredicate.java | 8 +-
.../impala/common/ThriftSerializationCtx.java | 72 +++++++++-
.../org/apache/impala/planner/HdfsScanNode.java | 44 ++++--
.../org/apache/impala/planner/IcebergScanNode.java | 5 +-
.../java/org/apache/impala/planner/PlanNode.java | 46 ++++--
.../org/apache/impala/planner/TupleCacheInfo.java | 156 ++++++++++++++++++++-
.../apache/impala/planner/TupleCachePlanner.java | 2 +-
.../apache/impala/planner/TupleCacheInfoTest.java | 94 +++++++++++--
.../org/apache/impala/planner/TupleCacheTest.java | 77 +++++++++-
.../queries/QueryTest/explain-level1.test | 2 +-
.../queries/QueryTest/explain-level2.test | 2 +-
17 files changed, 504 insertions(+), 75 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
b/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
index 7de8e074a..5b5264cf0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
@@ -27,6 +27,7 @@ import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.TypeCompatibility;
import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.thrift.TCastExpr;
import org.apache.impala.thrift.TExpr;
import org.apache.impala.thrift.TExprNode;
@@ -262,12 +263,12 @@ public class CastExpr extends Expr {
}
@Override
- protected void treeToThriftHelper(TExpr container) {
+ protected void treeToThriftHelper(TExpr container, ThriftSerializationCtx
serialCtx) {
if (noOp_) {
- getChild(0).treeToThriftHelper(container);
+ getChild(0).treeToThriftHelper(container, serialCtx);
return;
}
- super.treeToThriftHelper(container);
+ super.treeToThriftHelper(container, serialCtx);
}
@Override
diff --git a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
index 4127fa794..2a757419a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
@@ -34,6 +34,7 @@ import org.apache.impala.catalog.Type;
import org.apache.impala.common.IdGenerator;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.JniUtil;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.thrift.TColumnType;
import org.apache.impala.thrift.TDescriptorTable;
import org.apache.impala.thrift.TDescriptorTableSerialized;
@@ -183,6 +184,7 @@ public class DescriptorTable {
*/
public TDescriptorTable toThrift() {
TDescriptorTable result = new TDescriptorTable();
+ ThriftSerializationCtx serialCtx = new ThriftSerializationCtx();
// Maps from base table to its table id used in the backend.
Map<FeTable, Integer> tableIdMap = new HashMap<>();
// Used to check table level consistency
@@ -220,10 +222,10 @@ public class DescriptorTable {
// currently are several situations in which we send materialized tuples
without
// a mem layout to the BE, e.g., when unnesting unions or when replacing
plan
// trees with an EmptySetNode.
- result.addToTupleDescriptors(tupleDesc.toThrift(tableId));
+ result.addToTupleDescriptors(tupleDesc.toThrift(tableId, serialCtx));
// Only serialize materialized slots
for (SlotDescriptor slotD: tupleDesc.getMaterializedSlots()) {
- result.addToSlotDescriptors(slotD.toThrift());
+ result.addToSlotDescriptors(slotD.toThrift(serialCtx));
}
}
for (FeTable tbl: tableIdMap.keySet()) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java
b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 4a07e67a6..7c51f041f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -39,6 +39,7 @@ import org.apache.impala.catalog.TypeCompatibility;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.SqlCastException;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.common.TreeNode;
import org.apache.impala.rewrite.ExprRewriter;
import org.apache.impala.service.FeSupport;
@@ -861,23 +862,29 @@ abstract public class Expr extends TreeNode<Expr>
implements ParseNode, Cloneabl
protected String toSqlImpl() { return toSqlImpl(DEFAULT); };
- // Convert this expr, including all children, to its Thrift representation.
+ // For locations that don't need to handle tuple caching yet, keep the
+ // old signature with a default ThriftSerializationCtx.
public TExpr treeToThrift() {
+ return treeToThrift(new ThriftSerializationCtx());
+ }
+
+ // Convert this expr, including all children, to its Thrift representation.
+ public TExpr treeToThrift(ThriftSerializationCtx serialCtx) {
if (type_.isNull()) {
// Hack to ensure BE never sees TYPE_NULL. If an expr makes it this far
without
// being cast to a non-NULL type, the type doesn't matter and we can
cast it
// arbitrarily.
Preconditions.checkState(IS_NULL_LITERAL.apply(this) ||
this instanceof SlotRef);
- return NullLiteral.create(ScalarType.BOOLEAN).treeToThrift();
+ return NullLiteral.create(ScalarType.BOOLEAN).treeToThrift(serialCtx);
}
TExpr result = new TExpr();
- treeToThriftHelper(result);
+ treeToThriftHelper(result, serialCtx);
return result;
}
// Append a flattened version of this expr, including all children, to
'container'.
- protected void treeToThriftHelper(TExpr container) {
+ protected void treeToThriftHelper(TExpr container, ThriftSerializationCtx
serialCtx) {
Preconditions.checkState(isAnalyzed_,
"Must be analyzed before serializing to thrift. %s", this);
Preconditions.checkState(!type_.isWildcardDecimal());
@@ -894,10 +901,10 @@ abstract public class Expr extends TreeNode<Expr>
implements ParseNode, Cloneabl
msg.setFn(thriftFn);
if (fn_.hasVarArgs()) msg.setVararg_start_idx(fn_.getNumArgs() - 1);
}
- toThrift(msg);
+ toThrift(msg, serialCtx);
container.addToNodes(msg);
for (Expr child: children_) {
- child.treeToThriftHelper(container);
+ child.treeToThriftHelper(container, serialCtx);
}
}
@@ -905,6 +912,14 @@ abstract public class Expr extends TreeNode<Expr>
implements ParseNode, Cloneabl
// msg.op as well as the expr-specific field.
protected abstract void toThrift(TExprNode msg);
+ // Exprs should override this signature of toThrift() if they need access to
+ // the ThriftSerializationCtx. That is necessary if the Expr is
non-deterministic
+ // or uses SlotIds/TupleIds. Everything else can simply keep using the old
signature
+ // for toThrift().
+ protected void toThrift(TExprNode msg, ThriftSerializationCtx serialCtx) {
+ toThrift(msg);
+ }
+
/**
* Returns the product of the given exprs' number of distinct values or -1
if any of
* the exprs have an invalid number of distinct values. Uses saturating
arithmetic,
@@ -924,14 +939,19 @@ abstract public class Expr extends TreeNode<Expr>
implements ParseNode, Cloneabl
return numDistinctValues;
}
- public static List<TExpr> treesToThrift(List<? extends Expr> exprs) {
+ public static List<TExpr> treesToThrift(List<? extends Expr> exprs,
+ ThriftSerializationCtx serialCtx) {
List<TExpr> result = new ArrayList<>();
for (Expr expr: exprs) {
- result.add(expr.treeToThrift());
+ result.add(expr.treeToThrift(serialCtx));
}
return result;
}
+ public static List<TExpr> treesToThrift(List<? extends Expr> exprs) {
+ return treesToThrift(exprs, new ThriftSerializationCtx());
+ }
+
public boolean isAggregate() {
return IS_AGGREGATE.apply(this);
}
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index 939249c01..7f2496636 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -30,6 +30,7 @@ import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.IcebergColumn;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.Type;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.thrift.TSlotDescriptor;
import org.apache.impala.thrift.TVirtualColumnType;
import org.slf4j.Logger;
@@ -445,11 +446,12 @@ public class SlotDescriptor {
return true;
}
- public TSlotDescriptor toThrift() {
+ public TSlotDescriptor toThrift(ThriftSerializationCtx serialCtx) {
Preconditions.checkState(isMaterialized_);
List<Integer> materializedPath = getMaterializedPath();
TSlotDescriptor result = new TSlotDescriptor(
- id_.asInt(), parent_.getId().asInt(), type_.toThrift(),
+ serialCtx.translateSlotId(id_).asInt(),
+ serialCtx.translateTupleId(parent_.getId()).asInt(), type_.toThrift(),
materializedPath, byteOffset_, nullIndicatorByte_, nullIndicatorBit_,
slotIdx_, getVirtualColumnType());
if (itemTupleDesc_ != null) {
@@ -460,7 +462,7 @@ public class SlotDescriptor {
// have such a guarantee.
Preconditions.checkState(!isScanSlot() ||
itemTupleDesc_.getId().asInt() > parent_.getId().asInt());
- result.setItemTupleId(itemTupleDesc_.getId().asInt());
+
result.setItemTupleId(serialCtx.translateTupleId(itemTupleDesc_.getId()).asInt());
}
return result;
}
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index 3208b5451..2d504ec83 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -31,6 +31,7 @@ import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.TypeCompatibility;
import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.common.UnsupportedFeatureException;
import org.apache.impala.thrift.TExprNode;
import org.apache.impala.thrift.TExprNodeType;
@@ -320,8 +321,13 @@ public class SlotRef extends Expr {
@Override
protected void toThrift(TExprNode msg) {
+ Preconditions.checkState(false, "Unexpected use of old toThrift()
signature");
+ }
+
+ @Override
+ protected void toThrift(TExprNode msg, ThriftSerializationCtx serialCtx) {
msg.node_type = TExprNodeType.SLOT_REF;
- msg.slot_ref = new TSlotRef(desc_.getId().asInt());
+ msg.slot_ref = new
TSlotRef(serialCtx.translateSlotId(desc_.getId()).asInt());
// we shouldn't be sending exprs over non-materialized slots
Preconditions.checkState(desc_.isMaterialized(), String.format(
"Illegal reference to non-materialized slot: tid=%s sid=%s",
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index 43f625411..5d81d40c3 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -32,6 +32,7 @@ import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.StructType;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.Pair;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.thrift.TTupleDescriptor;
import com.google.common.base.Joiner;
@@ -306,9 +307,10 @@ public class TupleDescriptor {
for (SlotDescriptor slot: getSlotsRecursively())
slot.setIsMaterialized(true);
}
- public TTupleDescriptor toThrift(Integer tableId) {
+ public TTupleDescriptor toThrift(Integer tableId, ThriftSerializationCtx
serialCtx) {
TTupleDescriptor ttupleDesc =
- new TTupleDescriptor(id_.asInt(), byteSize_, numNullBytes_);
+ new TTupleDescriptor(serialCtx.translateTupleId(id_).asInt(),
byteSize_,
+ numNullBytes_);
if (tableId == null) return ttupleDesc;
ttupleDesc.setTableId(tableId);
Preconditions.checkNotNull(path_);
diff --git
a/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java
b/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java
index 92685b8ba..079aa23c2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.InternalException;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.thrift.TExprNode;
import org.apache.impala.thrift.TExprNodeType;
import org.apache.impala.thrift.TTupleIsNullPredicate;
@@ -77,6 +78,11 @@ public class TupleIsNullPredicate extends Predicate {
@Override
protected void toThrift(TExprNode msg) {
+ Preconditions.checkState(false, "Unexpected use of old toThrift()
signature");
+ }
+
+ @Override
+ protected void toThrift(TExprNode msg, ThriftSerializationCtx serialCtx) {
msg.node_type = TExprNodeType.TUPLE_IS_NULL_PRED;
msg.tuple_is_null_pred = new TTupleIsNullPredicate();
Preconditions.checkNotNull(analyzer_);
@@ -86,7 +92,7 @@ public class TupleIsNullPredicate extends Predicate {
Preconditions.checkNotNull(tupleDesc, "Unknown tuple id: " +
tid.toString());
Preconditions.checkState(tupleDesc.isMaterialized(),
String.format("Illegal reference to non-materialized tuple: tid=%s",
tid));
- msg.tuple_is_null_pred.addToTuple_ids(tid.asInt());
+
msg.tuple_is_null_pred.addToTuple_ids(serialCtx.translateTupleId(tid).asInt());
}
}
diff --git
a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
index a0d245cb9..af4a508a1 100644
--- a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
+++ b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
@@ -17,6 +17,9 @@
package org.apache.impala.common;
+import org.apache.impala.analysis.SlotId;
+import org.apache.impala.analysis.TupleId;
+import org.apache.impala.catalog.FeTable;
import org.apache.impala.planner.TupleCacheInfo;
/**
@@ -24,9 +27,22 @@ import org.apache.impala.planner.TupleCacheInfo;
* Thrift serialization is happening for tuple caching or not. This context is
* passed to Thrift serialization functions like initThrift() or toThrift().
* The context currently provides a way to determine whether serialization is
happening
- * for tuple caching. It will be expanded to provide other methods that the
Thrift
- * serialization functions can call to translate the Thrift output or register
- * non-deterministic operations.
+ * for tuple caching.
+ *
+ * It also provides several functions that adjust their behavior based on
whether
+ * this is for tuple caching or not (e.g. translateTupleId()). This allows
+ * the caller to run the function unconditionally without needing extra if/else
checks
+ * for tuple caching. e.g.
+ *
+ * struct.id = serialCtx.translateTupleId(id_).asInt();
+ *
+ * rather than:
+ *
+ * if (serialCtx.isTupleCache()) {
+ * struct.id = serialCtx.translateTupleId(id_).asInt();
+ * } else {
+ * struct.id = id_.asInt();
+ * }
*/
public class ThriftSerializationCtx {
private TupleCacheInfo tupleCacheInfo_;
@@ -52,4 +68,54 @@ public class ThriftSerializationCtx {
public boolean isTupleCache() {
return tupleCacheInfo_ != null;
}
+
+ /**
+ * registerTuple() should be called for each TupleId that is referenced from
a PlanNode.
+ * This only has an effect for tuple caching.
+ */
+ public void registerTuple(TupleId id) {
+ if (isTupleCache()) {
+ tupleCacheInfo_.registerTuple(id);
+ }
+ }
+
+ /**
+ * registerTable() should be called for any table that is referenced from a
PlanNode
+ * that participates in tuple caching. In practice, this is only used for
HDFS tables
+ * at the moment.
+ */
+ public void registerTable(FeTable table) {
+ if (isTupleCache()) {
+ tupleCacheInfo_.registerTable(table);
+ }
+ }
+
+ /**
+ * translateTupleId() is designed to be applied to every TupleId
incorporated into a
+ * Thrift structure. For tuple caching, translateTupleId() translates the
passed in
+ * global TupleId into a local TupleId. All other use cases use the global
id, so this
+ * simply returns the globalId passed in. Any TupleId passed into
translateTupleId()
+ * must already be registered via registerTuple().
+ */
+ public TupleId translateTupleId(TupleId globalId) {
+ if (isTupleCache()) {
+ return tupleCacheInfo_.getLocalTupleId(globalId);
+ } else {
+ return globalId;
+ }
+ }
+
+ /**
+ * translateSlotId() is designed to be applied to every SlotId incorporated
into a
+ * Thrift structure. For tuple caching, translateSlotId() translates the
passed in
+ * global SlotId into a local SlotId. All other use cases use the global id,
so this
+ * simply returns the globalId passed in.
+ */
+ public SlotId translateSlotId(SlotId globalId) {
+ if (isTupleCache()) {
+ return tupleCacheInfo_.getLocalSlotId(globalId);
+ } else {
+ return globalId;
+ }
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index e79ada1d1..23287e176 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -70,6 +70,7 @@ import org.apache.impala.common.NotImplementedException;
import org.apache.impala.common.Pair;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.fb.FbFileBlock;
import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
import org.apache.impala.service.BackendConfig;
@@ -1922,7 +1923,15 @@ public class HdfsScanNode extends ScanNode {
@Override
protected void toThrift(TPlanNode msg) {
- msg.hdfs_scan_node = new THdfsScanNode(desc_.getId().asInt(), new
HashSet<>());
+ Preconditions.checkState(false, "Unexpected use of old toThrift()
signature.");
+ }
+
+ @Override
+ protected void toThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) {
+ msg.hdfs_scan_node = new THdfsScanNode(serialCtx.translateTupleId(
+ desc_.getId()).asInt(), new HashSet<>());
+ // Register the table for this scan node so tuple caching knows about it.
+ serialCtx.registerTable(desc_.getTable());
if (replicaPreference_ != null) {
msg.hdfs_scan_node.setReplica_preference(replicaPreference_);
}
@@ -1932,8 +1941,9 @@ public class HdfsScanNode extends ScanNode {
Map<Integer, List<TExpr>> tcollectionConjuncts = new LinkedHashMap<>();
for (Map.Entry<TupleDescriptor, List<Expr>> entry:
collectionConjuncts_.entrySet()) {
- tcollectionConjuncts.put(entry.getKey().getId().asInt(),
- Expr.treesToThrift(entry.getValue()));
+ tcollectionConjuncts.put(
+ serialCtx.translateTupleId(entry.getKey().getId()).asInt(),
+ Expr.treesToThrift(entry.getValue(), serialCtx));
}
msg.hdfs_scan_node.setCollection_conjuncts(tcollectionConjuncts);
}
@@ -1945,22 +1955,26 @@ public class HdfsScanNode extends ScanNode {
if (countStarSlot_ != null) {
msg.hdfs_scan_node.setCount_star_slot_offset(countStarSlot_.getByteOffset());
}
- if (!statsConjuncts_.isEmpty()) {
- for (Expr e: statsConjuncts_) {
- msg.hdfs_scan_node.addToStats_conjuncts(e.treeToThrift());
+ // Stats/dictionary filter conjuncts do not change the logical set that
would be
+ // returned, so they can be skipped for tuple caching.
+ if (!serialCtx.isTupleCache()) {
+ if (!statsConjuncts_.isEmpty()) {
+ for (Expr e: statsConjuncts_) {
+ msg.hdfs_scan_node.addToStats_conjuncts(e.treeToThrift());
+ }
}
- }
- if (statsTuple_ != null) {
- msg.hdfs_scan_node.setStats_tuple_id(statsTuple_.getId().asInt());
- }
+ if (statsTuple_ != null) {
+ msg.hdfs_scan_node.setStats_tuple_id(statsTuple_.getId().asInt());
+ }
- Map<Integer, List<Integer>> dictMap = new LinkedHashMap<>();
- for (Map.Entry<SlotDescriptor, List<Integer>> entry :
- dictionaryFilterConjuncts_.entrySet()) {
- dictMap.put(entry.getKey().getId().asInt(), entry.getValue());
+ Map<Integer, List<Integer>> dictMap = new LinkedHashMap<>();
+ for (Map.Entry<SlotDescriptor, List<Integer>> entry :
+ dictionaryFilterConjuncts_.entrySet()) {
+ dictMap.put(entry.getKey().getId().asInt(), entry.getValue());
+ }
+ msg.hdfs_scan_node.setDictionary_filter_conjuncts(dictMap);
}
- msg.hdfs_scan_node.setDictionary_filter_conjuncts(dictMap);
msg.hdfs_scan_node.setIs_partition_key_scan(isPartitionKeyScan_);
for (HdfsFileFormat format : fileFormats_) {
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index ad9928a3a..62c65070c 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -37,6 +37,7 @@ import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.fb.FbIcebergDataFileFormat;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TPlanNode;
@@ -260,8 +261,8 @@ public class IcebergScanNode extends HdfsScanNode {
}
@Override
- protected void toThrift(TPlanNode msg) {
- super.toThrift(msg);
+ protected void toThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) {
+ super.toThrift(msg, serialCtx);
Preconditions.checkNotNull(msg.hdfs_scan_node);
if (deleteFileScanNodeId != null) {
msg.hdfs_scan_node.setDeleteFileScanNodeId(deleteFileScanNodeId.asInt());
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index dbc7b74b4..842e024d8 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -32,6 +32,7 @@ import java.util.Stack;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.CompoundPredicate;
+import org.apache.impala.analysis.DescriptorTable;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprId;
import org.apache.impala.analysis.ExprSubstitutionMap;
@@ -510,26 +511,33 @@ abstract public class PlanNode extends TreeNode<PlanNode>
{
* This is called before calling toThrift().
*/
private void initThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) {
- msg.node_id = id_.asInt();
msg.limit = limit_;
- TExecStats estimatedStats = new TExecStats();
- estimatedStats.setCardinality(
- filteredCardinality_ > -1 ? filteredCardinality_ : cardinality_);
- estimatedStats.setMemory_used(nodeResourceProfile_.getMemEstimateBytes());
- msg.setLabel(getDisplayLabel());
- msg.setLabel_detail(getDisplayLabelDetail());
- msg.setEstimated_stats(estimatedStats);
+ if (!serialCtx.isTupleCache()) {
+ msg.node_id = id_.asInt();
+ TExecStats estimatedStats = new TExecStats();
+ estimatedStats.setCardinality(
+ filteredCardinality_ > -1 ? filteredCardinality_ : cardinality_);
+
estimatedStats.setMemory_used(nodeResourceProfile_.getMemEstimateBytes());
+ msg.setLabel(getDisplayLabel());
+ msg.setLabel_detail(getDisplayLabelDetail());
+ msg.setEstimated_stats(estimatedStats);
+ } else {
+ // Do not set node_id for tuple caching, as it is a global index that is
impacted
+ // by the shape of the rest of the query.
+ msg.node_id = 0;
+ }
Preconditions.checkState(tupleIds_.size() > 0);
msg.setRow_tuples(Lists.<Integer>newArrayListWithCapacity(tupleIds_.size()));
msg.setNullable_tuples(Lists.<Boolean>newArrayListWithCapacity(tupleIds_.size()));
for (TupleId tid: tupleIds_) {
- msg.addToRow_tuples(tid.asInt());
+ serialCtx.registerTuple(tid);
+ msg.addToRow_tuples(serialCtx.translateTupleId(tid).asInt());
msg.addToNullable_tuples(nullableTupleIds_.contains(tid));
}
for (Expr e: conjuncts_) {
- msg.addToConjuncts(e.treeToThrift());
+ msg.addToConjuncts(e.treeToThrift(serialCtx));
}
// Serialize any runtime filters
for (RuntimeFilter filter : runtimeFilters_) {
@@ -559,7 +567,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
TPlanNode msg = new TPlanNode();
ThriftSerializationCtx serialCtx = new ThriftSerializationCtx();
initThrift(msg, serialCtx);
- toThrift(msg);
+ toThrift(msg, serialCtx);
container.addToNodes(msg);
// For the purpose of the BE consider cross-fragment children (i.e.
// ExchangeNodes and separated join builds) to have no children.
@@ -852,6 +860,14 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
return cardinality;
}
+ protected void toThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) {
+ // This should only be used for PlanNodes that don't support tuple
+ // caching. Any PlanNode that supports tuple caching must override
+ // this method with its own implementation.
+ Preconditions.checkState(!isTupleCachingImplemented());
+ toThrift(msg);
+ }
+
// Convert this plan node into msg (excluding children), which requires
setting
// the node type and the node-specific field.
protected abstract void toThrift(TPlanNode msg);
@@ -1298,12 +1314,12 @@ abstract public class PlanNode extends
TreeNode<PlanNode> {
* This computes the cache key by hashing Thrift structures, but it only
computes
* the key if the node is eligible to avoid overhead.
*/
- public void computeTupleCacheInfo() {
- tupleCacheInfo_ = new TupleCacheInfo();
+ public void computeTupleCacheInfo(DescriptorTable descTbl) {
+ tupleCacheInfo_ = new TupleCacheInfo(descTbl);
// computing the tuple cache information is a bottom-up tree traversal,
// so visit and merge the children before processing this node's contents
for (int i = 0; i < getChildCount(); i++) {
- getChild(i).computeTupleCacheInfo();
+ getChild(i).computeTupleCacheInfo(descTbl);
tupleCacheInfo_.mergeChild(getChild(i).getTupleCacheInfo());
}
@@ -1323,7 +1339,7 @@ abstract public class PlanNode extends TreeNode<PlanNode>
{
TPlanNode msg = new TPlanNode();
ThriftSerializationCtx serialCtx = new
ThriftSerializationCtx(tupleCacheInfo_);
initThrift(msg, serialCtx);
- toThrift(msg);
+ toThrift(msg, serialCtx);
tupleCacheInfo_.hashThrift(msg);
tupleCacheInfo_.finalize();
}
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
index 0d3835870..b02ee322e 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
@@ -18,15 +18,30 @@
package org.apache.impala.planner;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.impala.analysis.DescriptorTable;
+import org.apache.impala.analysis.SlotDescriptor;
+import org.apache.impala.analysis.SlotId;
+import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.analysis.TupleId;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.FeView;
+import org.apache.impala.common.IdGenerator;
+import org.apache.impala.common.ThriftSerializationCtx;
+import org.apache.impala.thrift.TSlotDescriptor;
+import org.apache.impala.thrift.TTableName;
+import org.apache.impala.thrift.TTupleDescriptor;
import org.apache.thrift.TBase;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
+import com.google.common.base.Preconditions;
import com.google.common.hash.Hasher;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
-import com.google.common.base.Preconditions;
/**
* TupleCacheInfo stores the eligibility and cache key information for a
PlanNode.
@@ -41,8 +56,29 @@ import com.google.common.base.Preconditions;
* To support this, it provides a function to incorporate any Thrift
structure's
* contents into the hash.
*
+ * One critical piece of making the Thrift structures more general is id
translation.
+ * Since TupleIds and SlotIds are global to the query, the value of any id
will be
+ * influenced by the rest of the query unless we translate it to a local id.
+ * Plan nodes register their tuples via registerTuple(). This allows the tuple
/ slot
+ * information to be incorporated into the hash (by accessing the
DescriptorTable),
+ * but it also allocates a local id and adds an entry to the translation map.
Exprs
+ * and other structures can use translateSlotId() and translateTupleId() to
adjust
+ * global ids to local ids. When TupleCacheInfos are merged, they merge the
translations
+ * so there are no conflicts. Translation always goes in the global to local
direction.
+ *
+ * There are a few reasons why we don't try to maintain local ids earlier in
planning:
+ * 1. Only tuple caching needs local ids. The extra modifications introduce
risk and
+ * don't dramatically improve the outcome.
+ * 2. The plan shape can change across the various phases of planning. In
particular,
+ * runtime filters add edges to the PlanNode graph. It is hard to produce a
stable
+ * local id until the plan is stable.
+ * 3. There is ongoing work to add a Calcite planner, and we will want to
support tuple
+ * caching for that planner. Any logic in analysis/planning that produces
local ids
+ * will need to also work for Calcite analyzer/planner.
+ *
* For debuggability, this keeps a human-readable trace of what has been
incorporated
* into the cache key. This will help track down why two cache keys are
different.
+ * Anything hashed will have a representation incorporated into the trace.
*
* This accumulates information from various sources, then it is finalized and
cannot
* be modified further. The hash key and hash trace cannot be accessed until
finalize()
@@ -57,6 +93,21 @@ public class TupleCacheInfo {
}
private EnumSet<IneligibilityReason> ineligibilityReasons_;
+ // read-only reference to the query's descriptor table
+ // used for incorporating tuple/slot/table information
+ private DescriptorTable descriptorTable_;
+
+ // The tuple translation uses a tree map because we need a deterministic
order
+ // for visiting elements when merging two translation maps.
+ private final Map<TupleId, TupleId> tupleTranslationMap_ = new TreeMap<>();
+ // The slot translation does not need to be a deterministic order, so it
+ // can use a HashMap.
+ private final Map<SlotId, SlotId> slotTranslationMap_ = new HashMap<>();
+ private final IdGenerator<TupleId> translatedTupleIdGenerator_ =
+ TupleId.createGenerator();
+ private final IdGenerator<SlotId> translatedSlotIdGenerator_ =
+ SlotId.createGenerator();
+
// These fields accumulate partial results until finalize() is called.
private Hasher hasher_ = Hashing.murmur3_128().newHasher();
@@ -69,8 +120,9 @@ public class TupleCacheInfo {
private String finalizedHashTrace_ = null;
private String finalizedHashString_ = null;
- public TupleCacheInfo() {
+ public TupleCacheInfo(DescriptorTable descTbl) {
ineligibilityReasons_ = EnumSet.noneOf(IneligibilityReason.class);
+ descriptorTable_ = descTbl;
}
public void setIneligible(IneligibilityReason reason) {
@@ -129,6 +181,15 @@ public class TupleCacheInfo {
// node. We could display each node's hash trace in explain plan,
// and each contribution would be clear.
hashTraceBuilder_.append(child.getHashTrace());
+
+ // Incorporate the child's tuple references. This is creating a new
translation
+ // of TupleIds, because it will be incorporating multiple children.
+ for (TupleId id : child.tupleTranslationMap_.keySet()) {
+ // Register the tuples, but don't incorporate their content into the
hash.
+ // The content was already hashed by the children, so we only need the
+ // id translation maps.
+ registerTupleHelper(id, false);
+ }
}
}
@@ -154,4 +215,95 @@ public class TupleCacheInfo {
Preconditions.checkState(thriftString != null);
hashTraceBuilder_.append(thriftString);
}
+
+ /**
+ * registerTuple() does two things:
+ * 1. It incorporates a tuple's layout (and slot information) into the cache
key.
+ * 2. It establishes a mapping from the global TupleIds/SlotIds to local
+ * TupleIds/SlotIds. See explanation above about id translation.
+ * It should be called for any tuple that is referenced from a PlanNode that
supports
+ * tuple caching. It is usually called via the ThriftSerializationCtx. If
the tuple has
+ * already been registered, this immediately returns.
+ */
+ public void registerTuple(TupleId id) {
+ registerTupleHelper(id, true);
+ }
+
+ private void registerTupleHelper(TupleId id, boolean incorporateIntoHash) {
+ Preconditions.checkState(!finalized_,
+ "TupleCacheInfo is finalized and can't be modified");
+ ThriftSerializationCtx serialCtx = new ThriftSerializationCtx(this);
+ // If we haven't seen this tuple before:
+ // - assign an index for the tuple
+ // - assign indexes for the tuple's slots
+ // - incorporate the tuple and slots into the hash / hash trace
+ if (!tupleTranslationMap_.containsKey(id)) {
+ // Assign a translated tuple id and add it to the map
+ tupleTranslationMap_.put(id, translatedTupleIdGenerator_.getNextId());
+
+ TupleDescriptor tupleDesc = descriptorTable_.getTupleDesc(id);
+ if (incorporateIntoHash) {
+ // Incorporate the tupleDescriptor into the hash
+ boolean needs_table_id =
+ (tupleDesc.getTable() != null && !(tupleDesc.getTable() instanceof
FeView));
+ TTupleDescriptor thriftTupleDesc =
+ tupleDesc.toThrift(needs_table_id ? new Integer(1) : null,
serialCtx);
+ hashThrift(thriftTupleDesc);
+ }
+
+ // Go through the tuple's slots and add them
+ for (SlotDescriptor slotDesc : tupleDesc.getSlots()) {
+ // Assign a translated slot id and add it to the map
+ slotTranslationMap_.put(slotDesc.getId(),
translatedSlotIdGenerator_.getNextId());
+
+ if (incorporateIntoHash) {
+ // Incorporate the SlotDescriptor into the hash
+ TSlotDescriptor thriftSlotDesc = slotDesc.toThrift(serialCtx);
+ hashThrift(thriftSlotDesc);
+ }
+ // Slots can have nested tuples, so this can recurse. The depth is
limited.
+ TupleDescriptor nestedTupleDesc = slotDesc.getItemTupleDesc();
+ if (nestedTupleDesc != null) {
+ registerTupleHelper(nestedTupleDesc.getId(), incorporateIntoHash);
+ }
+ }
+ }
+ }
+
+ /**
+ * registerTable() incorporates a table's information into the cache key.
This is
+ * designed to be called by scan nodes via the ThriftSerializationCtx. In
future,
+ * this will store information about the table's scan ranges.
+ */
+ public void registerTable(FeTable tbl) {
+ Preconditions.checkState(!(tbl instanceof FeView),
+ "registerTable() only applies to base tables");
+ Preconditions.checkState(tbl != null, "Invalid null argument to
registerTable()");
+
+ // Right now, we only hash the database / table name.
+ TTableName tblName = tbl.getTableName().toThrift();
+ hashThrift(tblName);
+ }
+
+ /**
+ * getLocalTupleId() converts a global TupleId to a local TupleId (i.e. an id
that is
+ * not influenced by the structure of the rest of the query). Most users
should access
+ * this via the ThriftSerializationCtx's translateTupleId().
+ */
+ public TupleId getLocalTupleId(TupleId globalId) {
+ // The tuple must have been registered before this reference happens
+ Preconditions.checkState(tupleTranslationMap_.containsKey(globalId));
+ return tupleTranslationMap_.get(globalId);
+ }
+
+ /**
+ * getLocalSlotId() converts a global SlotId to a local SlotId (i.e. an id
that is
+ * not influenced by the structure of the rest of the query). Most users
should access
+ * this via the ThriftSerializationCtx's translateSlotId().
+ */
+ public SlotId getLocalSlotId(SlotId globalId) {
+ // The slot must have been registered before this reference happens
+ Preconditions.checkState(slotTranslationMap_.containsKey(globalId));
+ return slotTranslationMap_.get(globalId);
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
b/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
index ebc049a8f..8b4cdd101 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
@@ -52,7 +52,7 @@ public class TupleCachePlanner {
// Start at the root of the PlanNode tree
PlanNode root = plan.get(0).getPlanRoot();
// Step 1: Compute the TupleCacheInfo for all PlanNodes
- root.computeTupleCacheInfo();
+ root.computeTupleCacheInfo(ctx_.getRootAnalyzer().getDescTbl());
// Step 2: Build up the new PlanNode tree with TupleCacheNodes added
PlanNode newRoot = buildCachingPlan(root);
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheInfoTest.java
b/fe/src/test/java/org/apache/impala/planner/TupleCacheInfoTest.java
index 82e5827a9..e1a798b4e 100644
--- a/fe/src/test/java/org/apache/impala/planner/TupleCacheInfoTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheInfoTest.java
@@ -21,6 +21,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+import org.apache.impala.analysis.DescriptorTable;
+import org.apache.impala.analysis.SlotDescriptor;
+import org.apache.impala.analysis.SlotId;
+import org.apache.impala.analysis.TupleDescriptor;
+import org.apache.impala.analysis.TupleId;
+import org.apache.impala.catalog.PrimitiveType;
+import org.apache.impala.catalog.ScalarType;
import org.apache.impala.thrift.TUniqueId;
import org.junit.Test;
@@ -32,11 +39,12 @@ public class TupleCacheInfoTest {
@Test
public void testHashThrift() {
- TupleCacheInfo info1 = new TupleCacheInfo();
+ // This test doesn't need a DescriptorTable, so it just sets it to null.
+ TupleCacheInfo info1 = new TupleCacheInfo(null);
info1.hashThrift(new TUniqueId(1L, 2L));
info1.finalize();
- TupleCacheInfo info2 = new TupleCacheInfo();
+ TupleCacheInfo info2 = new TupleCacheInfo(null);
info2.hashThrift(new TUniqueId(1L, 2L));
info2.finalize();
@@ -49,15 +57,16 @@ public class TupleCacheInfoTest {
@Test
public void testMergeHash() {
- TupleCacheInfo child1 = new TupleCacheInfo();
+ // This test doesn't need a DescriptorTable, so it just sets it to null.
+ TupleCacheInfo child1 = new TupleCacheInfo(null);
child1.hashThrift(new TUniqueId(1L, 2L));
child1.finalize();
- TupleCacheInfo child2 = new TupleCacheInfo();
+ TupleCacheInfo child2 = new TupleCacheInfo(null);
child2.hashThrift(new TUniqueId(3L, 4L));
child2.finalize();
- TupleCacheInfo parent = new TupleCacheInfo();
+ TupleCacheInfo parent = new TupleCacheInfo(null);
parent.mergeChild(child1);
parent.mergeChild(child2);
parent.hashThrift(new TUniqueId(5L, 6L));
@@ -71,19 +80,20 @@ public class TupleCacheInfoTest {
@Test
public void testMergeEligibility() {
+ // This test doesn't need a DescriptorTable, so it just sets it to null.
// Child 1 is eligible
- TupleCacheInfo child1 = new TupleCacheInfo();
+ TupleCacheInfo child1 = new TupleCacheInfo(null);
child1.hashThrift(new TUniqueId(1L, 2L));
child1.finalize();
assertTrue(child1.isEligible());
// Child 2 is ineligible
- TupleCacheInfo child2 = new TupleCacheInfo();
+ TupleCacheInfo child2 = new TupleCacheInfo(null);
child2.setIneligible(TupleCacheInfo.IneligibilityReason.NOT_IMPLEMENTED);
child2.finalize();
assertTrue(!child2.isEligible());
- TupleCacheInfo parent = new TupleCacheInfo();
+ TupleCacheInfo parent = new TupleCacheInfo(null);
parent.mergeChild(child1);
// Still eligible after adding child1 without child2
assertTrue(parent.isEligible());
@@ -95,4 +105,72 @@ public class TupleCacheInfoTest {
assertTrue(!parent.isEligible());
}
+ @Test
+ public void testIdTranslation() {
+ // Create a DescriptorTable and add two tuples each with one integer slot.
+ DescriptorTable descTbl = new DescriptorTable();
+ TupleDescriptor tuple1 = descTbl.createTupleDescriptor("tuple1");
+ assertEquals(tuple1.getId().asInt(), 0);
+ SlotDescriptor t1slot = descTbl.addSlotDescriptor(tuple1);
+ t1slot.setType(ScalarType.createType(PrimitiveType.INT));
+ t1slot.setLabel("t1slot");
+ assertEquals(t1slot.getId().asInt(), 0);
+ TupleDescriptor tuple2 = descTbl.createTupleDescriptor("tuple2");
+ assertEquals(tuple2.getId().asInt(), 1);
+ SlotDescriptor t2slot = descTbl.addSlotDescriptor(tuple2);
+ t2slot.setType(ScalarType.createType(PrimitiveType.INT));
+ t2slot.setLabel("t2slot");
+ assertEquals(t2slot.getId().asInt(), 1);
+
+ tuple1.materializeSlots();
+ tuple2.materializeSlots();
+ descTbl.computeMemLayout();
+
+ TupleCacheInfo child1 = new TupleCacheInfo(descTbl);
+ child1.hashThrift(new TUniqueId(1L, 2L));
+ child1.registerTuple(tuple1.getId());
+ child1.finalize();
+ assertEquals(child1.getLocalTupleId(tuple1.getId()).asInt(), 0);
+ assertEquals(child1.getLocalSlotId(t1slot.getId()).asInt(), 0);
+ String child1ExpectedHashTrace = "TUniqueId(hi:1, lo:2)" +
+ "TTupleDescriptor(id:0, byteSize:5, numNullBytes:1)" +
+ "TSlotDescriptor(id:0, parent:0, slotType:TColumnType(types:[" +
+ "TTypeNode(type:SCALAR, scalar_type:TScalarType(type:INT))]), " +
+ "materializedPath:[], byteOffset:0, nullIndicatorByte:4,
nullIndicatorBit:0, " +
+ "slotIdx:0, virtual_col_type:NONE)";
+ assertEquals(child1.getHashTrace(), child1ExpectedHashTrace);
+
+ // To demonstrate why we're doing this, child2 uses the same TUniqueId as
+ // child1, but different tuple / slot ids.
+ TupleCacheInfo child2 = new TupleCacheInfo(descTbl);
+ child2.hashThrift(new TUniqueId(1L, 2L));
+ child2.registerTuple(tuple2.getId());
+ child2.finalize();
+ // Note: we expect the id's to be translated to local ids, so even though
this is
+ // tuple 2 and slot 2, this will still have TupleId=0 and SlotId=0. In
fact, at this
+ // point the only difference between child1 and child2 is the TUniqueId.
+ assertEquals(child2.getLocalTupleId(tuple2.getId()).asInt(), 0);
+ assertEquals(child2.getLocalSlotId(t2slot.getId()).asInt(), 0);
+ // Because of the translation, child2's hash is the same as child1.
+ assertEquals(child2.getHashTrace(), child1ExpectedHashTrace);
+ assertEquals(child2.getHashString(), child1.getHashString());
+
+ // Merge the children in opposite order. This means that every index is
different
+ // from its original index in the descriptor table.
+ TupleCacheInfo parent = new TupleCacheInfo(descTbl);
+ parent.mergeChild(child2);
+ parent.mergeChild(child1);
+ parent.finalize();
+
+ // Tuple1 = second index
+ // Tuple2 = first index
+ // Slot1 = second index
+ // Slot2 = first index
+ assertEquals(parent.getLocalTupleId(tuple1.getId()).asInt(), 1);
+ assertEquals(parent.getLocalTupleId(tuple2.getId()).asInt(), 0);
+ assertEquals(parent.getLocalSlotId(t1slot.getId()).asInt(), 1);
+ assertEquals(parent.getLocalSlotId(t2slot.getId()).asInt(), 0);
+ assertEquals(parent.getHashTrace(),
+ child1ExpectedHashTrace + child1ExpectedHashTrace);
+ }
}
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
index 91269e74a..40424f662 100644
--- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
@@ -46,27 +46,43 @@ public class TupleCacheTest extends PlannerTestBase {
* Test some basic cases for tuple cache keys
*/
@Test
- public void testTupleCacheKeys() {
+ public void testBasicCacheKeys() {
verifyIdenticalCacheKeys("select id from functional.alltypes",
"select id from functional.alltypes");
+ // Different table
verifyDifferentCacheKeys("select id from functional.alltypes",
"select id from functional.alltypestiny");
- // TODO: incorporate column information into the key
- // verifyDifferentCacheKeys("select id from functional.alltypes",
- // "select int_col from functional.alltypes");
verifyDifferentCacheKeys("select id from functional.alltypes",
"select id from functional_parquet.alltypes");
+ // Different column
+ verifyDifferentCacheKeys("select id from functional.alltypes",
+ "select int_col from functional.alltypes");
+ // Extra filter
verifyDifferentCacheKeys("select id from functional.alltypes",
"select id from functional.alltypes where id = 1");
+ // Different filter
verifyDifferentCacheKeys("select id from functional.alltypes where id = 1",
"select id from functional.alltypes where id = 2");
+ }
- // TODO: mask out the table alias so it doesn't result in different cache
keys
- // verifyIdenticalCacheKeys("select id from functional.alltypes",
- // "select id from functional.alltypes a");
+ /**
+ * Test cases that rely on masking out unnecessary data to have cache hits.
+ */
+ @Test
+ public void testCacheKeyMasking() {
+ // The table alias is masked out and doesn't result in a different cache
key
+ verifyIdenticalCacheKeys("select id from functional.alltypes",
+ "select id from functional.alltypes a");
+ // The column alias doesn't impact the cache key
verifyIdenticalCacheKeys("select id from functional.alltypes",
"select id as a from functional.alltypes");
+ }
+ /**
+ * Test cases where a statement should be cache ineligible
+ */
+ @Test
+ public void testIneligibility() {
// Tuple caching not implemented for Kudu or HBase
verifyCacheIneligible("select id from functional_kudu.alltypes");
verifyCacheIneligible("select id from functional_hbase.alltypes");
@@ -88,6 +104,53 @@ public class TupleCacheTest extends PlannerTestBase {
// "select date_col from functional.date_tbl where date_col <
current_date()");
}
+ /**
+ * Test cases to verify that pieces of different queries can match
+ * An important piece of this is translation of tuple/slot IDs so that other
parts of
+ * the plan tree don't influence the cache key.
+ */
+ @Test
+ public void testCacheKeyGenerality() {
+ // Build side is the same. Probe side has a different table
+ // (alltypes vs alltypestiny).
+ verifyOverlappingCacheKeys(
+ "select straight_join probe.id from functional.alltypes probe, " +
+ "functional.alltypes build where probe.id = build.id",
+ "select straight_join probe.id from functional.alltypestiny probe, " +
+ "functional.alltypes build where probe.id = build.id");
+
+ // Build side is the same. Probe side has an extra int_col = 100 filter.
+ verifyOverlappingCacheKeys(
+ "select straight_join probe.id from functional.alltypes probe, " +
+ "functional.alltypes build where probe.id = build.id",
+ "select straight_join probe.id from functional.alltypes probe, " +
+ "functional.alltypes build where probe.id = build.id and
probe.int_col=100");
+
+ // Build side is the same. Probe side has an extra int_col = 100 filter.
+ // Note that this test requires id translation, because the probe side's
reference
+ // to int_col takes up a slot id and shifts the slot ids of the build side.
+ // Id translation fixes the build side's int_col slot id so that the probe
side
+ // doesn't influence it.
+ verifyOverlappingCacheKeys(
+ "select straight_join probe.id from functional.alltypes probe, " +
+ "functional.alltypes build where probe.id = build.id and build.int_col
= 100",
+ "select straight_join probe.id from functional.alltypes probe, " +
+ "functional.alltypes build where probe.id = build.id and probe.int_col
= 100 " +
+ "and build.int_col = 100");
+
+ // This has one build side that is the same, but one query has an extra
join feeding
+ // the probe side. The extra join takes up an extra tuple id, shifting the
tuple id
+ // for the overlapping build. This test requires translation of the tuple
ids to
+ // work. It also relies on masking out the node ids, because the node id
will also be
+ // different.
+ verifyOverlappingCacheKeys(
+ "select straight_join probe1.id from functional.alltypes probe1, " +
+ "functional.alltypes probe2, functional.alltypes build " +
+ "where probe1.id = probe2.id and probe2.id = build.id",
+ "select straight_join probe1.id from functional.alltypes probe1, " +
+ "functional.alltypes build where probe1.id = build.id");
+ }
+
protected List<PlanNode> getCacheEligibleNodes(String query) {
List<PlanFragment> plan = getPlan(query);
PlanNode planRoot = plan.get(0).getPlanRoot();
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
b/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
index cb006613b..daafd1a3d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
@@ -60,5 +60,5 @@ set ENABLE_TUPLE_CACHE=TRUE;
explain select count(*) from tpch.region
---- RESULTS: VERIFY_IS_SUBSET
row_regex:.* cache key: [0-9a-f][0-9a-f]*.*
-row_regex:.*\[TPlanNode\(.*\]
+row_regex:.*\[.*TPlanNode\(.*\]
====
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
index 8b6024608..2c62d965e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
@@ -100,5 +100,5 @@ set ENABLE_TUPLE_CACHE=TRUE;
explain select count(*) from tpch.region
---- RESULTS: VERIFY_IS_SUBSET
row_regex:.* cache key: [0-9a-f][0-9a-f]*.*
-row_regex:.*\[TPlanNode\(.*\]
+row_regex:.*\[.*TPlanNode\(.*\]
====