This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new f222574f0 IMPALA-13660: Support caching broadcast hash joins
f222574f0 is described below
commit f222574f04fc7b94e1ad514d7d720d50d036a226
Author: Michael Smith <[email protected]>
AuthorDate: Wed Jan 22 16:38:13 2025 -0800
IMPALA-13660: Support caching broadcast hash joins
This extends tuple caching to be able to cache above joins. As part
of this, ExchangeNodes are now eligible for broadcast and directed
exchanges. This does not yet support partitioned exchanges. Since
an exchange passes data from all nodes, this incorporates all the
scan range information when passing through an exchange.
For joins with a separate build side, a cache hit above the join
means that a probe-side thread will never arrive. If the builder
is not notified, it will wait for that thread to arrive and extend
the latency of the query significantly. This adds code to notify
the builder when a thread will never participate in the probe
phase.
Testing:
- Added test cases to TestTupleCace including with distributed
plans.
- Added test cases to test_tuple_cache.py to verify behavior when
updating the build side table and the timing of a cache hit.
- Performance tests with TPC-DS at scale
Change-Id: Ic61462702b43175c593b34e8c3a14b9cfe85c29e
Reviewed-on: http://gerrit.cloudera.org:8080/22371
Reviewed-by: Joe McDonnell <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/exec/blocking-join-node.cc | 40 ++-
be/src/exec/blocking-join-node.h | 10 +
be/src/exec/join-builder.cc | 33 ++-
be/src/exec/join-builder.h | 10 +
be/src/exec/nested-loop-join-node.cc | 23 +-
be/src/exec/partitioned-hash-join-node.cc | 21 +-
be/src/exec/tuple-cache-node.cc | 4 +
.../impala/common/ThriftSerializationCtx.java | 11 +
.../org/apache/impala/planner/ExchangeNode.java | 31 +-
.../org/apache/impala/planner/HashJoinNode.java | 32 +--
.../apache/impala/planner/IcebergDeleteNode.java | 24 +-
.../org/apache/impala/planner/JoinBuildSink.java | 10 +-
.../java/org/apache/impala/planner/JoinNode.java | 37 ++-
.../apache/impala/planner/NestedLoopJoinNode.java | 7 +-
.../java/org/apache/impala/planner/PlanNode.java | 8 +
.../org/apache/impala/planner/TupleCacheInfo.java | 65 ++++-
.../apache/impala/planner/TupleCachePlanner.java | 6 +
.../org/apache/impala/planner/TupleCacheTest.java | 318 +++++++++++++++------
tests/custom_cluster/test_tuple_cache.py | 95 +++++-
19 files changed, 613 insertions(+), 172 deletions(-)
diff --git a/be/src/exec/blocking-join-node.cc
b/be/src/exec/blocking-join-node.cc
index d08a507a3..043fdc9c4 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -181,6 +181,31 @@ void BlockingJoinNode::ProcessBuildInputAsync(
state->resource_pool()->ReleaseThreadToken(false);
}
+Status BlockingJoinNode::LookupSeparateJoinBuilder(RuntimeState* state,
+ JoinBuilder** separate_builder) {
+ // This can only be called when there is a separate join build
+ DCHECK(UseSeparateBuild(state->query_options()));
+
+ // Find the input fragment's build sink. GetFInstanceState() waits for the
Prepare()
+ // phase to complete for all the fragments on this executor, so it is
unsuitable to
+ // call during Prepare() itself. Typically, this is used during the Open()
phase.
+ const google::protobuf::RepeatedPtrField<JoinBuildInputPB>& build_inputs =
+ state->instance_ctx_pb().join_build_inputs();
+ auto it = std::find_if(build_inputs.begin(), build_inputs.end(),
+ [this](const JoinBuildInputPB& bi) { return bi.join_node_id() == id_; });
+ DCHECK(it != build_inputs.end());
+ FragmentInstanceState* build_finstance;
+ TUniqueId input_finstance_id;
+ UniqueIdPBToTUniqueId(it->input_finstance_id(), &input_finstance_id);
+ RETURN_IF_ERROR(
+ state->query_state()->GetFInstanceState(input_finstance_id,
&build_finstance));
+ TDataSinkType::type build_sink_type =
build_finstance->fragment().output_sink.type;
+ DCHECK(IsJoinBuildSink(build_sink_type));
+ *separate_builder = build_finstance->GetJoinBuildSink();
+ DCHECK(*separate_builder != nullptr);
+ return Status::OK();
+}
+
Status BlockingJoinNode::OpenImpl(RuntimeState* state, JoinBuilder**
separate_builder) {
RETURN_IF_ERROR(ExecNode::Open(state));
eos_ = false;
@@ -191,20 +216,7 @@ Status BlockingJoinNode::OpenImpl(RuntimeState* state,
JoinBuilder** separate_bu
if (UseSeparateBuild(state->query_options())) {
// Find the input fragment's build sink. We do this in the Open() phase so
we don't
// block this finstance's Prepare() phase on the build finstance's
Prepare() phase.
- const google::protobuf::RepeatedPtrField<JoinBuildInputPB>& build_inputs =
- state->instance_ctx_pb().join_build_inputs();
- auto it = std::find_if(build_inputs.begin(), build_inputs.end(),
- [this](const JoinBuildInputPB& bi) { return bi.join_node_id() == id_;
});
- DCHECK(it != build_inputs.end());
- FragmentInstanceState* build_finstance;
- TUniqueId input_finstance_id;
- UniqueIdPBToTUniqueId(it->input_finstance_id(), &input_finstance_id);
- RETURN_IF_ERROR(
- state->query_state()->GetFInstanceState(input_finstance_id,
&build_finstance));
- TDataSinkType::type build_sink_type =
build_finstance->fragment().output_sink.type;
- DCHECK(IsJoinBuildSink(build_sink_type));
- *separate_builder = build_finstance->GetJoinBuildSink();
- DCHECK(*separate_builder != nullptr);
+ RETURN_IF_ERROR(LookupSeparateJoinBuilder(state, separate_builder));
} else {
// The integrated join build requires some tricky time accounting because
two
// threads execute concurrently with the time from the left and right child
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 66856dc0e..3ddfedf35 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -116,6 +116,10 @@ class BlockingJoinNode : public ExecNode {
/// on the builder.
bool waited_for_build_ = false;
+ /// True if Prepare() exited successfully. This is used to avoid deadlocks
where
+ /// a thread calls WaitForPrepare() without having successfully finished
Prepare().
+ bool prepare_succeeded_ = false;
+
/// Store in node to avoid reallocating. Cleared after build completes.
boost::scoped_ptr<RowBatch> build_batch_;
@@ -248,6 +252,12 @@ class BlockingJoinNode : public ExecNode {
return plan_node().UseSeparateBuild(query_options);
}
+ /// Find the build sink for the separate builder fragment. This is only
valid when
+ /// there is a separate builder fragment. Currently, this cannot be used
during the
+ /// Prepare() phase, because it uses QueryState::WaitForPrepare(), which
waits for
+ /// Prepare() to finish for all fragment instances.
+ Status LookupSeparateJoinBuilder(RuntimeState* state, JoinBuilder**
separate_builder);
+
const RowDescriptor& probe_row_desc() const {
return plan_node().probe_row_desc();
}
diff --git a/be/src/exec/join-builder.cc b/be/src/exec/join-builder.cc
index b2ab1d916..a1ee2c460 100644
--- a/be/src/exec/join-builder.cc
+++ b/be/src/exec/join-builder.cc
@@ -41,7 +41,10 @@ JoinBuilder::JoinBuilder(TDataSinkId sink_id, const
JoinBuilderConfig& sink_conf
join_op_(sink_config.join_op_),
is_separate_build_(sink_id != -1),
num_probe_threads_(
- is_separate_build_ ? state->instance_ctx().num_join_build_outputs : 1)
{}
+ is_separate_build_ ? state->instance_ctx().num_join_build_outputs : 1),
+ outstanding_probes_(num_probe_threads_) {
+ DCHECK_GE(outstanding_probes_, 1);
+}
JoinBuilder::~JoinBuilder() {
DCHECK_EQ(0, probe_refcount_);
@@ -55,7 +58,7 @@ void JoinBuilder::CloseFromProbe(RuntimeState*
join_node_state) {
--probe_refcount_;
last_probe = probe_refcount_ == 0;
VLOG(3) << "JoinBuilder (id=" << join_node_id_ << ")"
- << "closed from finstance "
+ << "closed from probe from finstance "
<< PrintId(join_node_state->fragment_instance_id())
<< "probe_refcount_=" << probe_refcount_;
DCHECK_GE(probe_refcount_, 0);
@@ -67,6 +70,30 @@ void JoinBuilder::CloseFromProbe(RuntimeState*
join_node_state) {
}
}
+void JoinBuilder::CloseBeforeProbe(RuntimeState* join_node_state) {
+ if (is_separate_build_) {
+ bool no_threads_remaining;
+ {
+ unique_lock<mutex> l(separate_build_lock_);
+ --outstanding_probes_;
+ // If all the probes are done (probe_refcount_ == 0) and there are no
+ // futher probes expected (outstanding_probes_ == 0), then there are
+ // no threads left.
+ no_threads_remaining = (probe_refcount_ == 0 && outstanding_probes_ ==
0);
+ VLOG(3) << "JoinBuilder (id=" << join_node_id_ << ")"
+ << "closed before probe from finstance "
+ << PrintId(join_node_state->fragment_instance_id())
+ << "outstanding_probes_=" << outstanding_probes_ << " "
+ << "probe_refcount_=" << probe_refcount_;
+ DCHECK_GE(outstanding_probes_, 0);
+ }
+ // Only notify when there are no threads remaining
+ if (no_threads_remaining) build_wakeup_cv_.NotifyAll();
+ } else {
+ Close(join_node_state);
+ }
+}
+
Status JoinBuilder::WaitForInitialBuild(RuntimeState* join_node_state) {
DCHECK(is_separate_build_);
join_node_state->AddCancellationCV(&separate_build_lock_, &probe_wakeup_cv_);
@@ -103,8 +130,6 @@ void JoinBuilder::HandoffToProbesAndWait(RuntimeState*
build_side_state) {
{
unique_lock<mutex> l(separate_build_lock_);
ready_to_probe_ = true;
- outstanding_probes_ = num_probe_threads_;
- DCHECK_GE(outstanding_probes_, 1);
VLOG(3) << "JoinBuilder (id=" << join_node_id_ << ")"
<< " waiting for " << outstanding_probes_ << " probes.";
probe_wakeup_cv_.NotifyAll();
diff --git a/be/src/exec/join-builder.h b/be/src/exec/join-builder.h
index aaedb9ba7..a4e585caf 100644
--- a/be/src/exec/join-builder.h
+++ b/be/src/exec/join-builder.h
@@ -142,6 +142,16 @@ class JoinBuilder : public DataSink {
/// this builder calls CloseFromProbe(). BlockingJoinNode never calls
Close() directly.
void CloseFromProbe(RuntimeState* join_node_state);
+ /// This is called from BlockingJoinNode to signal that the node won't even
reach the
+ /// probe phase. This can happen if the node is closed before calling
Open(). This can
+ /// happen if the BlockingJoinNode is a child of a TupleCacheNode with a
cache hit.
+ /// This only makes sense when using a separate join build. Each
BlockingJoinNode needs
+ /// to call CloseBeforeProbe() if it won't call WaitForInitialBuild(). This
adjusts the
+ /// number of outstanding probes so that the builder doesn't wait
unnecessarily for
+ /// probes that will never show up. If there are no threads left, this
notifies the
+ /// builder.
+ void CloseBeforeProbe(RuntimeState* join_node_state);
+
int num_probe_threads() const { return num_probe_threads_; }
static string ConstructBuilderName(const char* name, int join_node_id) {
diff --git a/be/src/exec/nested-loop-join-node.cc
b/be/src/exec/nested-loop-join-node.cc
index 79ee614be..b73ebbaf3 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -150,6 +150,7 @@ Status NestedLoopJoinNode::Prepare(RuntimeState* state) {
matching_build_rows_.reset(new Bitmap(0));
}
}
+ prepare_succeeded_ = true;
return Status::OK();
}
@@ -166,12 +167,28 @@ Status NestedLoopJoinNode::Reset(RuntimeState* state,
RowBatch* row_batch) {
void NestedLoopJoinNode::Close(RuntimeState* state) {
if (is_closed()) return;
ScalarExprEvaluator::Close(join_conjunct_evals_, state);
+ bool separate_build = UseSeparateBuild(state->query_options());
+ if (prepare_succeeded_ && builder_ == nullptr && separate_build) {
+ DCHECK(builder_ == nullptr);
+ DCHECK(!waited_for_build_);
+ // Find the separate join builder. This can return an error status if the
Prepare()
+ // phase failed. In that case, there is no need to notify the builder and
it can
+ // be skipped.
+ JoinBuilder* separate_builder;
+ Status status = LookupSeparateJoinBuilder(state, &separate_builder);
+ if (status.ok()) {
+ builder_ = dynamic_cast<NljBuilder*>(separate_builder);
+ DCHECK(builder_ != nullptr);
+ }
+ }
if (builder_ != NULL) {
// IMPALA-6595: builder must be closed before child. The separate build
case is
// handled in FragmentInstanceState.
- DCHECK(UseSeparateBuild(state->query_options()) || builder_->is_closed()
- || !children_[1]->is_closed());
- if (!UseSeparateBuild(state->query_options()) || waited_for_build_) {
+ DCHECK(separate_build || builder_->is_closed() ||
!children_[1]->is_closed());
+ if (separate_build && !waited_for_build_) {
+ // There is a separate build and we never reached the probe phase
+ builder_->CloseBeforeProbe(state);
+ } else {
builder_->CloseFromProbe(state);
waited_for_build_ = false;
}
diff --git a/be/src/exec/partitioned-hash-join-node.cc
b/be/src/exec/partitioned-hash-join-node.cc
index b3e594f1d..238e0e2e2 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -161,6 +161,7 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState*
state) {
num_probe_rows_partitioned_ =
ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
+ prepare_succeeded_ = true;
return Status::OK();
}
@@ -298,9 +299,25 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
if (build_batch_ != nullptr) build_batch_->Reset();
if (probe_batch_ != nullptr) probe_batch_->Reset();
CloseAndDeletePartitions(nullptr);
+ bool separate_build = UseSeparateBuild(state->query_options());
+ if (prepare_succeeded_ && builder_ == nullptr && separate_build) {
+ DCHECK(builder_ == nullptr);
+ DCHECK(!waited_for_build_);
+ // Find the separate join builder. This can return an error status if the
Prepare()
+ // phase failed. In that case, there is no need to notify the builder and
it can
+ // be skipped.
+ JoinBuilder* separate_builder;
+ Status status = LookupSeparateJoinBuilder(state, &separate_builder);
+ if (status.ok()) {
+ builder_ = dynamic_cast<PhjBuilder*>(separate_builder);
+ DCHECK(builder_ != nullptr);
+ }
+ }
if (builder_ != nullptr) {
- bool separate_build = UseSeparateBuild(state->query_options());
- if (!separate_build || waited_for_build_) {
+ if (separate_build && !waited_for_build_) {
+ // There is a separate build and we never reached the probe phase
+ builder_->CloseBeforeProbe(state);
+ } else {
if (separate_build
&& buffer_pool_client()->GetReservation() >
resource_profile_.min_reservation) {
// Transfer back surplus reservation, which we may have borrowed from
'builder_'.
diff --git a/be/src/exec/tuple-cache-node.cc b/be/src/exec/tuple-cache-node.cc
index 3ddee4831..f2ade640d 100644
--- a/be/src/exec/tuple-cache-node.cc
+++ b/be/src/exec/tuple-cache-node.cc
@@ -254,6 +254,10 @@ Status TupleCacheNode::GetNext(
Status status = reader_->GetNext(state, buffer_pool_client(),
output_row_batch, eos);
if (status.ok()) {
cached_rowbatch_returned_to_caller_ = true;
+ // Close the child now that there is no hope of recovery in case of
failure. This
+ // allows it to tear down any state and notify any affected threads that
the
+ // children won't ever reach Open().
+ child(0)->Close(state);
} else {
// If we have returned a cached row batch to the caller, then it is not
safe
// to try to get any rows from the child as they could be duplicates. Any
diff --git
a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
index 2c69701ec..f980b363e 100644
--- a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
+++ b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
@@ -129,4 +129,15 @@ public class ThriftSerializationCtx {
return globalId;
}
}
+
+ /**
+ * Incorporate all the scan range information from the children nodes into
the
+ * compile-time cache key. This eliminates the need to include these scan
ranges
+ * in the runtime key.
+ */
+ public void incorporateScans() {
+ if (isTupleCache()) {
+ tupleCacheInfo_.incorporateScans();
+ }
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index 9f24c5809..11971e151 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -24,6 +24,8 @@ import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.SortInfo;
import org.apache.impala.analysis.TupleId;
import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.ThriftSerializationCtx;
+import org.apache.impala.planner.TupleCacheInfo.IneligibilityReason;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TDataSinkType;
import org.apache.impala.thrift.TExchangeNode;
@@ -362,6 +364,22 @@ public class ExchangeNode extends PlanNode {
@Override
protected void toThrift(TPlanNode msg) {
+ Preconditions.checkState(false, "Unexpected use of old toThrift()
signature.");
+ }
+
+ @Override
+ protected void toThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) {
+ if (isMergingExchange()) {
+ LOG.trace("{} ineligible for caching because it is a merging exchange",
this);
+
serialCtx.setTupleCachingIneligible(IneligibilityReason.MERGING_EXCHANGE);
+ } else if (!isDirectedExchange() && !isBroadcastExchange()) {
+ LOG.trace("{} ineligible for caching because it is a partitioned
exchange", this);
+
serialCtx.setTupleCachingIneligible(IneligibilityReason.PARTITIONED_EXCHANGE);
+ }
+ // Data that arrives via an exchange could come from any other node, so it
could come
+ // from any scan range that feeds into this regardless of node.
Incorporate all the
+ // current scan ranges into the cache key.
+ serialCtx.incorporateScans();
if (processingCost_.isValid() && processingCost_ instanceof
BroadcastProcessingCost) {
Preconditions.checkState(
getNumReceivers() == processingCost_.getNumInstancesExpected());
@@ -369,13 +387,14 @@ public class ExchangeNode extends PlanNode {
msg.node_type = TPlanNodeType.EXCHANGE_NODE;
msg.exchange_node = new TExchangeNode();
for (TupleId tid: tupleIds_) {
- msg.exchange_node.addToInput_row_tuples(tid.asInt());
+
msg.exchange_node.addToInput_row_tuples(serialCtx.translateTupleId(tid).asInt());
}
if (isMergingExchange()) {
TSortInfo sortInfo = new TSortInfo(
- Expr.treesToThrift(mergeInfo_.getSortExprs()),
mergeInfo_.getIsAscOrder(),
- mergeInfo_.getNullsFirst(), mergeInfo_.getSortingOrder());
+ Expr.treesToThrift(mergeInfo_.getSortExprs(), serialCtx),
+ mergeInfo_.getIsAscOrder(), mergeInfo_.getNullsFirst(),
+ mergeInfo_.getSortingOrder());
msg.exchange_node.setSort_info(sortInfo);
msg.exchange_node.setOffset(offset_);
}
@@ -387,4 +406,10 @@ public class ExchangeNode extends PlanNode {
if (!nodeStack.isEmpty()) nodeStack.add(this);
getChild(0).reduceCardinalityByRuntimeFilter(nodeStack, reductionScale);
}
+
+ @Override
+ public boolean isTupleCachingImplemented() { return true; }
+
+ @Override
+ public boolean omitTupleCache() { return true; }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index dc418fc58..3b054b71e 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -30,7 +30,7 @@ import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.Pair;
-import org.apache.impala.thrift.TEqJoinCondition;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.THashJoinNode;
import org.apache.impala.thrift.TPlanNode;
@@ -146,32 +146,20 @@ public class HashJoinNode extends JoinNode implements
SpillableOperator {
}
@Override
- protected void toThrift(TPlanNode msg) {
+ protected void toThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) {
msg.node_type = TPlanNodeType.HASH_JOIN_NODE;
- msg.join_node = joinNodeToThrift();
+ msg.join_node = joinNodeToThrift(serialCtx);
msg.join_node.hash_join_node = new THashJoinNode();
-
msg.join_node.hash_join_node.setEq_join_conjuncts(getThriftEquiJoinConjuncts());
+ msg.join_node.hash_join_node.setEq_join_conjuncts(
+ getThriftEquiJoinConjuncts(serialCtx));
for (Expr e: otherJoinConjuncts_) {
- msg.join_node.hash_join_node.addToOther_join_conjuncts(e.treeToThrift());
+
msg.join_node.hash_join_node.addToOther_join_conjuncts(e.treeToThrift(serialCtx));
}
- msg.join_node.hash_join_node.setHash_seed(getFragment().getHashSeed());
- }
-
- /**
- * Helper to get the equi-join conjuncts in a thrift representation.
- */
- public List<TEqJoinCondition> getThriftEquiJoinConjuncts() {
- List<TEqJoinCondition> equiJoinConjuncts = new
ArrayList<>(eqJoinConjuncts_.size());
- for (Expr entry : eqJoinConjuncts_) {
- BinaryPredicate bp = (BinaryPredicate)entry;
- TEqJoinCondition eqJoinCondition =
- new TEqJoinCondition(bp.getChild(0).treeToThrift(),
- bp.getChild(1).treeToThrift(),
- bp.getOp() == BinaryPredicate.Operator.NOT_DISTINCT);
-
- equiJoinConjuncts.add(eqJoinCondition);
+ // Omit hash seed for tuple caching to improve generalizability. Hash seed
is only
+ // used by partitioned builds, which are currently excluded from tuple
caching.
+ if (!serialCtx.isTupleCache()) {
+ msg.join_node.hash_join_node.setHash_seed(getFragment().getHashSeed());
}
- return equiJoinConjuncts;
}
@Override
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
index 591a1c9be..30877e5b0 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
@@ -29,7 +29,7 @@ import org.apache.impala.analysis.JoinOperator;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
-import org.apache.impala.thrift.TEqJoinCondition;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TIcebergDeleteNode;
import org.apache.impala.thrift.TPlanNode;
@@ -120,26 +120,12 @@ public class IcebergDeleteNode extends JoinNode {
}
@Override
- protected void toThrift(TPlanNode msg) {
+ protected void toThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) {
msg.node_type = TPlanNodeType.ICEBERG_DELETE_NODE;
- msg.join_node = joinNodeToThrift();
+ msg.join_node = joinNodeToThrift(serialCtx);
msg.join_node.iceberg_delete_node = new TIcebergDeleteNode();
-
msg.join_node.iceberg_delete_node.setEq_join_conjuncts(getThriftEquiJoinConjuncts());
- }
-
- /**
- * Helper to get the equi-join conjuncts in a thrift representation.
- */
- public List<TEqJoinCondition> getThriftEquiJoinConjuncts() {
- List<TEqJoinCondition> equiJoinConjuncts = new
ArrayList<>(eqJoinConjuncts_.size());
- for (BinaryPredicate bp : eqJoinConjuncts_) {
- TEqJoinCondition eqJoinCondition = new TEqJoinCondition(
- bp.getChild(0).treeToThrift(), bp.getChild(1).treeToThrift(),
- bp.getOp() == BinaryPredicate.Operator.NOT_DISTINCT);
-
- equiJoinConjuncts.add(eqJoinCondition);
- }
- return equiJoinConjuncts;
+ msg.join_node.iceberg_delete_node.setEq_join_conjuncts(
+ getThriftEquiJoinConjuncts(serialCtx));
}
@Override
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
index 4136dfda7..227ae4919 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
@@ -76,14 +76,10 @@ public class JoinBuildSink extends DataSink implements
SpillableOperator {
TJoinBuildSink tBuildSink = new TJoinBuildSink();
tBuildSink.setDest_node_id(joinNode_.getId().asInt());
tBuildSink.setJoin_op(joinNode_.getJoinOp().toThrift());
- if (joinNode_ instanceof HashJoinNode) {
- tBuildSink.setEq_join_conjuncts(
- ((HashJoinNode)joinNode_).getThriftEquiJoinConjuncts());
- tBuildSink.setHash_seed(joinNode_.getFragment().getHashSeed());
- }
- if (joinNode_ instanceof IcebergDeleteNode) {
+ if (joinNode_ instanceof HashJoinNode ||
+ joinNode_ instanceof IcebergDeleteNode) {
tBuildSink.setEq_join_conjuncts(
- ((IcebergDeleteNode) joinNode_).getThriftEquiJoinConjuncts());
+ joinNode_.getThriftEquiJoinConjuncts(new ThriftSerializationCtx()));
tBuildSink.setHash_seed(joinNode_.getFragment().getHashSeed());
}
for (RuntimeFilter filter : runtimeFilters_) {
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index 2c7ccd36e..86597b345 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -38,10 +38,14 @@ import org.apache.impala.catalog.ColumnStats;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
+import org.apache.impala.common.ThriftSerializationCtx;
+import org.apache.impala.planner.TupleCacheInfo.IneligibilityReason;
+import org.apache.impala.thrift.TEqJoinCondition;
import org.apache.impala.thrift.TExecNodePhase;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TJoinDistributionMode;
import org.apache.impala.thrift.TJoinNode;
+import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TQueryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -983,19 +987,44 @@ public abstract class JoinNode extends PlanNode {
}
}
+ @Override
+ protected void toThrift(TPlanNode msg) {
+ Preconditions.checkState(false, "Unexpected use of old toThrift()
signature.");
+ }
+
/** Helper to construct TJoinNode. */
- protected TJoinNode joinNodeToThrift() {
+ protected TJoinNode joinNodeToThrift(ThriftSerializationCtx serialCtx) {
+ if (distrMode_ == DistributionMode.PARTITIONED) {
+ LOG.trace("{} ineligible for caching because it is a partitioned
exchange", this);
+
serialCtx.setTupleCachingIneligible(IneligibilityReason.PARTITIONED_EXCHANGE);
+ }
TJoinNode result = new TJoinNode(joinOp_.toThrift());
List<TupleId> buildTupleIds = getChild(1).getTupleIds();
result.setBuild_tuples(new ArrayList<>(buildTupleIds.size()));
result.setNullable_build_tuples(new ArrayList<>(buildTupleIds.size()));
for (TupleId tid : buildTupleIds) {
- result.addToBuild_tuples(tid.asInt());
+ result.addToBuild_tuples(serialCtx.translateTupleId(tid).asInt());
result.addToNullable_build_tuples(getChild(1).getNullableTupleIds().contains(tid));
}
return result;
}
+ /**
+ * Helper to get the equi-join conjuncts in a thrift representation.
+ */
+ public List<TEqJoinCondition> getThriftEquiJoinConjuncts(
+ ThriftSerializationCtx serialCtx) {
+ List<TEqJoinCondition> equiJoinConjuncts = new
ArrayList<>(eqJoinConjuncts_.size());
+ for (BinaryPredicate bp : eqJoinConjuncts_) {
+ TEqJoinCondition eqJoinCondition = new TEqJoinCondition(
+ bp.getChild(0).treeToThrift(serialCtx),
bp.getChild(1).treeToThrift(serialCtx),
+ bp.getOp() == BinaryPredicate.Operator.NOT_DISTINCT);
+
+ equiJoinConjuncts.add(eqJoinCondition);
+ }
+ return equiJoinConjuncts;
+ }
+
@Override
public void computeProcessingCost(TQueryOptions queryOptions) {
Pair<ProcessingCost, ProcessingCost> probeBuildCost =
computeJoinProcessingCost();
@@ -1078,4 +1107,8 @@ public abstract class JoinNode extends PlanNode {
double selectivity = RuntimeFilterGenerator.getJoinNodeSelectivity(this);
return 0 <= selectivity && selectivity <= 1;
}
+
+
+ @Override
+ public boolean isTupleCachingImplemented() { return true; }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
index 4bad6aee0..c15ad2054 100644
--- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
@@ -26,6 +26,7 @@ import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.JoinOperator;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
+import org.apache.impala.common.ThriftSerializationCtx;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TNestedLoopJoinNode;
import org.apache.impala.thrift.TPlanNode;
@@ -164,12 +165,12 @@ public class NestedLoopJoinNode extends JoinNode {
}
@Override
- protected void toThrift(TPlanNode msg) {
+ protected void toThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) {
msg.node_type = TPlanNodeType.NESTED_LOOP_JOIN_NODE;
- msg.join_node = joinNodeToThrift();
+ msg.join_node = joinNodeToThrift(serialCtx);
msg.join_node.nested_loop_join_node = new TNestedLoopJoinNode();
for (Expr e : otherJoinConjuncts_) {
-
msg.join_node.nested_loop_join_node.addToJoin_conjuncts(e.treeToThrift());
+
msg.join_node.nested_loop_join_node.addToJoin_conjuncts(e.treeToThrift(serialCtx));
}
}
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 4448eaa21..0da260c60 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -1405,6 +1405,14 @@ abstract public class PlanNode extends
TreeNode<PlanNode> {
*/
public boolean isTupleCachingImplemented() { return false; }
+ /**
+ * Whether a tuple cache node should be omitted immediately above this node
(e.g.
+ * because it is not a beneficial caching location). This has no impact on
eligibility,
+ * so tuple cache nodes can be placed higher in the plan. PlanNodes should
override
+ * this to return true if this is an unfavorable caching location.
+ */
+ public boolean omitTupleCache() { return false; }
+
/**
* Return the least between 'cardinality1' and 'cardinality2'
* that is not a negative number (unknown).
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
index 2c8037ae7..4ff52e61c 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
@@ -22,6 +22,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.StringJoiner;
import java.util.TreeMap;
import org.apache.impala.analysis.DescriptorTable;
@@ -100,6 +101,8 @@ public class TupleCacheInfo {
// limits on a sorted input).
LIMIT,
NONDETERMINISTIC_FN,
+ MERGING_EXCHANGE,
+ PARTITIONED_EXCHANGE,
}
private EnumSet<IneligibilityReason> ineligibilityReasons_;
@@ -198,22 +201,33 @@ public class TupleCacheInfo {
}
/**
- * Pull in a child's TupleCacheInfo into this TupleCacheInfo. If the child is
- * ineligible, then this is marked ineligible and there is no need to
calculate
- * a hash. If the child is eligible, it incorporates the child's hash into
this
- * hash. Returns true if the child was merged, false if it was ineligible.
- *
- * Resolves scan ranges statically for cases where results depend on all
scan ranges.
+ * Pull in a child's TupleCacheInfo into this TupleCacheInfo while also
incorporating
+ * all of its scan ranges into the key. This returns true if the child is
eligible
+ * and false otherwise.
*/
public boolean mergeChildWithScans(TupleCacheInfo child) {
- if (!mergeChildImpl(child)) {
- return false;
+ if (!child.isEligible()) {
+ return mergeChild(child);
}
+ // Use a temporary TupleCacheInfo to incorporate the scan ranges for this
child.
+ TupleCacheInfo tmpInfo = new TupleCacheInfo(descriptorTable_);
+ boolean success = tmpInfo.mergeChild(child);
+ Preconditions.checkState(success);
+ tmpInfo.incorporateScans();
+ tmpInfo.finalizeHash();
+ return mergeChild(tmpInfo);
+ }
+ /**
+ * Incorporate all the scan range information from input scan nodes into the
+ * cache key. This clears the lists of input scan nodes, as the information
is
+ * now built into the cache key.
+ */
+ public void incorporateScans() {
// Add all scan range specs to the hash. Copy only the relevant fields,
primarily:
// filename, mtime, size, and offset. Others like partition_id may change
after
// reloading metadata.
- for (HdfsScanNode scanNode: child.inputScanNodes_) {
+ for (HdfsScanNode scanNode: inputScanNodes_) {
TScanRangeSpec orig = scanNode.getScanRangeSpecs();
TScanRangeSpec spec = new TScanRangeSpec();
if (orig.isSetConcrete_ranges()) {
@@ -243,7 +257,9 @@ public class TupleCacheInfo {
}
hashThrift(spec);
}
- return true;
+ // The scan ranges have been incorporated into the key and are no longer
needed
+ // at runtime.
+ inputScanNodes_.clear();
}
/**
@@ -418,4 +434,33 @@ public class TupleCacheInfo {
Preconditions.checkState(slotTranslationMap_.containsKey(globalId));
return slotTranslationMap_.get(globalId);
}
+
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("TupleCacheInfo:");
+ if (isEligible()) {
+ builder.append("cache key: ");
+ builder.append(getHashString());
+ builder.append("\n");
+ builder.append("cache key hash trace: ");
+ builder.append(getHashTrace());
+ builder.append("\n");
+ } else {
+ builder.append("ineligibility reasons: ");
+ builder.append(getIneligibilityReasonsString());
+ builder.append("\n");
+ }
+ return builder.toString();
+ }
+
+ /**
+ * Construct a comma separated list of the ineligibility reasons.
+ */
+ public String getIneligibilityReasonsString() {
+ StringJoiner joiner = new StringJoiner(",");
+ for (IneligibilityReason reason : ineligibilityReasons_) {
+ joiner.add(reason.toString());
+ }
+ return joiner.toString();
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
b/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
index e5c5f894a..17b26fa81 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
@@ -81,6 +81,12 @@ public class TupleCachePlanner {
return node;
}
+ // If node omits tuple cache placement - such as Exchange and Union nodes,
where it
+ // would not be beneficial - skip it.
+ if (node.omitTupleCache()) {
+ return node;
+ }
+
// Should we cache above this node?
// Simplest policy: always cache if eligible
// TODO: Make this more complicated (e.g. cost calculations)
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
index 594ac5c88..7cbf01b59 100644
--- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -75,43 +76,48 @@ public class TupleCacheTest extends PlannerTestBase {
@Test
public void testRuntimeFilterCacheKeys() {
- String basicJoinTmpl = "select straight_join probe.id from
functional.alltypes " +
- "probe, functional.alltypestiny build where %s";
- verifyIdenticalCacheKeys(
- String.format(basicJoinTmpl, "probe.id = build.id"),
- String.format(basicJoinTmpl, "probe.id = build.id"));
- // Trivial rewrite produces same plan
- verifyIdenticalCacheKeys(
- String.format(basicJoinTmpl, "probe.id = build.id"),
- String.format(basicJoinTmpl, "build.id = probe.id"));
- // Larger join with same subquery. Cache keys match because cache is
disabled when
- // the build side is too complex.
- verifyIdenticalCacheKeys(
- String.format(basicJoinTmpl, "probe.id = build.id"),
- "select straight_join p.id from functional.alltypes p, (" +
- String.format(basicJoinTmpl, "probe.id = build.id") + ") b where p.id
= b.id");
- // Different filter slot
- verifyOverlappingCacheKeys(
- String.format(basicJoinTmpl, "probe.id = build.id"),
- String.format(basicJoinTmpl, "probe.id + 1 = build.id"));
- // Different target expression
- verifyOverlappingCacheKeys(
- String.format(basicJoinTmpl, "probe.id + 1 = build.id"),
- String.format(basicJoinTmpl, "probe.id + 2 = build.id"));
- // Larger join with similar subquery and simpler plan tree.
- verifyOverlappingCacheKeys(
- String.format(basicJoinTmpl, "probe.id = build.id"),
- "select straight_join a.id from functional.alltypes a,
functional.alltypes b, " +
- "functional.alltypestiny c where a.id = b.id and b.id = c.id");
- // Different build-side source table
- verifyDifferentCacheKeys(
- String.format(basicJoinTmpl, "probe.id = build.id"),
- "select straight_join a.id from functional.alltypes a,
functional.alltypes b " +
- "where a.id = b.id");
- // Different build-side predicates
- verifyDifferentCacheKeys(
- String.format(basicJoinTmpl, "probe.id = build.id"),
- String.format(basicJoinTmpl, "probe.id = build.id and build.id <
100"));
+ for (boolean isDistributedPlan : Arrays.asList(false, true)) {
+ String basicJoinTmpl = "select straight_join probe.id from
functional.alltypes " +
+ "probe, functional.alltypestiny build where %s";
+ verifyIdenticalCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ String.format(basicJoinTmpl, "probe.id = build.id"),
isDistributedPlan);
+ // Trivial rewrite produces same plan
+ verifyIdenticalCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ String.format(basicJoinTmpl, "build.id = probe.id"),
isDistributedPlan);
+ // Larger join with same subquery. Cache keys match for the subquery,
but don't
+ // match for the outer query.
+ verifyOverlappingCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ "select straight_join p.id from functional.alltypes p, (" +
+ String.format(basicJoinTmpl, "probe.id = build.id") + ") b where
p.id = b.id",
+ isDistributedPlan);
+ // Different filter slot
+ verifyOverlappingCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ String.format(basicJoinTmpl, "probe.id + 1 = build.id"),
isDistributedPlan);
+ // Different target expression
+ verifyOverlappingCacheKeys(
+ String.format(basicJoinTmpl, "probe.id + 1 = build.id"),
+ String.format(basicJoinTmpl, "probe.id + 2 = build.id"),
isDistributedPlan);
+ // Larger join with similar subquery and simpler plan tree.
+ verifyOverlappingCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ "select straight_join a.id from functional.alltypes a, " +
+ "functional.alltypes b, functional.alltypestiny c where a.id = b.id
" +
+ "and b.id = c.id", isDistributedPlan);
+ // Different build-side source table
+ verifyDifferentCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ "select straight_join a.id from functional.alltypes a,
functional.alltypes b " +
+ "where a.id = b.id", isDistributedPlan);
+ // Different build-side predicates
+ verifyDifferentCacheKeys(
+ String.format(basicJoinTmpl, "probe.id = build.id"),
+ String.format(basicJoinTmpl, "probe.id = build.id and build.id <
100"),
+ isDistributedPlan);
+ }
}
@Test
@@ -151,12 +157,18 @@ public class TupleCacheTest extends PlannerTestBase {
"v2 as (select c_nationkey, c_custkey, count(*) from v1, tpch.orders "
+
"where c_custkey = o_custkey group by c_nationkey, c_custkey) " +
"select c_nationkey, count(*) from v2 group by c_nationkey";
- verifyNIdenticalCacheKeys(rightJoinAgg, rightJoinAgg, 2);
+ verifyNIdenticalCacheKeys(rightJoinAgg, rightJoinAgg, 2,
+ /* isDistributedPlan */ true);
+ // For single node plan, there are no exchanges and everything is eligible.
+ verifyAllEligible(rightJoinAgg, /* isDistributedPlan */ false);
// Both scans are cached, but aggregates above hash join and exchange are
not.
String innerJoinAgg = "select count(*) from functional.alltypes t1 inner
join " +
"functional.alltypestiny t2 on t1.smallint_col = t2.smallint_col group
by " +
"t1.tinyint_col, t2.smallint_col having count(t2.int_col) =
count(t1.bigint_col)";
- verifyNIdenticalCacheKeys(innerJoinAgg, innerJoinAgg, 2);
+ verifyNIdenticalCacheKeys(innerJoinAgg, innerJoinAgg, 2,
+ /* isDistributedPlan */ true);
+ // For single node plan, there are no exchanges and everything is eligible.
+ verifyAllEligible(innerJoinAgg, /* isDistributedPlan */ false);
// Both scans are cached, but aggregate is not because it's above a union.
Limit only
// applies to aggregate above exchange, which is obviously not cached.
String unionAgg = "select count(*) from (select * from functional.alltypes
" +
@@ -200,8 +212,9 @@ public class TupleCacheTest extends PlannerTestBase {
// Verify that a limit makes a statement ineligible
verifyCacheIneligible("select id from functional.alltypes limit 10");
- // A limit higher in the plan doesn't impact cache eligibility for the
scan nodes.
- verifyIdenticalCacheKeys(
+ // A limit higher in the plan doesn't impact cache eligibility for the
scan nodes,
+ // but it does make the join node ineligible.
+ verifyOverlappingCacheKeys(
"select a.id from functional.alltypes a, functional.alltypes b",
"select a.id from functional.alltypes a, functional.alltypes b limit
10");
@@ -328,6 +341,66 @@ public class TupleCacheTest extends PlannerTestBase {
"select * from functional_parquet.iceberg_v2_delete_both_eq_and_pos");
}
+ @Test
+ public void testJoins() {
+ // A plan with a hash join is eligible
+ verifyAllEligible("select straight_join probe.id from functional.alltypes
probe, " +
+ "functional.alltypes build where probe.id = build.id and " +
+ "probe.int_col <= build.int_col", /* isDistributedPlan*/ false);
+
+ // A plan with a nested loop join is eligible
+ verifyAllEligible("select straight_join probe.id from functional.alltypes
probe, " +
+ "functional.alltypes build where probe.id < build.id",
+ /* isDistributedPlan*/ false);
+
+ // An iceberg plan with a delete join
+ verifyAllEligible(
+ "select * from
functional_parquet.iceberg_v2_positional_delete_all_rows",
+ /* isDistributedPlan*/ false);
+
+ // Different predicates on the join node makes the cache keys different,
but it
+ // doesn't impact the scan nodes.
+ verifyOverlappingCacheKeys(
+ "select straight_join probe.id from functional.alltypes probe, " +
+ "functional.alltypes build where probe.id = build.id and " +
+ "probe.int_col <= build.int_col",
+ "select straight_join probe.id from functional.alltypes probe, " +
+ "functional.alltypes build where probe.id = build.id and " +
+ "probe.int_col + 1 <= build.int_col");
+
+ // Different predicates on the nested loop join node makes the cache keys
different
+ verifyOverlappingCacheKeys(
+ "select straight_join probe.id from functional.alltypes probe, " +
+ "functional.alltypes build where probe.id < build.id",
+ "select straight_join probe.id from functional.alltypes probe, " +
+ "functional.alltypes build where probe.id + 1 < build.id");
+
+ // Using broadcast/shuffle hints to compare the same query with a broadcast
+ // join vs a partitioned join. They share the build-side scan location.
+ String simple_join_with_hint_template =
+ "select straight_join probe.id from functional.alltypes probe join " +
+ "%s functional.alltypes build on (probe.id = build.id)";
+ verifyOverlappingCacheKeys(
+ String.format(simple_join_with_hint_template, "/* +broadcast */"),
+ String.format(simple_join_with_hint_template, "/* +shuffle */"),
+ /* isDistributedPlan */ true);
+
+ // With a broadcast join, we can cache past the join.
+ verifyJoinNodesEligible(
+ String.format(simple_join_with_hint_template, "/* +broadcast */"), 1,
+ /* isDistributedPlan */ true);
+
+ // With a partitioned join, verify that we don't cache past the join.
+ verifyJoinNodesEligible(
+ String.format(simple_join_with_hint_template, "/* +shuffle */"), 0,
+ /* isDistributedPlan */ true);
+
+ // Verify that we can cache past a directed exchange
+ verifyJoinNodesEligible(
+ "select * from
functional_parquet.iceberg_v2_positional_delete_all_rows", 1,
+ /* isDistributedPlan */ true);
+ }
+
@Test
public void testDeterministicScheduling() {
// Verify that the HdfsScanNode that feeds into a TupleCacheNode uses
deterministic
@@ -347,8 +420,9 @@ public class TupleCacheTest extends PlannerTestBase {
}
}
- protected List<PlanNode> getCacheEligibleNodes(String query) {
- List<PlanFragment> plan = getPlan(query);
+ protected List<PlanNode> getCacheEligibleNodes(String query,
+ boolean isDistributedPlan) {
+ List<PlanFragment> plan = getPlan(query, isDistributedPlan);
PlanNode planRoot = plan.get(0).getPlanRoot();
// Walk over the plan and produce a list of cache keys.
List<PlanNode> preOrderPlanNodes = planRoot.getNodesPreOrder();
@@ -364,6 +438,11 @@ public class TupleCacheTest extends PlannerTestBase {
return cacheEligibleNodes;
}
+ // Simplified signature for single node plan
+ protected List<PlanNode> getCacheEligibleNodes(String query) {
+ return getCacheEligibleNodes(query, /* isDistributedPlan */ false);
+ }
+
private List<String> getCacheKeys(List<PlanNode> cacheEligibleNodes) {
List<String> cacheKeys = new ArrayList<String>();
for (PlanNode node : cacheEligibleNodes) {
@@ -380,46 +459,49 @@ public class TupleCacheTest extends PlannerTestBase {
return cacheHashTraces;
}
- private void printCacheEligibleNode(PlanNode node, StringBuilder log) {
- log.append(node.getDisplayLabel());
- log.append("\n");
- log.append("cache key: ");
- log.append(node.getTupleCacheInfo().getHashString());
- log.append("\n");
- log.append("cache key hash trace: ");
- log.append(node.getTupleCacheInfo().getHashTrace());
- log.append("\n");
- }
-
- private void printQueryCacheEligibleNodes(String query,
- List<PlanNode> cacheEligibleNodes, StringBuilder log) {
+ private void printQueryNodesCacheInfo(String query,
+ List<PlanNode> nodes, StringBuilder log) {
log.append("Query: ");
log.append(query);
log.append("\n");
- for (PlanNode node : cacheEligibleNodes) {
- printCacheEligibleNode(node, log);
+ for (PlanNode node : nodes) {
+ log.append(node.getDisplayLabel());
+ log.append("\n");
+ log.append(node.getTupleCacheInfo().toString());
}
}
- protected void verifyCacheIneligible(String query) {
- List<PlanNode> cacheEligibleNodes = getCacheEligibleNodes(query);
+ protected void verifyCacheIneligible(String query, boolean
isDistributedPlan) {
+ List<PlanNode> cacheEligibleNodes = getCacheEligibleNodes(query,
isDistributedPlan);
// No eligible locations
if (cacheEligibleNodes.size() != 0) {
StringBuilder errorLog = new StringBuilder();
errorLog.append("Expected no cache eligible nodes. Instead found:\n");
- printQueryCacheEligibleNodes(query, cacheEligibleNodes, errorLog);
+ printQueryNodesCacheInfo(query, cacheEligibleNodes, errorLog);
fail(errorLog.toString());
}
}
+ // Simplified signature for single node plan
+ protected void verifyCacheIneligible(String query) {
+ verifyCacheIneligible(query, /* isDistributedPlan */ false);
+ }
+
+ protected void verifyIdenticalCacheKeys(String query1, String query2,
+ boolean isDistributedPlan) {
+ verifyNIdenticalCacheKeys(query1, query2, 1, isDistributedPlan);
+ }
+
+ // Simplified signature for single node plan
protected void verifyIdenticalCacheKeys(String query1, String query2) {
- verifyNIdenticalCacheKeys(query1, query2, 1);
+ verifyIdenticalCacheKeys(query1, query2, /* isDistributedPlan */ false);
}
- protected void verifyNIdenticalCacheKeys(String query1, String query2, int
n) {
- List<PlanNode> cacheEligibleNodes1 = getCacheEligibleNodes(query1);
- List<PlanNode> cacheEligibleNodes2 = getCacheEligibleNodes(query2);
+ protected void verifyNIdenticalCacheKeys(String query1, String query2, int n,
+ boolean isDistributedPlan) {
+ List<PlanNode> cacheEligibleNodes1 = getCacheEligibleNodes(query1,
isDistributedPlan);
+ List<PlanNode> cacheEligibleNodes2 = getCacheEligibleNodes(query2,
isDistributedPlan);
assertTrue(cacheEligibleNodes1.size() >= n);
List<String> cacheKeys1 = getCacheKeys(cacheEligibleNodes1);
List<String> cacheKeys2 = getCacheKeys(cacheEligibleNodes2);
@@ -428,15 +510,21 @@ public class TupleCacheTest extends PlannerTestBase {
if (!cacheKeys1.equals(cacheKeys2) ||
!cacheHashTraces1.equals(cacheHashTraces2)) {
StringBuilder errorLog = new StringBuilder();
errorLog.append("Expected identical cache keys. Instead found:\n");
- printQueryCacheEligibleNodes(query1, cacheEligibleNodes1, errorLog);
- printQueryCacheEligibleNodes(query2, cacheEligibleNodes2, errorLog);
+ printQueryNodesCacheInfo(query1, cacheEligibleNodes1, errorLog);
+ printQueryNodesCacheInfo(query2, cacheEligibleNodes2, errorLog);
fail(errorLog.toString());
}
}
- protected void verifyOverlappingCacheKeys(String query1, String query2) {
- List<PlanNode> cacheEligibleNodes1 = getCacheEligibleNodes(query1);
- List<PlanNode> cacheEligibleNodes2 = getCacheEligibleNodes(query2);
+ // Simplified signature for single node plan
+ protected void verifyNIdenticalCacheKeys(String query1, String query2, int
n) {
+ verifyNIdenticalCacheKeys(query1, query2, n, /* isDistributedPlan */
false);
+ }
+
+ protected void verifyOverlappingCacheKeys(String query1, String query2,
+ boolean isDistributedPlan) {
+ List<PlanNode> cacheEligibleNodes1 = getCacheEligibleNodes(query1,
isDistributedPlan);
+ List<PlanNode> cacheEligibleNodes2 = getCacheEligibleNodes(query2,
isDistributedPlan);
// None of this makes any sense unless they both have eligible nodes
assertTrue(cacheEligibleNodes1.size() > 0);
@@ -459,23 +547,29 @@ public class TupleCacheTest extends PlannerTestBase {
if (keyIntersection.size() == 0 || hashTraceIntersection.size() == 0) {
StringBuilder errorLog = new StringBuilder();
errorLog.append("Expected overlapping cache keys. Instead found:\n");
- printQueryCacheEligibleNodes(query1, cacheEligibleNodes1, errorLog);
- printQueryCacheEligibleNodes(query2, cacheEligibleNodes2, errorLog);
+ printQueryNodesCacheInfo(query1, cacheEligibleNodes1, errorLog);
+ printQueryNodesCacheInfo(query2, cacheEligibleNodes2, errorLog);
fail(errorLog.toString());
}
if (cacheKeys1.equals(cacheKeys2) ||
cacheHashTraces1.equals(cacheHashTraces2)) {
StringBuilder errorLog = new StringBuilder();
errorLog.append("Expected some cache keys to differ. Instead found:\n");
- printQueryCacheEligibleNodes(query1, cacheEligibleNodes1, errorLog);
- printQueryCacheEligibleNodes(query2, cacheEligibleNodes2, errorLog);
+ printQueryNodesCacheInfo(query1, cacheEligibleNodes1, errorLog);
+ printQueryNodesCacheInfo(query2, cacheEligibleNodes2, errorLog);
fail(errorLog.toString());
}
}
- protected void verifyDifferentCacheKeys(String query1, String query2) {
- List<PlanNode> cacheEligibleNodes1 = getCacheEligibleNodes(query1);
- List<PlanNode> cacheEligibleNodes2 = getCacheEligibleNodes(query2);
+ // Simplified signature for single node plan
+ protected void verifyOverlappingCacheKeys(String query1, String query2) {
+ verifyOverlappingCacheKeys(query1, query2, /* isDistributedPlan */ false);
+ }
+
+ protected void verifyDifferentCacheKeys(String query1, String query2,
+ boolean isDistributedPlan) {
+ List<PlanNode> cacheEligibleNodes1 = getCacheEligibleNodes(query1,
isDistributedPlan);
+ List<PlanNode> cacheEligibleNodes2 = getCacheEligibleNodes(query2,
isDistributedPlan);
// None of this makes any sense unless they both have eligible nodes
assertTrue(cacheEligibleNodes1.size() > 0);
@@ -497,8 +591,63 @@ public class TupleCacheTest extends PlannerTestBase {
if (keyIntersection.size() != 0 || hashTraceIntersection.size() != 0) {
StringBuilder errorLog = new StringBuilder();
errorLog.append("Expected different cache keys. Instead found:\n");
- printQueryCacheEligibleNodes(query1, cacheEligibleNodes1, errorLog);
- printQueryCacheEligibleNodes(query2, cacheEligibleNodes2, errorLog);
+ printQueryNodesCacheInfo(query1, cacheEligibleNodes1, errorLog);
+ printQueryNodesCacheInfo(query2, cacheEligibleNodes2, errorLog);
+ fail(errorLog.toString());
+ }
+ }
+
+ // Simplified signature for single node plan
+ protected void verifyDifferentCacheKeys(String query1, String query2) {
+ verifyDifferentCacheKeys(query1, query2, /* isDistributedPlan */ false);
+ }
+
+ protected void verifyAllEligible(String query, boolean isDistributedPlan) {
+ List<PlanFragment> plan = getPlan(query, isDistributedPlan);
+ PlanNode planRoot = plan.get(0).getPlanRoot();
+ // Walk over the plan and produce a list of cache keys.
+ List<PlanNode> preOrderPlanNodes = planRoot.getNodesPreOrder();
+ List<PlanNode> ineligibleNodes = new ArrayList<PlanNode>();
+ for (PlanNode node : preOrderPlanNodes) {
+ if (node instanceof TupleCacheNode) continue;
+ TupleCacheInfo info = node.getTupleCacheInfo();
+ if (!info.isEligible()) {
+ ineligibleNodes.add(node);
+ }
+ }
+ if (ineligibleNodes.size() != 0) {
+ StringBuilder errorLog = new StringBuilder();
+ errorLog.append("Expected all nodes eligible, instead found ineligible
nodes:\n");
+ printQueryNodesCacheInfo(query, ineligibleNodes, errorLog);
+ fail(errorLog.toString());
+ }
+ }
+
+ protected void verifyJoinNodesEligible(String query, int
expectedEligibleJoins,
+ boolean isDistributedPlan) {
+ List<PlanFragment> plan = getPlan(query, isDistributedPlan);
+ PlanNode planRoot = plan.get(0).getPlanRoot();
+ // Walk over the plan and produce a list of cache keys.
+ List<PlanNode> preOrderPlanNodes = planRoot.getNodesPreOrder();
+ List<PlanNode> ineligibleJoinNodes = new ArrayList<PlanNode>();
+ int eligibleJoins = 0;
+ for (PlanNode node : preOrderPlanNodes) {
+ if (node instanceof JoinNode) {
+ TupleCacheInfo info = node.getTupleCacheInfo();
+ if (!info.isEligible()) {
+ ineligibleJoinNodes.add(node);
+ } else {
+ eligibleJoins++;
+ }
+ }
+ }
+ if (eligibleJoins != expectedEligibleJoins) {
+ StringBuilder errorLog = new StringBuilder();
+ errorLog.append(
+ String.format("Expected %d join nodes eligible, instead found %d.\n",
+ eligibleJoins, expectedEligibleJoins));
+ errorLog.append("Ineligible join nodes:");
+ printQueryNodesCacheInfo(query, ineligibleJoinNodes, errorLog);
fail(errorLog.toString());
}
}
@@ -512,7 +661,7 @@ public class TupleCacheTest extends PlannerTestBase {
* @param query the query to run
* @return the first (or only) fragment plan node
*/
- private List<PlanFragment> getPlan(String query) {
+ private List<PlanFragment> getPlan(String query, boolean isDistributedPlan) {
// Create a query context with rewrites disabled
// TODO: Should probably turn them on, or run a test
// both with and without rewrites.
@@ -523,7 +672,7 @@ public class TupleCacheTest extends PlannerTestBase {
// Force the plan to run on a single node so it
// resides in a single fragment.
TQueryOptions queryOptions = queryCtx.client_request.getQuery_options();
- queryOptions.setNum_nodes(1);
+ queryOptions.setNum_nodes(isDistributedPlan ? 0 : 1);
// Turn on tuple caching
queryOptions.setEnable_tuple_cache(true);
@@ -538,4 +687,11 @@ public class TupleCacheTest extends PlannerTestBase {
}
return planCtx.getPlan();
}
+
+ /**
+ * Simplified signature of getPlan() to get the single node plan
+ */
+ private List<PlanFragment> getPlan(String query) {
+ return getPlan(query, false);
+ }
}
diff --git a/tests/custom_cluster/test_tuple_cache.py
b/tests/custom_cluster/test_tuple_cache.py
index 82db750cf..f0419a696 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -18,10 +18,10 @@
from __future__ import absolute_import, division, print_function
import os
-import pytest
import random
import re
import string
+import time
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfDockerizedCluster, SkipIf
@@ -63,6 +63,7 @@ def assertCounterOrder(profile, key, vals):
values = getCounterValues(profile, key)
assert values == vals, values
+
def assertCounter(profile, key, val, num_matches):
if not isinstance(num_matches, list):
num_matches = [num_matches]
@@ -525,7 +526,7 @@ class TestTupleCacheCluster(TestTupleCacheBase):
# Prime the cache
base_result = self.execute_query(query)
base_cache_keys = get_cache_keys(base_result.runtime_profile)
- assert len(base_cache_keys) == 2
+ assert len(base_cache_keys) == 3
# Drop and reload the table
self.execute_query("DROP TABLE {0}".format(fq_table))
@@ -540,6 +541,96 @@ class TestTupleCacheCluster(TestTupleCacheBase):
assert base_cache_keys == reload_cache_keys
# Skips verifying cache hits as fragments may not be assigned to the same
nodes.
+ def test_join_modifications(self, vector, unique_database):
+ """
+ This tests caching above a join without runtime filters and verifies that
changes
+ to the build side table results in a different cache key.
+ """
+ fq_table = "{0}.join_modifications".format(unique_database)
+ query = "select straight_join probe.id from functional.alltypes probe join
" \
+ "/* +broadcast */ {0} build on (probe.id = build.age) ".format(fq_table)
+ \
+ "order by probe.id"
+ # Create an empty table
+ self.create_table(fq_table, scale=0)
+ probe_id = 6
+ build_id = 7
+ above_join_id = 8
+
+ # Run without runtime filters to verify the regular path works
+ no_runtime_filters = dict(vector.get_value('exec_option'))
+ no_runtime_filters['runtime_filter_mode'] = 'off'
+
+ # Establish a baseline
+ empty_result = self.execute_query(query, no_runtime_filters)
+ empty_cache_keys = get_cache_keys(empty_result.runtime_profile)
+ # The build side is on one node. The probe side is on three nodes.
+ assert len(empty_cache_keys) == 3
+ assert len(empty_cache_keys[probe_id]) == 3
+ assert len(empty_cache_keys[build_id]) == 1
+ assert len(empty_cache_keys[above_join_id]) == 3
+ empty_build_key = empty_cache_keys[build_id][0]
+ empty_build_compile_key, empty_build_finst_key = empty_build_key.split("_")
+ assert empty_build_finst_key == "0"
+ assert len(empty_result.data) == 0
+ empty_join_compile_key = empty_cache_keys[above_join_id][0].split("_")[0]
+
+ # Insert a row, which creates a file / scan range
+ self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table,
table_value(0)))
+
+ # There is a build-side scan range, so the fragment instance key should be
non-zero.
+ one_file_result = self.execute_query(query, no_runtime_filters)
+ assert len(one_file_result.data) == 1
+ one_cache_keys = get_cache_keys(one_file_result.runtime_profile)
+ assert len(one_cache_keys) == 3
+ assert len(one_cache_keys[probe_id]) == 3
+ assert len(one_cache_keys[build_id]) == 1
+ assert len(one_cache_keys[above_join_id]) == 3
+ one_build_key = one_cache_keys[build_id][0]
+ one_build_compile_key, one_build_finst_key = one_build_key.split("_")
+ assert one_build_finst_key != "0"
+ assert one_build_compile_key == empty_build_compile_key
+ # This should be a cache miss for the build side and above the join, but a
cache
+ # hit for the probe side (3 instances).
+ assertCounter(one_file_result.runtime_profile, NUM_HITS, 1, 3)
+ assertCounter(one_file_result.runtime_profile, NUM_HALTED, 0, 7)
+ assertCounter(one_file_result.runtime_profile, NUM_SKIPPED, 0, 7)
+ # The above join compile time key should have changed, because it
incorporates the
+ # build side scan ranges.
+ one_join_compile_key = one_cache_keys[above_join_id][0].split("_")[0]
+ assert one_join_compile_key != empty_join_compile_key
+
+ def test_join_timing(self, vector):
+ """
+ This verifies that a very short query with a cache hit above a join can
complete
+ below a certain threshold. This should be sensitive to issues with
synchronization
+ with the shared join builder.
+ """
+ query = "select straight_join probe.id from functional.alltypes probe join
" \
+ "/* +broadcast */ functional.alltypes build on (probe.id = build.id) " \
+ "order by probe.id"
+
+ # To avoid interaction with cache entries from previous tests, set an
unrelated
+ # query option to keep the key different.
+ custom_options = dict(vector.get_value('exec_option'))
+ custom_options['batch_size'] = '1234'
+
+ first_run_result = self.execute_query(query, custom_options)
+ assert len(first_run_result.data) == 7300
+ assertCounter(first_run_result.runtime_profile, NUM_HITS, 0, 9)
+ assertCounter(first_run_result.runtime_profile, NUM_HALTED, 0, 9)
+ assertCounter(first_run_result.runtime_profile, NUM_SKIPPED, 0, 9)
+ start_time = time.time()
+ second_run_result = self.execute_query(query, custom_options)
+ end_time = time.time()
+ # The location above the join hits and the location on the build side hits,
+ # but the probe location is below the join and doesn't hit.
+ assertCounter(second_run_result.runtime_profile, NUM_HITS, 1, 6)
+ assertCounter(second_run_result.runtime_profile, NUM_HALTED, 0, 9)
+ assertCounter(second_run_result.runtime_profile, NUM_SKIPPED, 0, 9)
+ # As a sanity check for the synchronization pieces, verify that this runs
in less
+ # than 750 milliseconds.
+ assert end_time - start_time < 0.75
+
@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, cluster_size=1)
class TestTupleCacheRuntimeKeysBasic(TestTupleCacheBase):