This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 14597c7e2 IMPALA-13964: Fix test_tuple_cache_tpc_queries.py flakiness
14597c7e2 is described below
commit 14597c7e2fd44ca21bcdfcdc6a73b5a7c4aa6241
Author: Joe McDonnell <[email protected]>
AuthorDate: Wed May 21 18:34:23 2025 -0700
IMPALA-13964: Fix test_tuple_cache_tpc_queries.py flakiness
This makes two changes to deflake test_tuple_cache_tpc_queries.py.
First, it increases the runtime filter wait time from 60 seconds to
600 seconds. The correctness verification slows down the path
that produces the runtime filter. The slowdown is dependent on
the speed of storage, so this can get very slow on test machines.
Second, this skips correctness checking for locations that are just
after streaming aggregations. Streaming aggregations can produce
variable output that the correctness checking can't handle.
For example, a grouping aggregation computing a sum might have
a preaggregation produce either (A: 3) or (A: 2), (A: 1) or
(A: 1), (A: 1), (A: 1). The finalization sees these as equivalent.
This marks the nodes as variable starting with the preaggregation
and clears the mark at the finalize stage.
When skipping correctness checking, the tuple cache node does not
hit the cache normally. This guarantees that its children will run
and go through correctness checking.
Testing:
- Ran test_tuple_cache_tpc_queries.py locally
- Added a frontend test for this specific case
Change-Id: If5e1be287bdb489a89aea3b2d7bec416220feb9a
Reviewed-on: http://gerrit.cloudera.org:8080/23010
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Michael Smith <[email protected]>
---
be/src/exec/tuple-cache-node.cc | 59 ++++++++++++----------
be/src/exec/tuple-cache-node.h | 6 +++
common/thrift/PlanNodes.thrift | 3 ++
.../impala/common/ThriftSerializationCtx.java | 19 +++++++
.../org/apache/impala/planner/AggregationNode.java | 10 ++++
.../org/apache/impala/planner/TupleCacheInfo.java | 25 +++++++++
.../org/apache/impala/planner/TupleCacheNode.java | 15 +++++-
.../apache/impala/planner/TupleCachePlanner.java | 7 ++-
.../org/apache/impala/planner/TupleCacheTest.java | 19 +++++++
tests/query_test/test_tuple_cache_tpc_queries.py | 2 +-
10 files changed, 136 insertions(+), 29 deletions(-)
diff --git a/be/src/exec/tuple-cache-node.cc b/be/src/exec/tuple-cache-node.cc
index 165f89f7a..57bc17f8e 100644
--- a/be/src/exec/tuple-cache-node.cc
+++ b/be/src/exec/tuple-cache-node.cc
@@ -70,6 +70,8 @@ Status TupleCacheNode::Prepare(RuntimeState* state) {
ComputeFragmentInstanceKey(state);
combined_key_ = plan_node().tnode_->tuple_cache_node.compile_time_key + "_" +
std::to_string(fragment_instance_key_);
+ skip_correctness_verification_ =
+ plan_node().tnode_->tuple_cache_node.skip_correctness_verification;
runtime_profile()->AddInfoString("Combined Key", combined_key_);
return Status::OK();
@@ -90,33 +92,38 @@ Status TupleCacheNode::Open(RuntimeState* state) {
handle_ = tuple_cache_mgr->Lookup(combined_key_, true);
if (tuple_cache_mgr->IsAvailableForRead(handle_)) {
if (tuple_cache_mgr->DebugDumpEnabled() &&
TupleCacheVerificationEnabled(state)) {
- // We need the original fragment id to construct the path for the
reference debug
- // cache file. If it's missing from the metadata, we return an error
status
- // immediately.
- string org_fragment_id =
tuple_cache_mgr->GetFragmentIdForTupleCache(combined_key_);
- if (org_fragment_id.empty()) {
- return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
- Substitute("Metadata of tuple cache '$0' is missing for
correctness check",
- combined_key_));
+ // If the node is marked to skip correctness verification, we don't want
to read
+ // from the cache as that would prevent its children from executing.
+ if (!skip_correctness_verification_) {
+ // We need the original fragment id to construct the path for the
reference debug
+ // cache file. If it's missing from the metadata, we return an error
status
+ // immediately.
+ string org_fragment_id =
+ tuple_cache_mgr->GetFragmentIdForTupleCache(combined_key_);
+ if (org_fragment_id.empty()) {
+ return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
+ Substitute("Metadata of tuple cache '$0' is missing for
correctness check",
+ combined_key_));
+ }
+ string ref_sub_dir;
+ string sub_dir;
+ string ref_file_path = GetDebugDumpPath(state, org_fragment_id,
&ref_sub_dir);
+ string file_path = GetDebugDumpPath(state, string(), &sub_dir);
+ DCHECK_EQ(ref_sub_dir, sub_dir);
+ DCHECK(!ref_sub_dir.empty());
+ DCHECK(!ref_file_path.empty());
+ DCHECK(!file_path.empty());
+ // Create the subdirectory for the debug caches if needed.
+ RETURN_IF_ERROR(tuple_cache_mgr->CreateDebugDumpSubdir(ref_sub_dir));
+ // Open the writer for writing the tuple data from the cache entries
to be
+ // the reference cache data.
+ debug_dump_text_writer_ref_ =
make_unique<TupleTextFileWriter>(ref_file_path);
+ RETURN_IF_ERROR(debug_dump_text_writer_ref_->Open());
+ // Open the writer for writing the tuple data from children in
GetNext() to
+ // compare with the reference debug cache file.
+ debug_dump_text_writer_ = make_unique<TupleTextFileWriter>(file_path);
+ RETURN_IF_ERROR(debug_dump_text_writer_->Open());
}
- string ref_sub_dir;
- string sub_dir;
- string ref_file_path = GetDebugDumpPath(state, org_fragment_id,
&ref_sub_dir);
- string file_path = GetDebugDumpPath(state, string(), &sub_dir);
- DCHECK_EQ(ref_sub_dir, sub_dir);
- DCHECK(!ref_sub_dir.empty());
- DCHECK(!ref_file_path.empty());
- DCHECK(!file_path.empty());
- // Create the subdirectory for the debug caches if needed.
- RETURN_IF_ERROR(tuple_cache_mgr->CreateDebugDumpSubdir(ref_sub_dir));
- // Open the writer for writing the tuple data from the cache entries to
be
- // the reference cache data.
- debug_dump_text_writer_ref_ =
make_unique<TupleTextFileWriter>(ref_file_path);
- RETURN_IF_ERROR(debug_dump_text_writer_ref_->Open());
- // Open the writer for writing the tuple data from children in GetNext()
to
- // compare with the reference debug cache file.
- debug_dump_text_writer_ = make_unique<TupleTextFileWriter>(file_path);
- RETURN_IF_ERROR(debug_dump_text_writer_->Open());
} else {
reader_ = make_unique<TupleFileReader>(
tuple_cache_mgr->GetPath(handle_), mem_tracker(), runtime_profile());
diff --git a/be/src/exec/tuple-cache-node.h b/be/src/exec/tuple-cache-node.h
index 6adeaf58a..ef2e2dd6b 100644
--- a/be/src/exec/tuple-cache-node.h
+++ b/be/src/exec/tuple-cache-node.h
@@ -62,6 +62,12 @@ private:
// This combination is unique for a given fragment instance.
std::string combined_key_;
+ // This caching location should skip correctness verification. This can be
true when a
+ // location has variability in its results that is tolerated by nodes higher
in the
+ // plan (e.g. streaming aggregations can produce variable results that do
not change
+ // the result out of the finalization phase).
+ bool skip_correctness_verification_;
+
/// Number of results that were found in the tuple cache
RuntimeProfile::Counter* num_hits_counter_ = nullptr;
/// Number of results that were too large for the cache
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 929dd0008..cd69160c3 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -725,6 +725,9 @@ struct TTupleCacheNode {
// into this node. The TupleCacheNode will hash the scan ranges for its
fragment
// at runtime.
2: required list<i32> input_scan_node_ids;
+ // Skip correctness verification at this node. This can be true if the
result at this
+ // location is variable in a way that does not impact correctness.
+ 3: required bool skip_correctness_verification;
}
enum TMergeCaseType {
diff --git
a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
index f980b363e..b8fd16b02 100644
--- a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
+++ b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
@@ -140,4 +140,23 @@ public class ThriftSerializationCtx {
tupleCacheInfo_.incorporateScans();
}
}
+
+ /**
+ * Mark this location as variable due to a streaming aggregation.
+ */
+ public void setStreamingAggVariability() {
+ if (isTupleCache()) {
+ tupleCacheInfo_.setStreamingAggVariability();
+ }
+ }
+
+ /**
+ * Clear any mark indicating variability due to a streaming aggregation.
+ * It is safe to call this even if there is no existing variability.
+ */
+ public void clearStreamingAggVariability() {
+ if (isTupleCache()) {
+ tupleCacheInfo_.clearStreamingAggVariability();
+ }
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index ac5f8e5f4..f963ddfe7 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -821,6 +821,16 @@ public class AggregationNode extends PlanNode implements
SpillableOperator {
}
msg.agg_node.addToAggregators(taggregator);
}
+ // Streaming aggregations can have variable output that is handled and
undone
+ // by the finalize stage. For example, a grouping aggregation doing a sum
could
+ // return (a, 3) or (a, 2), (a, 1) or (a, 1), (a, 1), (a, 1). Mark the
node as
+ // variable, which disables automated correctness checking. Clear the mark
at the
+ // finalize stage, as the finalize stage undoes the variability.
+ if (useStreamingPreagg_) {
+ serialCtx.setStreamingAggVariability();
+ } else if (needsFinalize_) {
+ serialCtx.clearStreamingAggVariability();
+ }
}
@Override
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
index 4ff52e61c..f1220b40d 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
@@ -130,6 +130,13 @@ public class TupleCacheInfo {
// to hash the scan ranges of input scan nodes to generate the key.
private final List<HdfsScanNode> inputScanNodes_ = new
ArrayList<HdfsScanNode>();
+ // This node has variable output due to a streaming aggregation. For
example, a
+ // grouping aggregation doing a sum could return (a, 3) or (a, 2), (a, 1) or
+ // (a, 1), (a, 1), (a, 1). These all mean the same thing to the finalize
stage.
+ // We need to know about it to disable automated correctness checking for
locations
+ // with this variability.
+ private boolean streamingAggVariability_ = false;
+
// These fields accumulate partial results until finalizeHash() is called.
private Hasher hasher_ = Hashing.murmur3_128().newHasher();
@@ -157,6 +164,19 @@ public class TupleCacheInfo {
return ineligibilityReasons_.isEmpty();
}
+ public void setStreamingAggVariability() {
+ Preconditions.checkState(!streamingAggVariability_);
+ streamingAggVariability_ = true;
+ }
+
+ public void clearStreamingAggVariability() {
+ streamingAggVariability_ = false;
+ }
+
+ public boolean getStreamingAggVariability() {
+ return streamingAggVariability_;
+ }
+
public String getHashString() {
Preconditions.checkState(isEligible(),
"TupleCacheInfo only has a hash if it is cache eligible");
@@ -289,6 +309,11 @@ public class TupleCacheInfo {
// id translation maps.
registerTupleHelper(id, false);
}
+
+ // The variability transmits up the tree until the aggregation is
finalized.
+ if (child.streamingAggVariability_) {
+ streamingAggVariability_ = true;
+ }
return true;
}
}
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
index d93145668..9b76f4673 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
@@ -43,9 +43,12 @@ public class TupleCacheNode extends PlanNode {
protected String compileTimeKey_;
protected String hashTrace_;
+ protected boolean displayCorrectnessCheckingInfo_;
+ protected boolean skipCorrectnessVerification_;
protected final List<Integer> inputScanNodeIds_ = new ArrayList<Integer>();
- public TupleCacheNode(PlanNodeId id, PlanNode child) {
+ public TupleCacheNode(PlanNodeId id, PlanNode child,
+ boolean displayCorrectnessCheckingInfo) {
super(id, "TUPLE CACHE");
addChild(child);
cardinality_ = child.getCardinality();
@@ -55,6 +58,10 @@ public class TupleCacheNode extends PlanNode {
Preconditions.checkState(childCacheInfo.isEligible());
compileTimeKey_ = childCacheInfo.getHashString();
hashTrace_ = childCacheInfo.getHashTrace();
+ // If there is variability due to a streaming agg, skip the correctness
verification
+ // for this location.
+ skipCorrectnessVerification_ = childCacheInfo.getStreamingAggVariability();
+ displayCorrectnessCheckingInfo_ = displayCorrectnessCheckingInfo;
for (HdfsScanNode scanNode : childCacheInfo.getInputScanNodes()) {
// Inputs into the tuple cache need to use deterministic scan range
assignment
scanNode.setDeterministicScanRangeAssignment(true);
@@ -87,6 +94,7 @@ public class TupleCacheNode extends PlanNode {
TTupleCacheNode tupleCacheNode = new TTupleCacheNode();
tupleCacheNode.setCompile_time_key(compileTimeKey_);
tupleCacheNode.setInput_scan_node_ids(inputScanNodeIds_);
+
tupleCacheNode.setSkip_correctness_verification(skipCorrectnessVerification_);
msg.setTuple_cache_node(tupleCacheNode);
}
@@ -113,6 +121,11 @@ public class TupleCacheNode extends PlanNode {
StringBuilder output = new StringBuilder();
output.append(String.format("%s%s:%s\n", prefix, id_.toString(),
displayName_));
output.append(detailPrefix + "cache key: " + compileTimeKey_ + "\n");
+ // Only display information about correctness verification if it is enabled
+ if (displayCorrectnessCheckingInfo_) {
+ output.append(detailPrefix + "skip correctness verification: " +
+ skipCorrectnessVerification_ + "\n");
+ }
// For debuggability, always print the hash trace until the cache key
calculation
// matures. Print trace in chunks to avoid excessive wrapping and padding
in
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
b/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
index 17b26fa81..2e6d64743 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
@@ -20,6 +20,7 @@ package org.apache.impala.planner;
import java.util.List;
import org.apache.impala.common.ImpalaException;
+import org.apache.impala.thrift.TQueryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,8 +94,12 @@ public class TupleCachePlanner {
if (LOG.isTraceEnabled()) {
LOG.trace("Adding TupleCacheNode above node " + node.getId().toString());
}
+ // Get current query options
+ TQueryOptions queryOptions =
+ ctx_.getRootAnalyzer().getQueryCtx().client_request.getQuery_options();
// Allocate TupleCacheNode
- TupleCacheNode tupleCacheNode = new TupleCacheNode(ctx_.getNextNodeId(),
node);
+ TupleCacheNode tupleCacheNode = new TupleCacheNode(ctx_.getNextNodeId(),
node,
+ queryOptions.isEnable_tuple_cache_verification());
tupleCacheNode.init(ctx_.getRootAnalyzer());
PlanFragment curFragment = node.getFragment();
if (node == curFragment.getPlanRoot()) {
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
index 7cbf01b59..3abffe2b9 100644
--- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
@@ -420,6 +420,25 @@ public class TupleCacheTest extends PlannerTestBase {
}
}
+ @Test
+ public void testSkipCorrectnessChecking() {
+ // Locations after a streaming aggregation before the finalize can have
variability
+ // that correctness checking can't handle.
+ List<PlanNode> cacheEligibleNodes =
+ getCacheEligibleNodes(
+ "select int_col, count(*) from functional.alltypes group by int_col",
+ /* isDistributedPlan */ true);
+ // In this plan, there is a streaming aggregation that feeds into a
partitioned
+ // exchange. The finalize phase is past that partitioned exchange. That
means that
+ // the only eligible node marked with streaming agg variability is the
initial
+ // streaming AggregationNode.
+ for (PlanNode node : cacheEligibleNodes) {
+ if (node instanceof AggregationNode) {
+ assertTrue(node.getTupleCacheInfo().getStreamingAggVariability());
+ }
+ }
+ }
+
protected List<PlanNode> getCacheEligibleNodes(String query,
boolean isDistributedPlan) {
List<PlanFragment> plan = getPlan(query, isDistributedPlan);
diff --git a/tests/query_test/test_tuple_cache_tpc_queries.py
b/tests/query_test/test_tuple_cache_tpc_queries.py
index c1f5a9a57..da264ac83 100644
--- a/tests/query_test/test_tuple_cache_tpc_queries.py
+++ b/tests/query_test/test_tuple_cache_tpc_queries.py
@@ -33,7 +33,7 @@ def run_tuple_cache_test(self, vector, query, mtdop):
# Use a long runtime filter wait time (10 minutes) to ensure filters arrive
before
# generating the tuple cache for correctness check.
if IS_TUPLE_CACHE_CORRECT_CHECK:
- vector.get_value('exec_option')['runtime_filter_wait_time_ms'] = 60000
+ vector.get_value('exec_option')['runtime_filter_wait_time_ms'] = 600000
vector.get_value('exec_option')['enable_tuple_cache_verification'] = True
vector.get_value('exec_option')['mt_dop'] = mtdop
# Run twice to test write and read the tuple cache.