This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 78a27c56f IMPALA-13898: Incorporate partition information into tuple cache keys
78a27c56f is described below

commit 78a27c56fec29f5f27c24e5b5cd32b454f6dba07
Author: Joe McDonnell <[email protected]>
AuthorDate: Wed Jun 18 10:17:48 2025 -0700

    IMPALA-13898: Incorporate partition information into tuple cache keys
    
    Currently, the tuple cache keys do not include partition
    information in either the planner key or the fragment instance
    key. However, the partition actually is important to correctness.
    
    First, there are settings defined on the table and partition that
    can impact the results. For example, for processing text files,
    the separator, escape character, etc. are specified at the table
    level. This impacts the rows produced from a given file. There
    are other such settings stored at the partition level (e.g.
    the JSON binary format).
    
    Second, it is possible to have two partitions pointed at the same
    filesystem location. For example, scale_db.num_partitions_1234_blocks_per_partition_1
    is a table that has all partitions pointing to the same
    location. In that case, the cache can't tell the partitions
    apart based on the files alone. This is an exotic configuration.
    Incorporating an identifier of the partition (e.g. the partition
    keys/values) allows the cache to tell the difference.
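
The collision described above can be illustrated with a minimal Python sketch. It is not Impala code: `cache_key`, the SHA-256 hash, the file path, and the partition names are all hypothetical stand-ins chosen for illustration. It only shows that a key built from file ranges alone cannot separate two partitions that share a location, while mixing in the partition name can.

```python
import hashlib


def cache_key(partition_name, file_ranges, include_partition):
    """Hash scan ranges, optionally mixing in the partition name (illustrative only)."""
    h = hashlib.sha256()
    if include_partition:
        h.update(partition_name.encode("utf-8"))
    # Sort so the key does not depend on the order the ranges were listed in.
    for path, offset, length, mtime in sorted(file_ranges):
        h.update("{0}|{1}|{2}|{3}".format(path, offset, length, mtime).encode("utf-8"))
    return h.hexdigest()


# Two hypothetical partitions that point at the same directory and file.
shared_ranges = [("/warehouse/t/data.txt", 0, 1024, 1700000000)]

key_j1_files_only = cache_key("j=1", shared_ranges, include_partition=False)
key_j2_files_only = cache_key("j=2", shared_ranges, include_partition=False)
key_j1 = cache_key("j=1", shared_ranges, include_partition=True)
key_j2 = cache_key("j=2", shared_ranges, include_partition=True)

print(key_j1_files_only == key_j2_files_only)
print(key_j1 == key_j2)
```

With files alone the two keys are identical, so a cached result for one partition could be served for the other. Including the partition name makes them differ.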
    
    To fix this, we incorporate partition information into the
    key. At planning time, when incorporating the scan range information,
    we also incorporate information about the associated partitions.
    This moves the code to HdfsScanNode and changes it to iterate over
    the partitions, hashing both the partition information and the scan
    ranges. At runtime, the TupleCacheNode looks up the partition
    associated with a scan node and hashes the additional information
    on the HdfsPartitionDescriptor.
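
The runtime side can be pictured as a running hash where each step uses the previous result as its seed. The sketch below is a rough Python analogy under stated assumptions: `zlib.crc32` stands in for Impala's `HashUtil::Hash`, and the dictionaries stand in for `HdfsPartitionDescriptor` and `HdfsFileSplitPB`. The field names follow the commit, but the data layout is invented for illustration.

```python
import struct
import zlib


def hash_partition(desc, seed):
    """Fold the partition-level settings into the running hash."""
    h = seed
    for field in ("line_delim", "field_delim", "collection_delim", "escape_char",
                  "file_format", "encoding_value", "json_binary_format"):
        h = zlib.crc32(desc[field].encode("utf-8"), h)
    # Stable substitute for the partition id, which can change on metadata reload.
    h = zlib.crc32(struct.pack("<I", desc["partition_key_expr_hash"]), h)
    return h


def hash_split(split, seed):
    """Fold the file path, offset, length, and mtime into the running hash."""
    h = zlib.crc32(split["path"].encode("utf-8"), seed)
    for field in ("offset", "length", "mtime"):
        h = zlib.crc32(struct.pack("<q", split[field]), h)
    return h


def fragment_instance_key(splits, partitions):
    """Hash each split together with the settings of its partition."""
    h = 0
    for split in splits:
        h = hash_partition(partitions[split["partition_id"]], h)
        h = hash_split(split, h)
    return h


# Hypothetical data: the same file read with two different field delimiters.
comma_part = {"line_delim": "\n", "field_delim": ",", "collection_delim": ":",
              "escape_char": "\\", "file_format": "TEXT", "encoding_value": "",
              "json_binary_format": "NONE", "partition_key_expr_hash": 12345}
tab_part = dict(comma_part, field_delim="\t")
splits = [{"partition_id": 7, "path": "year=2025/data.txt",
           "offset": 0, "length": 4096, "mtime": 1700000000}]

key_comma = fragment_instance_key(splits, {7: comma_part})
key_tab = fragment_instance_key(splits, {7: tab_part})
```

Here the partition id is used only to look up the descriptor and is never hashed itself, which mirrors the reasoning in the commit that the id is not stable over time.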
    
    This includes some test-only changes to make it possible to run the
    TestBinaryType::test_json_binary_format test case with tuple caching.
    ImpalaTestSuite::_get_table_location() (used by clone_table()) now
    detects a fully-qualified table name and extracts the database from it.
    It only uses the vector to calculate the database if the table is
    not fully qualified. This allows a test to clone a table without
    needing to manipulate its vector to match the right database. This
    also changes _get_table_location() so that it does not switch into the
    database. This required reworking test_scanners_fuzz.py to use absolute
    paths for queries. It turns out that some tests in test_scanners_fuzz.py
    were running in the wrong database and running against uncorrupted
    tables. After this is corrected, some tests can crash Impala. This
    xfails those tests until this can be fixed (tracked by IMPALA-14219).
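
The table-name handling and the named query placeholders described above can be sketched in isolation. This is a simplified stand-in, not the test suite's code: `qualify_table_name` and `expand_query` are hypothetical helper names, and the database and table names are example values.

```python
def qualify_table_name(table_name, default_db):
    """Return 'db.table', using default_db only when no database is given."""
    if "." in table_name:
        return table_name
    return "{0}.{1}".format(default_db, table_name)


def expand_query(template, db, table):
    """Fill the '{db}' and '{table}' placeholders in a query template."""
    return template.format(db=db, table=table)


qualified = qualify_table_name("functional_json.binary_tbl", "functional")
defaulted = qualify_table_name("binary_tbl", "functional_json")
query = expand_query("select int_array from {db}.{table};", "fuzz_db", "fuzz_tbl")
```

Because every query names its database explicitly, the result does not depend on which database the client session happens to be using.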
    
    Testing:
     - Added a frontend test in TupleCacheTest for a table with
       multiple partitions pointed at the same place.
     - Added custom cluster tests testing both issues
    
    Change-Id: I3a7109fcf8a30bf915bb566f7d642f8037793a8c
    Reviewed-on: http://gerrit.cloudera.org:8080/23074
    Reviewed-by: Yida Wu <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Joe McDonnell <[email protected]>
---
 be/src/exec/exec-node.h                            |   2 +-
 be/src/exec/tuple-cache-node.cc                    | 104 +++++++++++++++++----
 be/src/exec/tuple-cache-node.h                     |  12 ++-
 be/src/runtime/descriptors.cc                      |  11 +++
 be/src/runtime/descriptors.h                       |   4 +
 .../org/apache/impala/planner/HdfsScanNode.java    |  75 +++++++++++++++
 .../org/apache/impala/planner/TupleCacheInfo.java  |  40 +++-----
 .../org/apache/impala/planner/TupleCacheTest.java  |  12 +++
 tests/common/impala_test_suite.py                  |  15 ++-
 tests/custom_cluster/test_tuple_cache.py           |  36 +++++++
 tests/query_test/test_scanners_fuzz.py             |  26 ++++--
 11 files changed, 275 insertions(+), 62 deletions(-)

diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index ac3d7bcc7..550142d6f 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -285,7 +285,7 @@ class ExecNode {
 
   int id() const { return id_; }
   TPlanNodeType::type type() const { return type_; }
-  const PlanNode& plan_node() { return plan_node_; }
+  const PlanNode& plan_node() const { return plan_node_; }
 
   /// Returns a unique label for this ExecNode of the form "PLAN_NODE_TYPE(id=[int])",
   /// for example, EXCHANGE_NODE (id=2).
diff --git a/be/src/exec/tuple-cache-node.cc b/be/src/exec/tuple-cache-node.cc
index 57bc17f8e..455245f6b 100644
--- a/be/src/exec/tuple-cache-node.cc
+++ b/be/src/exec/tuple-cache-node.cc
@@ -18,6 +18,7 @@
 #include <gflags/gflags.h>
 
 #include "exec/exec-node-util.h"
+#include "exec/hdfs-scan-node-base.h"
 #include "exec/tuple-cache-node.h"
 #include "exec/tuple-file-reader.h"
 #include "exec/tuple-file-writer.h"
@@ -432,37 +433,102 @@ void TupleCacheNode::DebugString(int indentation_level, stringstream* out) const
   *out << ")";
 }
 
+// This hashes all the fields of the HdfsPartitionDescriptor, except:
+// 1. block_size: Only used for writing, so it doesn't matter for reading
+// 2. location: The location is hashed as part of the scan range
+// 3. id: The partition id is not stable over time
+// As a substitute for the "id", this uses the partition key expr hash, which
+// is a stable identifier for the partition.
+uint32_t TupleCacheNode::HashHdfsPartitionDescriptor(
+    const HdfsPartitionDescriptor* partition_desc, uint32_t seed) {
+  uint32_t hash = seed;
+  char line_delim = partition_desc->line_delim();
+  hash = HashUtil::Hash(&line_delim, sizeof(line_delim), hash);
+  char field_delim = partition_desc->field_delim();
+  hash = HashUtil::Hash(&field_delim, sizeof(field_delim), hash);
+  char collection_delim = partition_desc->collection_delim();
+  hash = HashUtil::Hash(&collection_delim, sizeof(collection_delim), hash);
+  char escape_char = partition_desc->escape_char();
+  hash = HashUtil::Hash(&escape_char, sizeof(escape_char), hash);
+  std::string file_format = to_string(partition_desc->file_format());
+  hash = HashUtil::Hash(file_format.data(), file_format.length(), hash);
+  hash = HashUtil::Hash(partition_desc->encoding_value().data(),
+      partition_desc->encoding_value().length(), hash);
+  std::string json_binary_format = to_string(partition_desc->json_binary_format());
+  hash = HashUtil::Hash(json_binary_format.data(), json_binary_format.length(), hash);
+  uint32_t partition_key_expr_hash = partition_desc->partition_key_expr_hash();
+  hash = HashUtil::Hash(&partition_key_expr_hash, sizeof(partition_key_expr_hash), hash);
+  return hash;
+}
+
+uint32_t TupleCacheNode::HashHdfsFileSplit(const HdfsFileSplitPB& split, uint32_t seed) {
+  uint32_t hash = seed;
+  if (split.has_relative_path() && !split.relative_path().empty()) {
+    hash = HashUtil::Hash(
+        split.relative_path().data(), split.relative_path().length(), hash);
+    DCHECK(split.has_partition_path_hash());
+    int32_t partition_path_hash = split.partition_path_hash();
+    hash = HashUtil::Hash(&partition_path_hash, sizeof(partition_path_hash), hash);
+  } else if (split.has_absolute_path() && !split.absolute_path().empty()) {
+    hash = HashUtil::Hash(
+        split.absolute_path().data(), split.absolute_path().length(), hash);
+  } else {
+    DCHECK("Either relative_path or absolute_path must be set");
+  }
+  DCHECK(split.has_offset());
+  int64_t offset = split.offset();
+  hash = HashUtil::Hash(&offset, sizeof(offset), hash);
+  DCHECK(split.has_length());
+  int64_t length = split.length();
+  hash = HashUtil::Hash(&length, sizeof(length), hash);
+  DCHECK(split.has_mtime());
+  int64_t mtime = split.mtime();
+  hash = HashUtil::Hash(&mtime, sizeof(mtime), hash);
+  return hash;
+}
+
 void TupleCacheNode::ComputeFragmentInstanceKey(const RuntimeState* state) {
   const PlanFragmentInstanceCtxPB& ctx = state->instance_ctx_pb();
   uint32_t hash = 0;
+  // Collect the HdfsScanNodes below this point. The HdfsScanNodes have information about
+  // the partitions that we need to include in the fragment instance key. Some locations
+  // may have a large number of scan nodes below them, so construct a map from the node
+  // id to the HdfsScanNodeBase.
+  vector<ExecNode*> scan_nodes;
+  CollectNodes(TPlanNodeType::HDFS_SCAN_NODE, &scan_nodes);
+  unordered_map<int, const HdfsScanNodeBase*> id_to_scan_node_map;
+  for (const ExecNode* exec_node : scan_nodes) {
+    const HdfsScanNodeBase* scan_node =
+        static_cast<const HdfsScanNodeBase*>(exec_node);
+    int node_id = exec_node->plan_node().tnode_->node_id;
+    DCHECK(id_to_scan_node_map.find(node_id) == id_to_scan_node_map.end())
+        << "Duplicate scan node id: " << node_id;
+    id_to_scan_node_map[node_id] = scan_node;
+  }
   for (int32_t node_id : plan_node().tnode_->tuple_cache_node.input_scan_node_ids) {
+    const HdfsTableDescriptor* hdfs_table = id_to_scan_node_map[node_id]->hdfs_table();
+    DCHECK(hdfs_table != nullptr);
     auto ranges = ctx.per_node_scan_ranges().find(node_id);
     if (ranges == ctx.per_node_scan_ranges().end()) continue;
     for (const ScanRangeParamsPB& params : ranges->second.scan_ranges()) {
       // This only supports HDFS right now
       DCHECK(params.scan_range().has_hdfs_file_split());
       const HdfsFileSplitPB& split = params.scan_range().hdfs_file_split();
-      if (split.has_relative_path() && !split.relative_path().empty()) {
-        hash = HashUtil::Hash(
-            split.relative_path().data(), split.relative_path().length(), hash);
-        DCHECK(split.has_partition_path_hash());
-        int32_t partition_path_hash = split.partition_path_hash();
-        hash = HashUtil::Hash(&partition_path_hash, sizeof(partition_path_hash), hash);
-      } else if (split.has_absolute_path() && !split.absolute_path().empty()) {
-        hash = HashUtil::Hash(
-            split.absolute_path().data(), split.absolute_path().length(), hash);
+      // Information on the partition can influence how files are processed. For example,
+      // for text files, the delimiter can be specified on the partition level. There
+      // are several such attributes. We need to incorporate the partition information
+      // into the hash for each file split.
+      const HdfsPartitionDescriptor* partition_desc =
+          hdfs_table->GetPartition(split.partition_id());
+      DCHECK(partition_desc != nullptr);
+      if (partition_desc != nullptr) {
+        hash = HashHdfsPartitionDescriptor(partition_desc, hash);
       } else {
-        DCHECK("Either relative_path or absolute_path must be set");
+        LOG(WARNING) << "Partition id " << split.partition_id()
+                     << " not found in table " << hdfs_table->fully_qualified_name()
+                     << " split filename: " << split.relative_path();
       }
-      DCHECK(split.has_offset());
-      int64_t offset = split.offset();
-      hash = HashUtil::Hash(&offset, sizeof(offset), hash);
-      DCHECK(split.has_length());
-      int64_t length = split.length();
-      hash = HashUtil::Hash(&length, sizeof(length), hash);
-      DCHECK(split.has_mtime());
-      int64_t mtime = split.mtime();
-      hash = HashUtil::Hash(&mtime, sizeof(mtime), hash);
+      hash = HashHdfsFileSplit(split, hash);
     }
   }
   fragment_instance_key_ = hash;
diff --git a/be/src/exec/tuple-cache-node.h b/be/src/exec/tuple-cache-node.h
index ef2e2dd6b..51a90bc0c 100644
--- a/be/src/exec/tuple-cache-node.h
+++ b/be/src/exec/tuple-cache-node.h
@@ -27,6 +27,8 @@ namespace impala {
 class TupleFileReader;
 class TupleFileWriter;
 class TupleTextFileWriter;
+class HdfsFileSplitPB;
+class HdfsPartitionDescriptor;
 
 class TupleCachePlanNode : public PlanNode {
  public:
@@ -85,8 +87,16 @@ private:
 
   void ReleaseResult();
 
+  // Hash the relevant attributes from an HdfsPartitionDescriptor using the specified
+  // seed.
+  uint32_t HashHdfsPartitionDescriptor(const HdfsPartitionDescriptor* partition_desc,
+      uint32_t seed);
+
+  // Hash the relevant attributes from an HdfsFileSplit using the specified seed.
+  uint32_t HashHdfsFileSplit(const HdfsFileSplitPB& split, uint32_t seed);
+
   // Construct the fragment instance part of the cache key by hashing information about
-  // inputs to this fragment (e.g. scan ranges).
+  // inputs to this fragment (e.g. scan ranges and partition settings).
   void ComputeFragmentInstanceKey(const RuntimeState *state);
 
   /// Reader/Writer for caching
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index d4bc2f62d..5af38fd43 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -35,6 +35,7 @@
 #include "gen-cpp/PlanNodes_types.h"
 #include "rpc/thrift-util.h"
 #include "runtime/runtime-state.h"
+#include "util/hash-util.h"
 
 #include "common/names.h"
 
@@ -208,6 +209,15 @@ string TableDescriptor::DebugString() const {
   return out.str();
 }
 
+static uint32_t HashPartitionKeyExprs(const vector<TExpr>& partition_key_exprs) {
+  stringstream stream;
+  for (const TExpr& texpr : partition_key_exprs) {
+    stream << texpr;
+  }
+  string s = stream.str();
+  return HashUtil::Hash(s.data(), s.length(), 0);
+}
+
 HdfsPartitionDescriptor::HdfsPartitionDescriptor(
     const THdfsTable& thrift_table, const THdfsPartition& thrift_partition)
   : id_(thrift_partition.id),
@@ -222,6 +232,7 @@ HdfsPartitionDescriptor::HdfsPartitionDescriptor(
   json_binary_format_ = sd.jsonBinaryFormat;
   encoding_value_ = sd.__isset.encodingValue ? sd.encodingValue : "";
   DecompressLocation(thrift_table, thrift_partition, &location_);
+  partition_key_expr_hash_ = HashPartitionKeyExprs(thrift_partition_key_exprs_);
 }
 
 string HdfsPartitionDescriptor::DebugString() const {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 8b0457286..bf18c249e 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -403,6 +403,7 @@ class HdfsPartitionDescriptor {
   const std::string& location() const { return location_; }
   int64_t id() const { return id_; }
   TJsonBinaryFormat::type json_binary_format() const { return json_binary_format_; }
+  uint32_t partition_key_expr_hash() const { return partition_key_expr_hash_; }
   std::string DebugString() const;
 
   /// It is safe to call the returned expr evaluators concurrently from multiple
@@ -425,12 +426,15 @@ class HdfsPartitionDescriptor {
   // stripped.
   std::string location_;
   int64_t id_;
+  uint32_t partition_key_expr_hash_;
 
   /// List of literal (and therefore constant) expressions for each partition key. Their
   /// order corresponds to the first num_clustering_cols of the parent table.
   /// The Prepare()/Open()/Close() cycle is controlled by the containing descriptor table
   /// because the same partition descriptor may be used by multiple exec nodes with
   /// different lifetimes.
+  /// WARNING: This is only valid for a brief time and is left dangling. It can't be
+  /// exposed publicly.
   const std::vector<TExpr>& thrift_partition_key_exprs_;
 
   /// These evaluators are safe to be shared by all fragment instances as all expressions
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index ed47bd1f6..bdc3565b3 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -84,6 +84,7 @@ import org.apache.impala.thrift.TFileSplitGeneratorSpec;
 import org.apache.impala.thrift.TJsonBinaryFormat;
 import org.apache.impala.thrift.THdfsFileSplit;
 import org.apache.impala.thrift.THdfsScanNode;
+import org.apache.impala.thrift.THdfsStorageDescriptor;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TOverlapPredicateDesc;
 import org.apache.impala.thrift.TPlanNode;
@@ -2828,4 +2829,78 @@ public class HdfsScanNode extends ScanNode {
     }
     return ndvMult;
   }
+
+  // Incorporate the details from this scan node into the supplied TupleCacheInfo. This
+  // is used when an entire scan node needs to be incorporated into a tuple cache key
+  // (e.g. for the build side of a join). This iterates over the partitions, hashing
+  // the partition information (storage descriptor and partition keys/values) as well as
+  // the associated scan ranges.
+  public void incorporateScansIntoTupleCache(TupleCacheInfo info) {
+    TScanRangeSpec orig = getScanRangeSpecs();
+
+    // Sort the partitions so it is consistent over time (the partition name is built
+    // from the partition column values).
+    List<FeFsPartition> sortedPartitions = new ArrayList<>(getSampledOrRawPartitions());
+    sortedPartitions.sort(
+        (p1, p2) -> p1.getPartitionName().compareTo(p2.getPartitionName()));
+    for (FeFsPartition partition : sortedPartitions) {
+      TScanRangeSpec spec = new TScanRangeSpec();
+      boolean hasScanRange = false;
+      if (orig.isSetConcrete_ranges()) {
+        for (TScanRangeLocationList origLocList: orig.concrete_ranges) {
+          if (origLocList.scan_range.hdfs_file_split.partition_id != partition.getId()) {
+            continue;
+          }
+          hasScanRange = true;
+          // We only need the TScanRange, which provides the file segment info.
+          TScanRangeLocationList locList = new TScanRangeLocationList();
+          TScanRange scanRange = origLocList.scan_range.deepCopy();
+          if (scanRange.isSetHdfs_file_split()) {
+            // Zero out partition_id, it's not stable.
+            scanRange.hdfs_file_split.partition_id = 0;
+          }
+          locList.setScan_range(scanRange);
+          spec.addToConcrete_ranges(locList);
+        }
+        if (hasScanRange) {
+          // Reloaded partitions may have a different order. Sort for stability.
+          spec.concrete_ranges.sort(null);
+        }
+      }
+      if (orig.isSetSplit_specs()) {
+        for (TFileSplitGeneratorSpec origSplitSpec: orig.split_specs) {
+          if (origSplitSpec.partition_id != partition.getId()) continue;
+          hasScanRange = true;
+          TFileSplitGeneratorSpec splitSpec = origSplitSpec.deepCopy();
+          // Zero out partition_id, it's not stable.
+          splitSpec.partition_id = 0;
+          spec.addToSplit_specs(splitSpec);
+        }
+        // Reloaded partitions may have a different order. Sort for stability.
+        spec.split_specs.sort(null);
+      }
+
+      // We should ignore empty partitions, so only include the information if there is
+      // at least one scan range.
+      if (hasScanRange) {
+        // Incorporate the storage descriptor. This contains several fields that can
+        // impact correctness, including the escape character, separator character,
+        // json binary format, etc.
+        THdfsStorageDescriptor inputFormat =
+          partition.getInputFormatDescriptor().toThrift();
+        // Zero the block size, as it is not relevant to reads.
+        inputFormat.setBlockSize(0);
+        info.hashThrift(inputFormat);
+
+        // Hash the partition name (which includes the partition keys and values)
+        // This is necessary for cases where two partitions point to the same
+        // directory and files. Without knowing the partition keys/values, the
+        // cache can't tell them apart.
+        info.hashString(partition.getPartitionName());
+
+        // Hash the scan range information
+        info.hashThrift(spec);
+      }
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
index f1220b40d..83a1835c2 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
@@ -248,34 +248,7 @@ public class TupleCacheInfo {
     // filename, mtime, size, and offset. Others like partition_id may change after
     // reloading metadata.
     for (HdfsScanNode scanNode: inputScanNodes_) {
-      TScanRangeSpec orig = scanNode.getScanRangeSpecs();
-      TScanRangeSpec spec = new TScanRangeSpec();
-      if (orig.isSetConcrete_ranges()) {
-        for (TScanRangeLocationList origLocList: orig.concrete_ranges) {
-          // We only need the TScanRange, which provides the file segment info.
-          TScanRangeLocationList locList = new TScanRangeLocationList();
-          TScanRange scanRange = origLocList.scan_range.deepCopy();
-          if (scanRange.isSetHdfs_file_split()) {
-            // Zero out partition_id, it's not stable.
-            scanRange.hdfs_file_split.partition_id = 0;
-          }
-          locList.setScan_range(scanRange);
-          spec.addToConcrete_ranges(locList);
-        }
-        // Reloaded partitions may have a different order. Sort for stability.
-        spec.concrete_ranges.sort(null);
-      }
-      if (orig.isSetSplit_specs()) {
-        for (TFileSplitGeneratorSpec origSplitSpec: orig.split_specs) {
-          TFileSplitGeneratorSpec splitSpec = origSplitSpec.deepCopy();
-          // Zero out partition_id, it's not stable.
-          splitSpec.partition_id = 0;
-          spec.addToSplit_specs(splitSpec);
-        }
-        // Reloaded partitions may have a different order. Sort for stability.
-        spec.split_specs.sort(null);
-      }
-      hashThrift(spec);
+      scanNode.incorporateScansIntoTupleCache(this);
     }
     // The scan ranges have been incorporated into the key and are no longer needed
     // at runtime.
@@ -341,6 +314,17 @@ public class TupleCacheInfo {
     hashTraceBuilder_.append(thriftString);
   }
 
+  /**
+   * Hash a regular string and incorporate it into the key
+   */
+  public void hashString(String s) {
+    Preconditions.checkState(!finalized_,
+        "TupleCacheInfo is finalized and can't be modified");
+    Preconditions.checkState(s != null);
+    hasher_.putUnencodedChars(s);
+    hashTraceBuilder_.append(s);
+  }
+
   /**
    * registerTuple() does two things:
    * 1. It incorporates a tuple's layout (and slot information) into the cache key.
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
index 34221fa6e..a854d7542 100644
--- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
@@ -436,6 +436,18 @@ public class TupleCacheTest extends PlannerTestBase {
     verifyJoinNodesEligible(
         "select * from functional_parquet.iceberg_v2_positional_delete_all_rows", 1,
         /* isDistributedPlan */ true);
+
+    // When incorporating the scan range information from the build side of the
+    // join, we need to also incorporate information about the partitions involved.
+    // scale_db.num_partitions_1234_blocks_per_partition_1 is an exotic table where
+    // all the partitions point to the same file. If we don't incorporate partition
+    // information, then it can't tell apart queries against different partitions.
+    String incorporatePartitionSqlTemplate =
+        "select straight_join build.j, probe.id from functional.alltypes probe, " +
+        "scale_db.num_partitions_1234_blocks_per_partition_1 build " +
+        "where probe.id = build.i and build.j = %s";
+    verifyOverlappingCacheKeys(String.format(incorporatePartitionSqlTemplate, 1),
+        String.format(incorporatePartitionSqlTemplate, 2));
   }
 
   @Test
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 3e6ce0c8a..9e7414716 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1320,12 +1320,17 @@ class ImpalaTestSuite(BaseTestSuite):
     assert abs(a - b) / float(max(abs(a), abs(b))) <= diff_perc
 
   def _get_table_location(self, table_name, vector):
-    """ Returns the HDFS location of the table.
-    This method changes self.client to point to the dabatase described by 'vector'."""
-    db_name = self.get_db_name_from_format(vector.get_table_format())
-    self.__change_client_database(self.client, db_name=db_name)
+    """ Returns the HDFS location of the table. If the table is not fully qualified,
+    this uses the database from the vector."""
+    is_fully_qualified = table_name.find(".") != -1
+    if is_fully_qualified:
+      fq_table_name = table_name
+    else:
+      db_name = self.get_db_name_from_format(vector.get_table_format())
+      fq_table_name = "{0}.{1}".format(db_name, table_name)
+
     result = self.execute_query_using_client(self.client,
-        "describe formatted %s" % table_name, vector)
+        "describe formatted %s" % fq_table_name, vector)
     for row in result.data:
       if 'Location:' in row:
         return row.split('\t')[1]
diff --git a/tests/custom_cluster/test_tuple_cache.py b/tests/custom_cluster/test_tuple_cache.py
index 4b5b13724..4df907b97 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -404,6 +404,42 @@ class TestTupleCacheSingle(TestTupleCacheBase):
     self.run_test_case('QueryTest/parquet-resolution-by-name', vector,
                        use_db=unique_database)
 
+  def test_partition_information(self, vector):
+    """Verify that partition information is incorporated into the runtime cache key"""
+    self.client.set_configuration(vector.get_value('exec_option'))
+
+    # scale_db.num_partitions_1234_blocks_per_partition_1 is an exotic table where all
+    # the partitions point to the same filesystem location. A single file is read many
+    # times for different partitions. It is not possible to tell the partitions apart
+    # by the file path, so this verifies that the partition information is being included
+    # properly.
+    query_template = \
+        "select i, j from scale_db.num_partitions_1234_blocks_per_partition_1 where j={0}"
+    # Run against the j=1 partition
+    result1 = self.execute_query(query_template.format(1))
+    assert result1.success
+    assertCounters(result1.runtime_profile, 0, 0, 0)
+    assert len(result1.data) == 1
+    assert result1.data[0].split("\t") == ["1", "1"]
+
+    # Run against the j=2 partition. There should not be a cache hit, because they are
+    # running against different partitions. This only works if the runtime key
+    # incorporates the partition information.
+    result2 = self.execute_query(query_template.format(2))
+    assert result2.success
+    assertCounters(result2.runtime_profile, 0, 0, 0)
+    assert len(result2.data) == 1
+    assert result2.data[0].split("\t") == ["1", "2"]
+
+  def test_json_binary_format(self, vector, unique_database):
+    """This is identical to test_scanners.py's TestBinaryType::test_json_binary_format.
+       That test modifies a table's serde properties to change the json binary format.
+       The tuple cache detects that by including the partition's storage descriptor
+       information. This fails if that doesn't happen."""
+    test_tbl = unique_database + '.binary_tbl'
+    self.clone_table('functional_json.binary_tbl', test_tbl, False, vector)
+    self.run_test_case('QueryTest/json-binary-format', vector, unique_database)
+
 
 @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
 class TestTupleCacheCluster(TestTupleCacheBase):
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index 8b10a4d5b..5c0f0d433 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -108,18 +108,22 @@ class TestScannersFuzzing(ImpalaTestSuite):
     src_db = QueryTestSectionReader.get_db_name(table_format)
 
     if table_format.file_format not in ['parquet', 'orc']: pytest.skip()
+    # For historical reasons, this test ran against the wrong database and was
+    # running against the uncorrupted table. Now that this is corrected, it frequently
+    # crashes Impala. Until that is fixed in IMPALA-14219, we need to xfail this test.
this test.
+    pytest.xfail("IMPALA-14219: this test can crash Impala")
     # Additional queries to scan the nested values.
     custom_queries = [
       "select count(*) from ("
       "  select distinct t.id, a.pos as apos, a.item as aitem, aa.pos, aa.item, "
       "    m.key as mkey, m.value as mvalue, ma.key, ma.value, t.nested_struct.* "
-      "  from complextypestbl t, t.int_array a, t.int_array_array.item aa, "
+      "  from {db}.{table} t, t.int_array a, t.int_array_array.item aa, "
       "    t.int_map m, t.int_map_array.item ma) q",
 
       "select count(*) from ("
       "  select t.id, t.nested_struct.a, b.pos as bpos, b.item as bitem, i.e, i.f, m.key,"
       "    arr.pos, arr.item "
-      "  from complextypestbl t, t.nested_struct.b, t.nested_struct.c.d.item i,"
+      "  from {db}.{table} t, t.nested_struct.b, t.nested_struct.c.d.item i,"
       "    t.nested_struct.g m, m.value.h.i arr) q",
     ]
     self.run_fuzz_test(vector, src_db, table_name, unique_database, table_name, 10,
@@ -158,12 +162,16 @@ class TestScannersFuzzing(ImpalaTestSuite):
   def test_fuzz_parquet_v2(self, vector, unique_database):
     table_format = vector.get_value('table_format')
     if table_format.file_format != 'parquet': pytest.skip()
+    # For historical reasons, this test ran against the wrong database and was
+    # running against the uncorrupted table. Now that this is corrected, it frequently
+    # crashes Impala. Until that is fixed in IMPALA-14219, we need to xfail this test.
+    pytest.xfail("IMPALA-14219: this test can crash Impala")
 
     tables = ["alltypesagg_parquet_v2_uncompressed", "alltypesagg_parquet_v2_snappy"]
     for table_name in tables:
       custom_queries = [
         "select avg(float_col), avg(double_col), avg(timestamp_col)"
-        "  from %s where bool_col;" % table_name
+        "  from {db}.{table} where bool_col;"
       ]
       self.run_fuzz_test(vector, "functional_parquet", table_name, unique_database,
                       table_name, 10, custom_queries)
@@ -172,7 +180,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
               "complextypestbl_parquet_v2_snappy"]
     for table_name in tables:
       custom_queries = [
-        "select int_array from %s;" % table_name
+        "select int_array from {db}.{table};"
       ]
       self.run_fuzz_test(vector, "functional_parquet", table_name, unique_database,
                   table_name, 10, custom_queries)
@@ -188,6 +196,8 @@ class TestScannersFuzzing(ImpalaTestSuite):
     SCANNER_FUZZ_SEED can be set in the environment to reproduce the result (assuming that
     input files are the same).
     SCANNER_FUZZ_KEEP_FILES can be set in the environment to keep the generated files.
+    custom_queries can specify additional queries to run. References to '{db}' and
+    '{table}' in the custom queries are replaced with the fuzz db and table.
     """
     # Create and seed a new random number generator for reproducibility.
     rng = random.Random()
@@ -246,11 +256,11 @@ class TestScannersFuzzing(ImpalaTestSuite):
     # Also execute a count(*) that materializes no columns, since different code
     # paths are exercised.
     queries = [
-        'select count(*) from (select distinct * from {0}.{1}) q'.format(
-            fuzz_db, fuzz_table),
-        'select count(*) from {0}.{1} q'.format(fuzz_db, fuzz_table)]
+        'select count(*) from (select distinct * from {db}.{table}) q'.format(
+            db=fuzz_db, table=fuzz_table),
+        'select count(*) from {db}.{table} q'.format(db=fuzz_db, table=fuzz_table)]
     if custom_queries is not None:
-      queries = queries + [s.format(fuzz_db, fuzz_table) for s in custom_queries]
+      queries = queries + [s.format(db=fuzz_db, table=fuzz_table) for s in custom_queries]
 
     for query, batch_size, disable_codegen in \
         itertools.product(queries, self.BATCH_SIZES, self.DISABLE_CODEGEN_VALUES):
