This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 14597c7e2 IMPALA-13964: Fix test_tuple_cache_tpc_queries.py flakiness
14597c7e2 is described below

commit 14597c7e2fd44ca21bcdfcdc6a73b5a7c4aa6241
Author: Joe McDonnell <[email protected]>
AuthorDate: Wed May 21 18:34:23 2025 -0700

    IMPALA-13964: Fix test_tuple_cache_tpc_queries.py flakiness
    
    This makes two changes to deflake test_tuple_cache_tpc_queries.py.
    First, it increases the runtime filter wait time from 60 seconds to
    600 seconds. The correctness verification slows down the path
    that produces the runtime filter. The slowdown is dependent on
    the speed of storage, so this can get very slow on test machines.
    
    Second, this skips correctness checking for locations that are just
    after streaming aggregations. Streaming aggregations can produce
    variable output that the correctness checking can't handle.
    For example a grouping aggregation computing a sum might have
    a preaggregation produce either (A: 3) or (A: 2), (A: 1) or
    (A: 1), (A: 1), (A: 1). The finalization sees these as equivalent.
    This marks the nodes as variable starting with the preaggregation
    and clears the mark at the finalize stage.
    
    When skipping correctness checking, the tuple cache node does not
    hit the cache normally. This guarantees that its children will run
    and go through correctness checking.
    
    Testing:
     - Ran test_tuple_cache_tpc_queries.py locally
     - Added a frontend test for this specific case
    
    Change-Id: If5e1be287bdb489a89aea3b2d7bec416220feb9a
    Reviewed-on: http://gerrit.cloudera.org:8080/23010
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Michael Smith <[email protected]>
---
 be/src/exec/tuple-cache-node.cc                    | 59 ++++++++++++----------
 be/src/exec/tuple-cache-node.h                     |  6 +++
 common/thrift/PlanNodes.thrift                     |  3 ++
 .../impala/common/ThriftSerializationCtx.java      | 19 +++++++
 .../org/apache/impala/planner/AggregationNode.java | 10 ++++
 .../org/apache/impala/planner/TupleCacheInfo.java  | 25 +++++++++
 .../org/apache/impala/planner/TupleCacheNode.java  | 15 +++++-
 .../apache/impala/planner/TupleCachePlanner.java   |  7 ++-
 .../org/apache/impala/planner/TupleCacheTest.java  | 19 +++++++
 tests/query_test/test_tuple_cache_tpc_queries.py   |  2 +-
 10 files changed, 136 insertions(+), 29 deletions(-)

