This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 78a27c56f IMPALA-13898: Incorporate partition information into tuple
cache keys
78a27c56f is described below
commit 78a27c56fec29f5f27c24e5b5cd32b454f6dba07
Author: Joe McDonnell <[email protected]>
AuthorDate: Wed Jun 18 10:17:48 2025 -0700
IMPALA-13898: Incorporate partition information into tuple cache keys
Currently, the tuple cache keys do not include partition
information in either the planner key or the fragment instance
key. However, the partition is actually important to correctness.
First, there are settings defined on the table and partition that
can impact the results. For example, for processing text files,
the separator, escape character, etc. are specified at the table
level. This impacts the rows produced from a given file. There
are other such settings stored at the partition level (e.g.
the JSON binary format).
Second, it is possible to have two partitions pointed at the same
filesystem location. For example,
scale_db.num_partitions_1234_blocks_per_partition_1
is a table that has all partitions pointing to the same
location. In that case, the cache can't tell the partitions
apart based on the files alone. This is an exotic configuration.
Incorporating an identifier of the partition (e.g. the partition
keys/values) allows the cache to tell the difference.
To fix this, we incorporate partition information into the
key. At planning time, when incorporating the scan range information,
we also incorporate information about the associated partitions.
This moves the code to HdfsScanNode and changes it to iterate over
the partitions, hashing both the partition information and the scan
ranges. At runtime, the TupleCacheNode looks up the partition
associated with a scan node and hashes the additional information
on the HdfsPartitionDescriptor.
This includes some test-only changes to make it possible to run the
TestBinaryType::test_json_binary_format test case with tuple caching.
ImpalaTestSuite::_get_table_location() (used by clone_table()) now
detects a fully-qualified table name and extracts the database from it.
It only uses the vector to calculate the database if the table is
not fully qualified. This allows a test to clone a table without
needing to manipulate its vector to match the right database. This
also changes _get_table_location() so that it does not switch into the
database. This required reworking test_scanners_fuzz.py to use absolute
paths for queries. It turns out that some tests in test_scanners_fuzz.py
were running in the wrong database, and thus against uncorrupted
tables. After this is corrected, some tests can crash Impala. This
xfails those tests until this can be fixed (tracked by IMPALA-14219).
Testing:
- Added a frontend test in TupleCacheTest for a table with
multiple partitions pointed at the same place.
- Added custom cluster tests testing both issues
Change-Id: I3a7109fcf8a30bf915bb566f7d642f8037793a8c
Reviewed-on: http://gerrit.cloudera.org:8080/23074
Reviewed-by: Yida Wu <[email protected]>
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Joe McDonnell <[email protected]>
---
be/src/exec/exec-node.h | 2 +-
be/src/exec/tuple-cache-node.cc | 104 +++++++++++++++++----
be/src/exec/tuple-cache-node.h | 12 ++-
be/src/runtime/descriptors.cc | 11 +++
be/src/runtime/descriptors.h | 4 +
.../org/apache/impala/planner/HdfsScanNode.java | 75 +++++++++++++++
.../org/apache/impala/planner/TupleCacheInfo.java | 40 +++-----
.../org/apache/impala/planner/TupleCacheTest.java | 12 +++
tests/common/impala_test_suite.py | 15 ++-
tests/custom_cluster/test_tuple_cache.py | 36 +++++++
tests/query_test/test_scanners_fuzz.py | 26 ++++--
11 files changed, 275 insertions(+), 62 deletions(-)
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index ac3d7bcc7..550142d6f 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -285,7 +285,7 @@ class ExecNode {
int id() const { return id_; }
TPlanNodeType::type type() const { return type_; }
- const PlanNode& plan_node() { return plan_node_; }
+ const PlanNode& plan_node() const { return plan_node_; }
/// Returns a unique label for this ExecNode of the form
"PLAN_NODE_TYPE(id=[int])",
/// for example, EXCHANGE_NODE (id=2).
diff --git a/be/src/exec/tuple-cache-node.cc b/be/src/exec/tuple-cache-node.cc
index 57bc17f8e..455245f6b 100644
--- a/be/src/exec/tuple-cache-node.cc
+++ b/be/src/exec/tuple-cache-node.cc
@@ -18,6 +18,7 @@
#include <gflags/gflags.h>
#include "exec/exec-node-util.h"
+#include "exec/hdfs-scan-node-base.h"
#include "exec/tuple-cache-node.h"
#include "exec/tuple-file-reader.h"
#include "exec/tuple-file-writer.h"
@@ -432,37 +433,102 @@ void TupleCacheNode::DebugString(int indentation_level,
stringstream* out) const
*out << ")";
}
+// This hashes all the fields of the HdfsPartitionDescriptor, except:
+// 1. block_size: Only used for writing, so it doesn't matter for reading
+// 2. location: The location is hashed as part of the scan range
+// 3. id: The partition id is not stable over time
+// As a substitute for the "id", this uses the partition key expr hash, which
+// is a stable identifier for the partition.
+uint32_t TupleCacheNode::HashHdfsPartitionDescriptor(
+ const HdfsPartitionDescriptor* partition_desc, uint32_t seed) {
+ uint32_t hash = seed;
+ char line_delim = partition_desc->line_delim();
+ hash = HashUtil::Hash(&line_delim, sizeof(line_delim), hash);
+ char field_delim = partition_desc->field_delim();
+ hash = HashUtil::Hash(&field_delim, sizeof(field_delim), hash);
+ char collection_delim = partition_desc->collection_delim();
+ hash = HashUtil::Hash(&collection_delim, sizeof(collection_delim), hash);
+ char escape_char = partition_desc->escape_char();
+ hash = HashUtil::Hash(&escape_char, sizeof(escape_char), hash);
+ std::string file_format = to_string(partition_desc->file_format());
+ hash = HashUtil::Hash(file_format.data(), file_format.length(), hash);
+ hash = HashUtil::Hash(partition_desc->encoding_value().data(),
+ partition_desc->encoding_value().length(), hash);
+ std::string json_binary_format =
to_string(partition_desc->json_binary_format());
+ hash = HashUtil::Hash(json_binary_format.data(),
json_binary_format.length(), hash);
+ uint32_t partition_key_expr_hash = partition_desc->partition_key_expr_hash();
+ hash = HashUtil::Hash(&partition_key_expr_hash,
sizeof(partition_key_expr_hash), hash);
+ return hash;
+}
+
+uint32_t TupleCacheNode::HashHdfsFileSplit(const HdfsFileSplitPB& split,
uint32_t seed) {
+ uint32_t hash = seed;
+ if (split.has_relative_path() && !split.relative_path().empty()) {
+ hash = HashUtil::Hash(
+ split.relative_path().data(), split.relative_path().length(), hash);
+ DCHECK(split.has_partition_path_hash());
+ int32_t partition_path_hash = split.partition_path_hash();
+ hash = HashUtil::Hash(&partition_path_hash, sizeof(partition_path_hash),
hash);
+ } else if (split.has_absolute_path() && !split.absolute_path().empty()) {
+ hash = HashUtil::Hash(
+ split.absolute_path().data(), split.absolute_path().length(), hash);
+ } else {
+ DCHECK("Either relative_path or absolute_path must be set");
+ }
+ DCHECK(split.has_offset());
+ int64_t offset = split.offset();
+ hash = HashUtil::Hash(&offset, sizeof(offset), hash);
+ DCHECK(split.has_length());
+ int64_t length = split.length();
+ hash = HashUtil::Hash(&length, sizeof(length), hash);
+ DCHECK(split.has_mtime());
+ int64_t mtime = split.mtime();
+ hash = HashUtil::Hash(&mtime, sizeof(mtime), hash);
+ return hash;
+}
+
void TupleCacheNode::ComputeFragmentInstanceKey(const RuntimeState* state) {
const PlanFragmentInstanceCtxPB& ctx = state->instance_ctx_pb();
uint32_t hash = 0;
+ // Collect the HdfsScanNodes below this point. The HdfsScanNodes have
information about
+ // the partitions that we need to include in the fragment instance key. Some
locations
+ // may have a large number of scan nodes below them, so construct a map from
the node
+ // id to the HdfsScanNodeBase.
+ vector<ExecNode*> scan_nodes;
+ CollectNodes(TPlanNodeType::HDFS_SCAN_NODE, &scan_nodes);
+ unordered_map<int, const HdfsScanNodeBase*> id_to_scan_node_map;
+ for (const ExecNode* exec_node : scan_nodes) {
+ const HdfsScanNodeBase* scan_node =
+ static_cast<const HdfsScanNodeBase*>(exec_node);
+ int node_id = exec_node->plan_node().tnode_->node_id;
+ DCHECK(id_to_scan_node_map.find(node_id) == id_to_scan_node_map.end())
+ << "Duplicate scan node id: " << node_id;
+ id_to_scan_node_map[node_id] = scan_node;
+ }
for (int32_t node_id :
plan_node().tnode_->tuple_cache_node.input_scan_node_ids) {
+ const HdfsTableDescriptor* hdfs_table =
id_to_scan_node_map[node_id]->hdfs_table();
+ DCHECK(hdfs_table != nullptr);
auto ranges = ctx.per_node_scan_ranges().find(node_id);
if (ranges == ctx.per_node_scan_ranges().end()) continue;
for (const ScanRangeParamsPB& params : ranges->second.scan_ranges()) {
// This only supports HDFS right now
DCHECK(params.scan_range().has_hdfs_file_split());
const HdfsFileSplitPB& split = params.scan_range().hdfs_file_split();
- if (split.has_relative_path() && !split.relative_path().empty()) {
- hash = HashUtil::Hash(
- split.relative_path().data(), split.relative_path().length(),
hash);
- DCHECK(split.has_partition_path_hash());
- int32_t partition_path_hash = split.partition_path_hash();
- hash = HashUtil::Hash(&partition_path_hash,
sizeof(partition_path_hash), hash);
- } else if (split.has_absolute_path() && !split.absolute_path().empty()) {
- hash = HashUtil::Hash(
- split.absolute_path().data(), split.absolute_path().length(),
hash);
+ // Information on the partition can influence how files are processed.
For example,
+ // for text files, the delimiter can be specified on the partition
level. There
+ // are several such attributes. We need to incorporate the partition
information
+ // into the hash for each file split.
+ const HdfsPartitionDescriptor* partition_desc =
+ hdfs_table->GetPartition(split.partition_id());
+ DCHECK(partition_desc != nullptr);
+ if (partition_desc != nullptr) {
+ hash = HashHdfsPartitionDescriptor(partition_desc, hash);
} else {
- DCHECK("Either relative_path or absolute_path must be set");
+ LOG(WARNING) << "Partition id " << split.partition_id()
+ << " not found in table " <<
hdfs_table->fully_qualified_name()
+ << " split filename: " << split.relative_path();
}
- DCHECK(split.has_offset());
- int64_t offset = split.offset();
- hash = HashUtil::Hash(&offset, sizeof(offset), hash);
- DCHECK(split.has_length());
- int64_t length = split.length();
- hash = HashUtil::Hash(&length, sizeof(length), hash);
- DCHECK(split.has_mtime());
- int64_t mtime = split.mtime();
- hash = HashUtil::Hash(&mtime, sizeof(mtime), hash);
+ hash = HashHdfsFileSplit(split, hash);
}
}
fragment_instance_key_ = hash;
diff --git a/be/src/exec/tuple-cache-node.h b/be/src/exec/tuple-cache-node.h
index ef2e2dd6b..51a90bc0c 100644
--- a/be/src/exec/tuple-cache-node.h
+++ b/be/src/exec/tuple-cache-node.h
@@ -27,6 +27,8 @@ namespace impala {
class TupleFileReader;
class TupleFileWriter;
class TupleTextFileWriter;
+class HdfsFileSplitPB;
+class HdfsPartitionDescriptor;
class TupleCachePlanNode : public PlanNode {
public:
@@ -85,8 +87,16 @@ private:
void ReleaseResult();
+ // Hash the relevant attributes from an HdfsPartitionDescriptor using the
specified
+ // seed.
+ uint32_t HashHdfsPartitionDescriptor(const HdfsPartitionDescriptor*
partition_desc,
+ uint32_t seed);
+
+ // Hash the relevant attributes from an HdfsFileSplit using the specified
seed.
+ uint32_t HashHdfsFileSplit(const HdfsFileSplitPB& split, uint32_t seed);
+
// Construct the fragment instance part of the cache key by hashing
information about
- // inputs to this fragment (e.g. scan ranges).
+ // inputs to this fragment (e.g. scan ranges and partition settings).
void ComputeFragmentInstanceKey(const RuntimeState *state);
/// Reader/Writer for caching
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index d4bc2f62d..5af38fd43 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -35,6 +35,7 @@
#include "gen-cpp/PlanNodes_types.h"
#include "rpc/thrift-util.h"
#include "runtime/runtime-state.h"
+#include "util/hash-util.h"
#include "common/names.h"
@@ -208,6 +209,15 @@ string TableDescriptor::DebugString() const {
return out.str();
}
+static uint32_t HashPartitionKeyExprs(const vector<TExpr>&
partition_key_exprs) {
+ stringstream stream;
+ for (const TExpr& texpr : partition_key_exprs) {
+ stream << texpr;
+ }
+ string s = stream.str();
+ return HashUtil::Hash(s.data(), s.length(), 0);
+}
+
HdfsPartitionDescriptor::HdfsPartitionDescriptor(
const THdfsTable& thrift_table, const THdfsPartition& thrift_partition)
: id_(thrift_partition.id),
@@ -222,6 +232,7 @@ HdfsPartitionDescriptor::HdfsPartitionDescriptor(
json_binary_format_ = sd.jsonBinaryFormat;
encoding_value_ = sd.__isset.encodingValue ? sd.encodingValue : "";
DecompressLocation(thrift_table, thrift_partition, &location_);
+ partition_key_expr_hash_ =
HashPartitionKeyExprs(thrift_partition_key_exprs_);
}
string HdfsPartitionDescriptor::DebugString() const {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 8b0457286..bf18c249e 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -403,6 +403,7 @@ class HdfsPartitionDescriptor {
const std::string& location() const { return location_; }
int64_t id() const { return id_; }
TJsonBinaryFormat::type json_binary_format() const { return
json_binary_format_; }
+ uint32_t partition_key_expr_hash() const { return partition_key_expr_hash_; }
std::string DebugString() const;
/// It is safe to call the returned expr evaluators concurrently from
multiple
@@ -425,12 +426,15 @@ class HdfsPartitionDescriptor {
// stripped.
std::string location_;
int64_t id_;
+ uint32_t partition_key_expr_hash_;
/// List of literal (and therefore constant) expressions for each partition
key. Their
/// order corresponds to the first num_clustering_cols of the parent table.
/// The Prepare()/Open()/Close() cycle is controlled by the containing
descriptor table
/// because the same partition descriptor may be used by multiple exec nodes
with
/// different lifetimes.
+ /// WARNING: This is only valid for a brief time and is left dangling. It
can't be
+ /// exposed publicly.
const std::vector<TExpr>& thrift_partition_key_exprs_;
/// These evaluators are safe to be shared by all fragment instances as all
expressions
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index ed47bd1f6..bdc3565b3 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -84,6 +84,7 @@ import org.apache.impala.thrift.TFileSplitGeneratorSpec;
import org.apache.impala.thrift.TJsonBinaryFormat;
import org.apache.impala.thrift.THdfsFileSplit;
import org.apache.impala.thrift.THdfsScanNode;
+import org.apache.impala.thrift.THdfsStorageDescriptor;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TOverlapPredicateDesc;
import org.apache.impala.thrift.TPlanNode;
@@ -2828,4 +2829,78 @@ public class HdfsScanNode extends ScanNode {
}
return ndvMult;
}
+
+ // Incorporate the details from this scan node into the supplied
TupleCacheInfo. This
+ // is used when an entire scan node needs to be incorporated into a tuple
cache key
+ // (e.g. for the build side of a join). This iterates over the partitions,
hashing
+ // the partition information (storage descriptor and partition keys/values)
as well as
+ // the associated scan ranges.
+ public void incorporateScansIntoTupleCache(TupleCacheInfo info) {
+ TScanRangeSpec orig = getScanRangeSpecs();
+
+ // Sort the partitions so it is consistent over time (the partition name
is built
+ // from the partition column values).
+ List<FeFsPartition> sortedPartitions = new
ArrayList<>(getSampledOrRawPartitions());
+ sortedPartitions.sort(
+ (p1, p2) -> p1.getPartitionName().compareTo(p2.getPartitionName()));
+ for (FeFsPartition partition : sortedPartitions) {
+ TScanRangeSpec spec = new TScanRangeSpec();
+ boolean hasScanRange = false;
+ if (orig.isSetConcrete_ranges()) {
+ for (TScanRangeLocationList origLocList: orig.concrete_ranges) {
+ if (origLocList.scan_range.hdfs_file_split.partition_id !=
partition.getId()) {
+ continue;
+ }
+ hasScanRange = true;
+ // We only need the TScanRange, which provides the file segment info.
+ TScanRangeLocationList locList = new TScanRangeLocationList();
+ TScanRange scanRange = origLocList.scan_range.deepCopy();
+ if (scanRange.isSetHdfs_file_split()) {
+ // Zero out partition_id, it's not stable.
+ scanRange.hdfs_file_split.partition_id = 0;
+ }
+ locList.setScan_range(scanRange);
+ spec.addToConcrete_ranges(locList);
+ }
+ if (hasScanRange) {
+ // Reloaded partitions may have a different order. Sort for
stability.
+ spec.concrete_ranges.sort(null);
+ }
+ }
+ if (orig.isSetSplit_specs()) {
+ for (TFileSplitGeneratorSpec origSplitSpec: orig.split_specs) {
+ if (origSplitSpec.partition_id != partition.getId()) continue;
+ hasScanRange = true;
+ TFileSplitGeneratorSpec splitSpec = origSplitSpec.deepCopy();
+ // Zero out partition_id, it's not stable.
+ splitSpec.partition_id = 0;
+ spec.addToSplit_specs(splitSpec);
+ }
+ // Reloaded partitions may have a different order. Sort for stability.
+ spec.split_specs.sort(null);
+ }
+
+ // We should ignore empty partitions, so only include the information if
there is
+ // at least one scan range.
+ if (hasScanRange) {
+ // Incorporate the storage descriptor. This contains several fields
that can
+ // impact correctness, including the escape character, separator
character,
+ // json binary format, etc.
+ THdfsStorageDescriptor inputFormat =
+ partition.getInputFormatDescriptor().toThrift();
+ // Zero the block size, as it is not relevant to reads.
+ inputFormat.setBlockSize(0);
+ info.hashThrift(inputFormat);
+
+ // Hash the partition name (which includes the partition keys and
values)
+ // This is necessary for cases where two partitions point to the same
+ // directory and files. Without knowing the partition keys/values, the
+ // cache can't tell them apart.
+ info.hashString(partition.getPartitionName());
+
+ // Hash the scan range information
+ info.hashThrift(spec);
+ }
+ }
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
index f1220b40d..83a1835c2 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
@@ -248,34 +248,7 @@ public class TupleCacheInfo {
// filename, mtime, size, and offset. Others like partition_id may change
after
// reloading metadata.
for (HdfsScanNode scanNode: inputScanNodes_) {
- TScanRangeSpec orig = scanNode.getScanRangeSpecs();
- TScanRangeSpec spec = new TScanRangeSpec();
- if (orig.isSetConcrete_ranges()) {
- for (TScanRangeLocationList origLocList: orig.concrete_ranges) {
- // We only need the TScanRange, which provides the file segment info.
- TScanRangeLocationList locList = new TScanRangeLocationList();
- TScanRange scanRange = origLocList.scan_range.deepCopy();
- if (scanRange.isSetHdfs_file_split()) {
- // Zero out partition_id, it's not stable.
- scanRange.hdfs_file_split.partition_id = 0;
- }
- locList.setScan_range(scanRange);
- spec.addToConcrete_ranges(locList);
- }
- // Reloaded partitions may have a different order. Sort for stability.
- spec.concrete_ranges.sort(null);
- }
- if (orig.isSetSplit_specs()) {
- for (TFileSplitGeneratorSpec origSplitSpec: orig.split_specs) {
- TFileSplitGeneratorSpec splitSpec = origSplitSpec.deepCopy();
- // Zero out partition_id, it's not stable.
- splitSpec.partition_id = 0;
- spec.addToSplit_specs(splitSpec);
- }
- // Reloaded partitions may have a different order. Sort for stability.
- spec.split_specs.sort(null);
- }
- hashThrift(spec);
+ scanNode.incorporateScansIntoTupleCache(this);
}
// The scan ranges have been incorporated into the key and are no longer
needed
// at runtime.
@@ -341,6 +314,17 @@ public class TupleCacheInfo {
hashTraceBuilder_.append(thriftString);
}
+ /**
+ * Hash a regular string and incorporate it into the key
+ */
+ public void hashString(String s) {
+ Preconditions.checkState(!finalized_,
+ "TupleCacheInfo is finalized and can't be modified");
+ Preconditions.checkState(s != null);
+ hasher_.putUnencodedChars(s);
+ hashTraceBuilder_.append(s);
+ }
+
/**
* registerTuple() does two things:
* 1. It incorporates a tuple's layout (and slot information) into the cache
key.
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
index 34221fa6e..a854d7542 100644
--- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
@@ -436,6 +436,18 @@ public class TupleCacheTest extends PlannerTestBase {
verifyJoinNodesEligible(
"select * from
functional_parquet.iceberg_v2_positional_delete_all_rows", 1,
/* isDistributedPlan */ true);
+
+ // When incorporating the scan range information from the build side of the
+ // join, we need to also incorporate information about the partitions
involved.
+ // scale_db.num_partitions_1234_blocks_per_partition_1 is an exotic table
where
+ // all the partitions point to the same file. If we don't incorporate
partition
+ // information, then it can't tell apart queries against different
partitions.
+ String incorporatePartitionSqlTemplate =
+ "select straight_join build.j, probe.id from functional.alltypes
probe, " +
+ "scale_db.num_partitions_1234_blocks_per_partition_1 build " +
+ "where probe.id = build.i and build.j = %s";
+ verifyOverlappingCacheKeys(String.format(incorporatePartitionSqlTemplate,
1),
+ String.format(incorporatePartitionSqlTemplate, 2));
}
@Test
diff --git a/tests/common/impala_test_suite.py
b/tests/common/impala_test_suite.py
index 3e6ce0c8a..9e7414716 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1320,12 +1320,17 @@ class ImpalaTestSuite(BaseTestSuite):
assert abs(a - b) / float(max(abs(a), abs(b))) <= diff_perc
def _get_table_location(self, table_name, vector):
- """ Returns the HDFS location of the table.
- This method changes self.client to point to the dabatase described by
'vector'."""
- db_name = self.get_db_name_from_format(vector.get_table_format())
- self.__change_client_database(self.client, db_name=db_name)
+ """ Returns the HDFS location of the table. If the table is not fully
qualified,
+ this uses the database from the vector."""
+ is_fully_qualified = table_name.find(".") != -1
+ if is_fully_qualified:
+ fq_table_name = table_name
+ else:
+ db_name = self.get_db_name_from_format(vector.get_table_format())
+ fq_table_name = "{0}.{1}".format(db_name, table_name)
+
result = self.execute_query_using_client(self.client,
- "describe formatted %s" % table_name, vector)
+ "describe formatted %s" % fq_table_name, vector)
for row in result.data:
if 'Location:' in row:
return row.split('\t')[1]
diff --git a/tests/custom_cluster/test_tuple_cache.py
b/tests/custom_cluster/test_tuple_cache.py
index 4b5b13724..4df907b97 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -404,6 +404,42 @@ class TestTupleCacheSingle(TestTupleCacheBase):
self.run_test_case('QueryTest/parquet-resolution-by-name', vector,
use_db=unique_database)
+ def test_partition_information(self, vector):
+ """Verify that partition information is incorporated into the runtime
cache key"""
+ self.client.set_configuration(vector.get_value('exec_option'))
+
+ # scale_db.num_partitions_1234_blocks_per_partition_1 is an exotic table
where all
+ # the partitions point to the same filesystem location. A single file is
read many
+ # times for different partitions. It is not possible to tell the
partitions apart
+ # by the file path, so this verifies that the partition information is
being included
+ # properly.
+ query_template = \
+ "select i, j from scale_db.num_partitions_1234_blocks_per_partition_1
where j={0}"
+ # Run against the j=1 partition
+ result1 = self.execute_query(query_template.format(1))
+ assert result1.success
+ assertCounters(result1.runtime_profile, 0, 0, 0)
+ assert len(result1.data) == 1
+ assert result1.data[0].split("\t") == ["1", "1"]
+
+ # Run against the j=2 partition. There should not be a cache hit, because
they are
+ # running against different partitions. This only works if the runtime key
+ # incorporates the partition information.
+ result2 = self.execute_query(query_template.format(2))
+ assert result2.success
+ assertCounters(result2.runtime_profile, 0, 0, 0)
+ assert len(result2.data) == 1
+ assert result2.data[0].split("\t") == ["1", "2"]
+
+ def test_json_binary_format(self, vector, unique_database):
+ """This is identical to test_scanners.py's
TestBinaryType::test_json_binary_format.
+ That test modifies a table's serde properties to change the json binary
format.
+ The tuple cache detects that by including the partition's storage
descriptor
+ information. This fails if that doesn't happen."""
+ test_tbl = unique_database + '.binary_tbl'
+ self.clone_table('functional_json.binary_tbl', test_tbl, False, vector)
+ self.run_test_case('QueryTest/json-binary-format', vector, unique_database)
+
@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
class TestTupleCacheCluster(TestTupleCacheBase):
diff --git a/tests/query_test/test_scanners_fuzz.py
b/tests/query_test/test_scanners_fuzz.py
index 8b10a4d5b..5c0f0d433 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -108,18 +108,22 @@ class TestScannersFuzzing(ImpalaTestSuite):
src_db = QueryTestSectionReader.get_db_name(table_format)
if table_format.file_format not in ['parquet', 'orc']: pytest.skip()
+ # For historical reasons, this test ran against the wrong database and was
+ # running against the uncorrupted table. Now that this is corrected, it
frequently
+ # crashes Impala. Until that is fixed in IMPALA-14219, we need to xfail
this test.
+ pytest.xfail("IMPALA-14219: this test can crash Impala")
# Additional queries to scan the nested values.
custom_queries = [
"select count(*) from ("
" select distinct t.id, a.pos as apos, a.item as aitem, aa.pos,
aa.item, "
" m.key as mkey, m.value as mvalue, ma.key, ma.value,
t.nested_struct.* "
- " from complextypestbl t, t.int_array a, t.int_array_array.item aa, "
+ " from {db}.{table} t, t.int_array a, t.int_array_array.item aa, "
" t.int_map m, t.int_map_array.item ma) q",
"select count(*) from ("
" select t.id, t.nested_struct.a, b.pos as bpos, b.item as bitem, i.e,
i.f, m.key,"
" arr.pos, arr.item "
- " from complextypestbl t, t.nested_struct.b, t.nested_struct.c.d.item
i,"
+ " from {db}.{table} t, t.nested_struct.b, t.nested_struct.c.d.item i,"
" t.nested_struct.g m, m.value.h.i arr) q",
]
self.run_fuzz_test(vector, src_db, table_name, unique_database,
table_name, 10,
@@ -158,12 +162,16 @@ class TestScannersFuzzing(ImpalaTestSuite):
def test_fuzz_parquet_v2(self, vector, unique_database):
table_format = vector.get_value('table_format')
if table_format.file_format != 'parquet': pytest.skip()
+ # For historical reasons, this test ran against the wrong database and was
+ # running against the uncorrupted table. Now that this is corrected, it
frequently
+ # crashes Impala. Until that is fixed in IMPALA-14219, we need to xfail
this test.
+ pytest.xfail("IMPALA-14219: this test can crash Impala")
tables = ["alltypesagg_parquet_v2_uncompressed",
"alltypesagg_parquet_v2_snappy"]
for table_name in tables:
custom_queries = [
"select avg(float_col), avg(double_col), avg(timestamp_col)"
- " from %s where bool_col;" % table_name
+ " from {db}.{table} where bool_col;"
]
self.run_fuzz_test(vector, "functional_parquet", table_name,
unique_database,
table_name, 10, custom_queries)
@@ -172,7 +180,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
"complextypestbl_parquet_v2_snappy"]
for table_name in tables:
custom_queries = [
- "select int_array from %s;" % table_name
+ "select int_array from {db}.{table};"
]
self.run_fuzz_test(vector, "functional_parquet", table_name,
unique_database,
table_name, 10, custom_queries)
@@ -188,6 +196,8 @@ class TestScannersFuzzing(ImpalaTestSuite):
SCANNER_FUZZ_SEED can be set in the environment to reproduce the result
(assuming that
input files are the same).
SCANNER_FUZZ_KEEP_FILES can be set in the environment to keep the
generated files.
+ custom_queries can specify additional queries to run. References to '{db}'
and
+ '{table}' in the custom queries are replaced with the fuzz db and table.
"""
# Create and seed a new random number generator for reproducibility.
rng = random.Random()
@@ -246,11 +256,11 @@ class TestScannersFuzzing(ImpalaTestSuite):
# Also execute a count(*) that materializes no columns, since different
code
# paths are exercised.
queries = [
- 'select count(*) from (select distinct * from {0}.{1}) q'.format(
- fuzz_db, fuzz_table),
- 'select count(*) from {0}.{1} q'.format(fuzz_db, fuzz_table)]
+ 'select count(*) from (select distinct * from {db}.{table}) q'.format(
+ db=fuzz_db, table=fuzz_table),
+ 'select count(*) from {db}.{table} q'.format(db=fuzz_db,
table=fuzz_table)]
if custom_queries is not None:
- queries = queries + [s.format(fuzz_db, fuzz_table) for s in
custom_queries]
+ queries = queries + [s.format(db=fuzz_db, table=fuzz_table) for s in
custom_queries]
for query, batch_size, disable_codegen in \
itertools.product(queries, self.BATCH_SIZES,
self.DISABLE_CODEGEN_VALUES):