diff --git a/be/src/exec/tuple-cache-node.cc b/be/src/exec/tuple-cache-node.cc
index 165f89f7a..57bc17f8e 100644
--- a/be/src/exec/tuple-cache-node.cc
+++ b/be/src/exec/tuple-cache-node.cc
@@ -70,6 +70,8 @@ Status TupleCacheNode::Prepare(RuntimeState* state) {
   ComputeFragmentInstanceKey(state);
   combined_key_ = plan_node().tnode_->tuple_cache_node.compile_time_key + "_" +
       std::to_string(fragment_instance_key_);
+  skip_correctness_verification_ =
+      plan_node().tnode_->tuple_cache_node.skip_correctness_verification;
   runtime_profile()->AddInfoString("Combined Key", combined_key_);
 
   return Status::OK();
@@ -90,33 +92,38 @@ Status TupleCacheNode::Open(RuntimeState* state) {
   handle_ = tuple_cache_mgr->Lookup(combined_key_, true);
   if (tuple_cache_mgr->IsAvailableForRead(handle_)) {
     if (tuple_cache_mgr->DebugDumpEnabled() && 
TupleCacheVerificationEnabled(state)) {
-      // We need the original fragment id to construct the path for the 
reference debug
-      // cache file. If it's missing from the metadata, we return an error 
status
-      // immediately.
-      string org_fragment_id = 
tuple_cache_mgr->GetFragmentIdForTupleCache(combined_key_);
-      if (org_fragment_id.empty()) {
-        return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
-            Substitute("Metadata of tuple cache '$0' is missing for 
correctness check",
-                combined_key_));
+      // If the node is marked to skip correctness verification, we don't want 
to read
+      // from the cache as that would prevent its children from executing.
+      if (!skip_correctness_verification_) {
+        // We need the original fragment id to construct the path for the 
reference debug
+        // cache file. If it's missing from the metadata, we return an error 
status
+        // immediately.
+        string org_fragment_id =
+            tuple_cache_mgr->GetFragmentIdForTupleCache(combined_key_);
+        if (org_fragment_id.empty()) {
+          return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
+              Substitute("Metadata of tuple cache '$0' is missing for 
correctness check",
+                  combined_key_));
+        }
+        string ref_sub_dir;
+        string sub_dir;
+        string ref_file_path = GetDebugDumpPath(state, org_fragment_id, 
&ref_sub_dir);
+        string file_path = GetDebugDumpPath(state, string(), &sub_dir);
+        DCHECK_EQ(ref_sub_dir, sub_dir);
+        DCHECK(!ref_sub_dir.empty());
+        DCHECK(!ref_file_path.empty());
+        DCHECK(!file_path.empty());
+        // Create the subdirectory for the debug caches if needed.
+        RETURN_IF_ERROR(tuple_cache_mgr->CreateDebugDumpSubdir(ref_sub_dir));
+        // Open the writer for writing the tuple data from the cache entries 
to be
+        // the reference cache data.
+        debug_dump_text_writer_ref_ = 
make_unique<TupleTextFileWriter>(ref_file_path);
+        RETURN_IF_ERROR(debug_dump_text_writer_ref_->Open());
+        // Open the writer for writing the tuple data from children in 
GetNext() to
+        // compare with the reference debug cache file.
+        debug_dump_text_writer_ = make_unique<TupleTextFileWriter>(file_path);
+        RETURN_IF_ERROR(debug_dump_text_writer_->Open());
       }
-      string ref_sub_dir;
-      string sub_dir;
-      string ref_file_path = GetDebugDumpPath(state, org_fragment_id, 
&ref_sub_dir);
-      string file_path = GetDebugDumpPath(state, string(), &sub_dir);
-      DCHECK_EQ(ref_sub_dir, sub_dir);
-      DCHECK(!ref_sub_dir.empty());
-      DCHECK(!ref_file_path.empty());
-      DCHECK(!file_path.empty());
-      // Create the subdirectory for the debug caches if needed.
-      RETURN_IF_ERROR(tuple_cache_mgr->CreateDebugDumpSubdir(ref_sub_dir));
-      // Open the writer for writing the tuple data from the cache entries to 
be
-      // the reference cache data.
-      debug_dump_text_writer_ref_ = 
make_unique<TupleTextFileWriter>(ref_file_path);
-      RETURN_IF_ERROR(debug_dump_text_writer_ref_->Open());
-      // Open the writer for writing the tuple data from children in GetNext() 
to
-      // compare with the reference debug cache file.
-      debug_dump_text_writer_ = make_unique<TupleTextFileWriter>(file_path);
-      RETURN_IF_ERROR(debug_dump_text_writer_->Open());
     } else {
       reader_ = make_unique<TupleFileReader>(
           tuple_cache_mgr->GetPath(handle_), mem_tracker(), runtime_profile());
diff --git a/be/src/exec/tuple-cache-node.h b/be/src/exec/tuple-cache-node.h
index 6adeaf58a..ef2e2dd6b 100644
--- a/be/src/exec/tuple-cache-node.h
+++ b/be/src/exec/tuple-cache-node.h
@@ -62,6 +62,12 @@ private:
   // This combination is unique for a given fragment instance.
   std::string combined_key_;
 
+  // This caching location should skip correctness verification. This can be 
true when a
+  // location has variability in its results that is tolerated by nodes higher 
in the
+  // plan (e.g. streaming aggregations can produce variable results that do 
not change
+  // the result out of the finalization phase).
+  bool skip_correctness_verification_;
+
   /// Number of results that were found in the tuple cache
   RuntimeProfile::Counter* num_hits_counter_ = nullptr;
   /// Number of results that were too large for the cache
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 929dd0008..cd69160c3 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -725,6 +725,9 @@ struct TTupleCacheNode {
   // into this node. The TupleCacheNode will hash the scan ranges for its 
fragment
   // at runtime.
   2: required list<i32> input_scan_node_ids;
+  // Skip correctness verification at this node. This can be true if the 
result at this
+  // location is variable in a way that does not impact correctness.
+  3: required bool skip_correctness_verification;
 }
 
 enum TMergeCaseType {
diff --git 
a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java 
b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
index f980b363e..b8fd16b02 100644
--- a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
+++ b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
@@ -140,4 +140,23 @@ public class ThriftSerializationCtx {
       tupleCacheInfo_.incorporateScans();
     }
   }
+
+  /**
+   * Mark this location as variable due to a streaming aggregation.
+   */
+  public void setStreamingAggVariability() {
+    if (isTupleCache()) {
+      tupleCacheInfo_.setStreamingAggVariability();
+    }
+  }
+
+  /**
+   * Clear any mark indicating variability due to a streaming aggregation.
+   * It is safe to call this even if there is no existing variability.
+   */
+  public void clearStreamingAggVariability() {
+    if (isTupleCache()) {
+      tupleCacheInfo_.clearStreamingAggVariability();
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java 
b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index ac5f8e5f4..f963ddfe7 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -821,6 +821,16 @@ public class AggregationNode extends PlanNode implements 
SpillableOperator {
       }
       msg.agg_node.addToAggregators(taggregator);
     }
+    // Streaming aggregations can have variable output that is handled and 
undone
+    // by the finalize stage. For example, a grouping aggregation doing a sum 
could
+    // return (a, 3) or (a, 2), (a, 1) or (a, 1), (a, 1), (a, 1). Mark the 
node as
+    // variable, which disables automated correctness checking. Clear the mark 
at the
+    // finalize stage, as the finalize stage undoes the variability.
+    if (useStreamingPreagg_) {
+      serialCtx.setStreamingAggVariability();
+    } else if (needsFinalize_) {
+      serialCtx.clearStreamingAggVariability();
+    }
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java 
b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
index 4ff52e61c..f1220b40d 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
@@ -130,6 +130,13 @@ public class TupleCacheInfo {
   //    to hash the scan ranges of input scan nodes to generate the key.
   private final List<HdfsScanNode> inputScanNodes_ = new 
ArrayList<HdfsScanNode>();
 
+  // This node has variable output due to a streaming aggregation. For 
example, a
+  // grouping aggregation doing a sum could return (a, 3) or (a, 2), (a, 1) or
+  // (a, 1), (a, 1), (a, 1). These all mean the same thing to the finalize 
stage.
+  // We need to know about it to disable automated correctness checking for 
locations
+  // with this variability.
+  private boolean streamingAggVariability_ = false;
+
   // These fields accumulate partial results until finalizeHash() is called.
   private Hasher hasher_ = Hashing.murmur3_128().newHasher();
 
@@ -157,6 +164,19 @@ public class TupleCacheInfo {
     return ineligibilityReasons_.isEmpty();
   }
 
+  public void setStreamingAggVariability() {
+    Preconditions.checkState(!streamingAggVariability_);
+    streamingAggVariability_ = true;
+  }
+
+  public void clearStreamingAggVariability() {
+    streamingAggVariability_ = false;
+  }
+
+  public boolean getStreamingAggVariability() {
+    return streamingAggVariability_;
+  }
+
   public String getHashString() {
     Preconditions.checkState(isEligible(),
         "TupleCacheInfo only has a hash if it is cache eligible");
@@ -289,6 +309,11 @@ public class TupleCacheInfo {
         // id translation maps.
         registerTupleHelper(id, false);
       }
+
+      // The variability propagates up the tree until the aggregation is finalized.
+      if (child.streamingAggVariability_) {
+        streamingAggVariability_ = true;
+      }
       return true;
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java 
b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
index d93145668..9b76f4673 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
@@ -43,9 +43,12 @@ public class TupleCacheNode extends PlanNode {
 
   protected String compileTimeKey_;
   protected String hashTrace_;
+  protected boolean displayCorrectnessCheckingInfo_;
+  protected boolean skipCorrectnessVerification_;
   protected final List<Integer> inputScanNodeIds_ = new ArrayList<Integer>();
 
-  public TupleCacheNode(PlanNodeId id, PlanNode child) {
+  public TupleCacheNode(PlanNodeId id, PlanNode child,
+      boolean displayCorrectnessCheckingInfo) {
     super(id, "TUPLE CACHE");
     addChild(child);
     cardinality_ = child.getCardinality();
@@ -55,6 +58,10 @@ public class TupleCacheNode extends PlanNode {
     Preconditions.checkState(childCacheInfo.isEligible());
     compileTimeKey_ = childCacheInfo.getHashString();
     hashTrace_ = childCacheInfo.getHashTrace();
+    // If there is variability due to a streaming agg, skip the correctness 
verification
+    // for this location.
+    skipCorrectnessVerification_ = childCacheInfo.getStreamingAggVariability();
+    displayCorrectnessCheckingInfo_ = displayCorrectnessCheckingInfo;
     for (HdfsScanNode scanNode : childCacheInfo.getInputScanNodes()) {
       // Inputs into the tuple cache need to use deterministic scan range 
assignment
       scanNode.setDeterministicScanRangeAssignment(true);
@@ -87,6 +94,7 @@ public class TupleCacheNode extends PlanNode {
     TTupleCacheNode tupleCacheNode = new TTupleCacheNode();
     tupleCacheNode.setCompile_time_key(compileTimeKey_);
     tupleCacheNode.setInput_scan_node_ids(inputScanNodeIds_);
+    
tupleCacheNode.setSkip_correctness_verification(skipCorrectnessVerification_);
     msg.setTuple_cache_node(tupleCacheNode);
   }
 
@@ -113,6 +121,11 @@ public class TupleCacheNode extends PlanNode {
     StringBuilder output = new StringBuilder();
     output.append(String.format("%s%s:%s\n", prefix, id_.toString(), 
displayName_));
     output.append(detailPrefix + "cache key: " + compileTimeKey_ + "\n");
+    // Only display information about correctness verification if it is enabled
+    if (displayCorrectnessCheckingInfo_) {
+      output.append(detailPrefix + "skip correctness verification: " +
+          skipCorrectnessVerification_ + "\n");
+    }
 
     // For debuggability, always print the hash trace until the cache key 
calculation
     // matures. Print trace in chunks to avoid excessive wrapping and padding 
in
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java 
b/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
index 17b26fa81..2e6d64743 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
@@ -20,6 +20,7 @@ package org.apache.impala.planner;
 import java.util.List;
 
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.thrift.TQueryOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,8 +94,12 @@ public class TupleCachePlanner {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Adding TupleCacheNode above node " + node.getId().toString());
     }
+    // Get current query options
+    TQueryOptions queryOptions =
+      ctx_.getRootAnalyzer().getQueryCtx().client_request.getQuery_options();
     // Allocate TupleCacheNode
-    TupleCacheNode tupleCacheNode = new TupleCacheNode(ctx_.getNextNodeId(), 
node);
+    TupleCacheNode tupleCacheNode = new TupleCacheNode(ctx_.getNextNodeId(), 
node,
+         queryOptions.isEnable_tuple_cache_verification());
     tupleCacheNode.init(ctx_.getRootAnalyzer());
     PlanFragment curFragment = node.getFragment();
     if (node == curFragment.getPlanRoot()) {
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java 
b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
index 7cbf01b59..3abffe2b9 100644
--- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
@@ -420,6 +420,25 @@ public class TupleCacheTest extends PlannerTestBase {
     }
   }
 
+  @Test
+  public void testSkipCorrectnessChecking() {
+    // Locations after a streaming aggregation before the finalize can have 
variability
+    // that correctness checking can't handle.
+    List<PlanNode> cacheEligibleNodes =
+      getCacheEligibleNodes(
+          "select int_col, count(*) from functional.alltypes group by int_col",
+          /* isDistributedPlan */ true);
+    // In this plan, there is a streaming aggregation that feeds into a 
partitioned
+    // exchange. The finalize phase is past that partitioned exchange. That 
means that
+    // the only eligible node marked with streaming agg variability is the 
initial
+    // streaming AggregationNode.
+    for (PlanNode node : cacheEligibleNodes) {
+      if (node instanceof AggregationNode) {
+        assertTrue(node.getTupleCacheInfo().getStreamingAggVariability());
+      }
+    }
+  }
+
   protected List<PlanNode> getCacheEligibleNodes(String query,
       boolean isDistributedPlan) {
     List<PlanFragment> plan = getPlan(query, isDistributedPlan);
diff --git a/tests/query_test/test_tuple_cache_tpc_queries.py 
b/tests/query_test/test_tuple_cache_tpc_queries.py
index c1f5a9a57..da264ac83 100644
--- a/tests/query_test/test_tuple_cache_tpc_queries.py
+++ b/tests/query_test/test_tuple_cache_tpc_queries.py
@@ -33,7 +33,7 @@ def run_tuple_cache_test(self, vector, query, mtdop):
-  # Use a long runtime filter wait time (1 minute) to ensure filters arrive before
+  # Use a long runtime filter wait time (10 minutes) to ensure filters arrive before
   # generating the tuple cache for correctness check.
   if IS_TUPLE_CACHE_CORRECT_CHECK:
-    vector.get_value('exec_option')['runtime_filter_wait_time_ms'] = 60000
+    vector.get_value('exec_option')['runtime_filter_wait_time_ms'] = 600000
     vector.get_value('exec_option')['enable_tuple_cache_verification'] = True
   vector.get_value('exec_option')['mt_dop'] = mtdop
   # Run twice to test write and read the tuple cache.

Reply via email to