This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 607bad042a09b7c2635a50b2eb03182e255326ff
Author: Xuebin Su <[email protected]>
AuthorDate: Fri Mar 21 11:29:08 2025 +0800

    IMPALA-3841: Enable late materialization for collections
    
    This patch enables late materialization for collections to avoid the
    cost of materializing collections that will never be accessed by the
    query.
    
    For a collection column, late materialization takes effect only when the
    collection column is not used in any predicate, including the `!empty()`
    predicate added by the planner. Otherwise we need to read every row to
    evaluate the predicate and cannot skip any. Therefore, this patch skips
    registering the `!empty()` predicates if the query contains zipping
    unnests. This can affect performance if the table contains many empty
    collections, but should be noticeable only in very extreme cases.
    
    The late materialization threshold is set to 1 in HdfsParquetScanner
    when late materialization is enabled and there is any collection that
    can be skipped.
    
    This patch also adds the detail of `HdfsScanner::parse_status_` to the
    error message returned by the HdfsParquetScanner to help figure out the
    root cause.
    
    Performance:
    - Tests with queries involving collection columns in table
      `tpch_nested_parquet.customer` show that when the selectivity is low,
      the single-threaded (1 impalad and MT_DOP=1) scanning time can be
      reduced by about 50%, whereas when the selectivity is high, the
      scanning time is almost unchanged.
    - For queries not involving collections, performance A/B testing
      shows no regression on TPC-H.
    
    Testing:
    - Added a runtime profile counter NumTopLevelValuesSkipped to record
      the total number of top-level values skipped for all columns. The
      counter only counts values that are skipped individually; values
      skipped as part of a whole skipped page are not counted.
    - Added e2e test cases in test_parquet_late_materialization.py to ensure
      that late materialization works using the new counter.
    
    Change-Id: Ia21bdfa6811408d66d74367e0a9520e20951105f
    Reviewed-on: http://gerrit.cloudera.org:8080/22662
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/parquet/hdfs-parquet-scanner.cc        | 36 +++++++++++---------
 be/src/exec/parquet/hdfs-parquet-scanner.h         |  4 +++
 .../parquet/parquet-collection-column-reader.cc    | 14 ++++++--
 be/src/exec/parquet/parquet-column-readers.cc      | 26 +++++++++++++--
 be/src/exec/parquet/parquet-column-readers.h       | 24 ++++++++++++-
 .../exec/parquet/parquet-complex-column-reader.h   | 15 +++++++++
 be/src/exec/parquet/parquet-level-decoder.h        |  4 +++
 .../exec/parquet/parquet-struct-column-reader.cc   |  6 ++++
 be/src/exec/scratch-tuple-batch.h                  |  3 +-
 common/thrift/generate_error_codes.py              |  3 +-
 .../org/apache/impala/analysis/SelectStmt.java     |  5 ++-
 .../queries/PlannerTest/zipping-unnest.test        |  6 ----
 .../parquet-late-materialization-unique-db.test    | 27 +++++++++++++++
 .../QueryTest/parquet-late-materialization.test    | 39 ++++++++++++++++++++--
 .../test_parquet_late_materialization.py           |  3 ++
 15 files changed, 181 insertions(+), 34 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc 
b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 9a8c81a88..de286314f 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -155,6 +155,8 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   num_pages_skipped_by_late_materialization_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), 
"NumPagesSkippedByLateMaterialization",
           TUnit::UNIT);
+  num_top_level_values_skipped_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumTopLevelValuesSkipped", 
TUnit::UNIT);
   num_dict_filtered_row_groups_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups", 
TUnit::UNIT);
   parquet_compressed_page_size_counter_ = ADD_SUMMARY_STATS_COUNTER(
@@ -257,15 +259,18 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   }
   DivideFilterAndNonFilterColumnReaders(column_readers_, &filter_readers_,
       &non_filter_readers_);
+  // Set the late materialization threshold to 1 if
+  // - late materialization is enabled, and
+  // - there is any collection that can be skipped.
+  if (late_materialization_threshold_ >= 0
+      && std::find_if(non_filter_readers_.begin(), non_filter_readers_.end(),
+             [](ParquetColumnReader* reader) { return 
reader->IsCollectionReader(); })
+          != non_filter_readers_.end()) {
+    late_materialization_threshold_ = 1;
+  }
   return Status::OK();
 }
 
-// Currently, Collection Readers and scalar readers upon collection values
-// are not supported for late materialization.
-static bool DoesNotSupportLateMaterialization(ParquetColumnReader* 
column_reader) {
-  return column_reader->IsCollectionReader() || column_reader->max_rep_level() 
> 0;
-}
-
 void HdfsParquetScanner::DivideFilterAndNonFilterColumnReaders(
     const vector<ParquetColumnReader*>& column_readers,
     vector<ParquetColumnReader*>* filter_readers,
@@ -274,9 +279,10 @@ void 
HdfsParquetScanner::DivideFilterAndNonFilterColumnReaders(
   non_filter_readers->clear();
   for (auto column_reader : column_readers) {
     auto slot_desc = column_reader->slot_desc();
-    if (DoesNotSupportLateMaterialization(column_reader) || (slot_desc != 
nullptr &&
-        std::find(conjunct_slot_ids_.begin(), conjunct_slot_ids_.end(), 
slot_desc->id())
-            != conjunct_slot_ids_.end())) {
+    if (slot_desc != nullptr
+        && std::find(
+               conjunct_slot_ids_.begin(), conjunct_slot_ids_.end(), 
slot_desc->id())
+            != conjunct_slot_ids_.end()) {
       filter_readers->push_back(column_reader);
     } else {
       non_filter_readers->push_back(column_reader);
@@ -2513,10 +2519,9 @@ Status HdfsParquetScanner::SkipRowsForColumns(
       // among columns.
       if (UNLIKELY(!col_reader->SkipRows(*num_rows_to_skip, *skip_to_row))) {
         return Status(Substitute(
-            "Parquet file might be corrupted: Error in skipping $0 values to 
row $1 "
-            "in column $2 of file $3.",
+            "Error in skipping $0 values to row $1 in column $2 of file $3. 
Detail: $4",
             *num_rows_to_skip, *skip_to_row, col_reader->schema_element().name,
-            filename()));
+            filename(), parse_status_.GetDetail()));
       }
     }
     *num_rows_to_skip = 0;
@@ -2544,13 +2549,14 @@ Status HdfsParquetScanner::FillScratchMicroBatches(
         if (micro_batches[0].start > 0) {
           if (UNLIKELY(!col_reader->SkipRows(micro_batches[0].start, -1))) {
             return Status(ErrorMsg(TErrorCode::PARQUET_ROWS_SKIPPING,
-                col_reader->schema_element().name, filename()));
+                col_reader->schema_element().name, filename(),
+                parse_status_.GetDetail()));
           }
         }
       } else {
         if (UNLIKELY(!col_reader->SkipRows(micro_batches[r].start - last - 1, 
-1))) {
           return Status(ErrorMsg(TErrorCode::PARQUET_ROWS_SKIPPING,
-              col_reader->schema_element().name, filename()));
+              col_reader->schema_element().name, filename(), 
parse_status_.GetDetail()));
         }
       }
       // Ensure that the length of the micro_batch is less than
@@ -2587,7 +2593,7 @@ Status HdfsParquetScanner::FillScratchMicroBatches(
     if (UNLIKELY(last < max_num_tuples - 1)) {
       if (UNLIKELY(!col_reader->SkipRows(max_num_tuples - 1 - last, -1))) {
         return Status(ErrorMsg(TErrorCode::PARQUET_ROWS_SKIPPING,
-            col_reader->schema_element().name, filename()));
+            col_reader->schema_element().name, filename(), 
parse_status_.GetDetail()));
       }
     }
   }
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h 
b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 538f972d3..dcc0c6309 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -539,6 +539,10 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// rows that survived filtering.
   RuntimeProfile::Counter* num_pages_skipped_by_late_materialization_counter_;
 
+  /// Sum of the numbers of top-level values skipped for all columns.
+  /// It only counts the values that are not skipped as a page.
+  RuntimeProfile::Counter* num_top_level_values_skipped_counter_;
+
   /// Number of row groups skipped due to dictionary filter. This is an 
aggregated counter
   /// that includes the number of filtered row groups as a result of 
evaluating conjuncts
   /// and runtime bloom filters on the dictionary entries.
diff --git a/be/src/exec/parquet/parquet-collection-column-reader.cc 
b/be/src/exec/parquet/parquet-collection-column-reader.cc
index afe46dece..998dbcc19 100644
--- a/be/src/exec/parquet/parquet-collection-column-reader.cc
+++ b/be/src/exec/parquet/parquet-collection-column-reader.cc
@@ -25,11 +25,16 @@ bool CollectionColumnReader::NextLevels() {
   DCHECK(!children_.empty());
   DCHECK_LE(rep_level_, new_collection_rep_level());
   for (int c = 0; c < children_.size(); ++c) {
+    if (children_[c]->IsComplexReader()
+        && 
static_cast<ComplexColumnReader*>(children_[c])->next_levels_consumed()) {
+      continue;
+    }
     do {
       // TODO: verify somewhere that all column readers are at end
       if (!children_[c]->NextLevels()) return false;
     } while (children_[c]->rep_level() > new_collection_rep_level());
   }
+  next_levels_consumed_ = true;
   UpdateDerivedState();
   return true;
 }
@@ -41,10 +46,12 @@ bool CollectionColumnReader::ReadValue(MemPool* pool, 
Tuple* tuple) {
       << "Caller should have called NextLevels() until we are ready to read a 
value";
 
   if (tuple_offset_ == -1) {
+    SetDescendantsNextLevelsConsumed(false);
     return CollectionColumnReader::NextLevels();
   } else if (def_level_ >= max_def_level()) {
     return ReadSlot(tuple->GetCollectionSlot(tuple_offset_), pool);
   } else {
+    SetDescendantsNextLevelsConsumed(false);
     // Collections add an extra def level, so it is possible to distinguish 
between
     // NULL and empty collections. See hdfs-parquet-scanner.h for more detailed
     // explanation.
@@ -74,6 +81,7 @@ bool CollectionColumnReader::ReadValueBatch(MemPool* pool, 
int max_values,
   while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
     Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * 
tuple_size);
     if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
+      SetDescendantsNextLevelsConsumed(false);
       // A containing repeated field is empty or NULL
       continue_execution = NextLevels();
       continue;
@@ -164,10 +172,12 @@ void CollectionColumnReader::UpdateDerivedState() {
 
 bool CollectionColumnReader::SkipRows(int64_t num_rows, int64_t skip_row_id) {
   DCHECK(!children_.empty());
+  // Prevent NextLevels() from being called more than once in the recursion 
when
+  // e.g., a child is also a CollectionColumnReader.
+  next_levels_consumed_ = false;
   for (int c = 0; c < children_.size(); ++c) {
     if (!children_[c]->SkipRows(num_rows, skip_row_id)) return false;
   }
-  UpdateDerivedState();
-  return true;
+  return CollectionColumnReader::NextLevels();
 }
 } // namespace impala
diff --git a/be/src/exec/parquet/parquet-column-readers.cc 
b/be/src/exec/parquet/parquet-column-readers.cc
index 07b5e92de..03f548904 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -1077,6 +1077,12 @@ void BaseScalarColumnReader::Close(RowBatch* row_batch) {
   col_chunk_reader_.Close(row_batch == nullptr ? nullptr : 
row_batch->tuple_data_pool());
   DictDecoderBase* dict_decoder = GetDictionaryDecoder();
   if (dict_decoder != nullptr) dict_decoder->Close();
+  if (num_top_level_values_skipped_counter_ > 0) {
+    // This can happen when the reader gets closed before finishing the 
current page.
+    COUNTER_ADD(parent_->num_top_level_values_skipped_counter_,
+        num_top_level_values_skipped_counter_);
+    num_top_level_values_skipped_counter_ = 0;
+  }
 }
 
 Status BaseScalarColumnReader::InitDictionary() {
@@ -1233,7 +1239,6 @@ template <bool ADVANCE_REP_LEVEL>
 bool BaseScalarColumnReader::NextLevels() {
   if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << 
slot_desc()->DebugString();
 
-  levels_readahead_ = true;
   if (UNLIKELY(num_buffered_values_ == 0)) {
     if (!NextPage()) return parent_->parse_status_.ok();
   }
@@ -1279,9 +1284,12 @@ bool BaseScalarColumnReader::NextLevels() {
     ++current_row_;
   }
 
+  levels_readahead_ = true;
   return parent_->parse_status_.ok();
 }
 
+template bool BaseScalarColumnReader::NextLevels<true>();
+
 void BaseScalarColumnReader::ResetPageFiltering() {
   offset_index_.page_locations.clear();
   candidate_data_pages_.clear();
@@ -1307,7 +1315,7 @@ Status BaseScalarColumnReader::StartPageFiltering() {
     int64_t remaining = 0;
     if (!SkipTopLevelRows(skip_rows, &remaining)) {
       return Status(ErrorMsg(TErrorCode::PARQUET_ROWS_SKIPPING,
-          schema_element().name, filename()));
+          schema_element().name, filename(), 
parent_->parse_status_.GetDetail()));
     }
     DCHECK_EQ(remaining, 0);
     DCHECK_EQ(current_row_, range_start - 1);
@@ -1334,8 +1342,11 @@ bool BaseScalarColumnReader::SkipTopLevelRows(int64_t 
num_rows, int64_t* remaini
     current_row_ += rows_skipped;
     num_buffered_values_ -= rows_skipped;
     *remaining = num_rows - rows_skipped;
+    // Increase the counter before returning when we successfully skip the 
rows.
+    num_top_level_values_skipped_counter_ += rows_skipped;
     return SkipEncodedValuesInPage(rows_skipped);
   }
+  int64_t num_rows_to_skip = num_rows;
   int64_t num_values_to_skip = 0;
   if (max_rep_level() == 0) {
     // No nesting, but field is not required.
@@ -1391,6 +1402,9 @@ bool BaseScalarColumnReader::SkipTopLevelRows(int64_t 
num_rows, int64_t* remaini
     }
     *remaining = num_rows;
   }
+  DCHECK_LT(*remaining, num_rows_to_skip);
+  // Increase the counter before returning when we successfully skip the rows.
+  num_top_level_values_skipped_counter_ += (num_rows_to_skip - *remaining);
   return SkipEncodedValuesInPage(num_values_to_skip);
 }
 
@@ -1520,6 +1534,11 @@ Status BaseScalarColumnReader::HandleTooEarlyEos() {
 
 bool BaseScalarColumnReader::NextPage() {
   parent_->assemble_rows_timer_.Stop();
+  if (num_top_level_values_skipped_counter_ > 0) {
+    COUNTER_ADD(parent_->num_top_level_values_skipped_counter_,
+        num_top_level_values_skipped_counter_);
+    num_top_level_values_skipped_counter_ = 0;
+  }
   parent_->parse_status_ = ReadDataPage();
   if (UNLIKELY(!parent_->parse_status_.ok())) return false;
   if (num_buffered_values_ == 0) {
@@ -1644,6 +1663,9 @@ bool BaseScalarColumnReader::SkipRowsInternal(int64_t 
num_rows, int64_t skip_row
       // Keep advancing to next page header if rows to be skipped are more 
than number
       // of values in the page. Note we will just be reading headers and 
skipping
       // pages without decompressing them as we advance.
+      // In this case, the current column is not in any collection. Therefore 
the number
+      // of rows in the page is equal to the number of values.
+      DCHECK_EQ(max_rep_level(), 0);
       while (num_rows > num_buffered_values_) {
         
COUNTER_ADD(parent_->num_pages_skipped_by_late_materialization_counter_, 1);
         num_rows -= num_buffered_values_;
diff --git a/be/src/exec/parquet/parquet-column-readers.h 
b/be/src/exec/parquet/parquet-column-readers.h
index a24bdf6d2..46c929280 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -189,7 +189,7 @@ class ParquetColumnReader {
   /// Skips the number of encoded values specified by 'num_rows', without 
materializing or
   /// decoding them across pages. If page filtering is enabled, then it 
directly skips to
   /// row after 'skip_row_id' and ignores 'num_rows'.
-  /// It invokes 'SkipToLevelRows' for all 'children_'.
+  ///
   /// Returns true on success, false otherwise.
   virtual bool SkipRows(int64_t num_rows, int64_t skip_row_id) = 0;
 
@@ -249,6 +249,13 @@ class ParquetColumnReader {
   /// int16_t is large enough to hold the valid levels 0-255 and negative 
sentinel values
   /// ParquetLevel::INVALID_LEVEL and ParquetLevel::ROW_GROUP_END. The maximum 
values are
   /// cached here because they are accessed in inner loops.
+  ///
+  /// See ParquetSchemaResolver::CreateSchemaTree() for how max_def_level_ and
+  /// max_rep_level_ are computed.
+  ///
+  /// Some usages:
+  /// - def_level_ >= max_def_level() means the current value is defined, i.e. 
not NULL.
+  /// - rep_level_ == 0 means the current value is at the beginning of a 
top-level row.
   int16_t rep_level_;
   const int16_t max_rep_level_;
   int16_t def_level_;
@@ -591,6 +598,17 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// is more than the rows left in current row group. It can happen even with 
corrupt
   /// parquet file where number of values might differ from metadata.
   virtual bool SkipRows(int64_t num_rows, int64_t skip_row_id) override {
+    // Undo NextLevels() if it is called before calling this method.
+    if (levels_readahead_) {
+      DCHECK_NE(def_level_, ParquetLevel::INVALID_LEVEL);
+      DCHECK_NE(rep_level_, ParquetLevel::INVALID_LEVEL);
+      rep_levels_.CachePrev();
+      def_levels_.CachePrev();
+      rep_level_ = ParquetLevel::INVALID_LEVEL;
+      def_level_ = ParquetLevel::INVALID_LEVEL;
+      ++num_buffered_values_;
+      levels_readahead_ = false;
+    }
     if (max_rep_level() > 0) {
       return SkipRowsInternal<true>(num_rows, skip_row_id);
     } else {
@@ -693,6 +711,10 @@ class BaseScalarColumnReader : public ParquetColumnReader {
 
   Status LogCorruptNumValuesInMetadataError();
   Status HandleTooEarlyEos();
+
+  /// Non-thread-safe version of
+  /// HdfsParquetScanner::num_top_level_values_skipped_counter_.
+  int64_t num_top_level_values_skipped_counter_ = 0;
 };
 
 // Inline to allow inlining into collection and scalar column reader.
diff --git a/be/src/exec/parquet/parquet-complex-column-reader.h 
b/be/src/exec/parquet/parquet-complex-column-reader.h
index 39c5c1731..34b7f531d 100644
--- a/be/src/exec/parquet/parquet-complex-column-reader.h
+++ b/be/src/exec/parquet/parquet-complex-column-reader.h
@@ -56,6 +56,8 @@ public:
     pos_current_value_ = ParquetLevel::INVALID_POS;
   }
 
+  bool next_levels_consumed() const { return next_levels_consumed_; }
+
 protected:
   ComplexColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
@@ -68,5 +70,18 @@ protected:
   /// complex item tuples, or there is a single child reader that does not 
materialize
   /// any slot and is only used by this reader to read def and rep levels.
   std::vector<ParquetColumnReader*> children_;
+
+  /// True if the next definition level and repetition level have been 
consumed by
+  /// NextLevels(). If true, NextLevels() should not be called on this reader 
again.
+  bool next_levels_consumed_ = false;
+
+  void SetDescendantsNextLevelsConsumed(bool value) {
+    next_levels_consumed_ = value;
+    for (auto child : children_) {
+      if (child->IsComplexReader()) {
+        
static_cast<ComplexColumnReader*>(child)->SetDescendantsNextLevelsConsumed(value);
+      }
+    }
+  }
 };
 } // namespace impala
diff --git a/be/src/exec/parquet/parquet-level-decoder.h 
b/be/src/exec/parquet/parquet-level-decoder.h
index 1200b1373..3af8cd0f9 100644
--- a/be/src/exec/parquet/parquet-level-decoder.h
+++ b/be/src/exec/parquet/parquet-level-decoder.h
@@ -115,6 +115,10 @@ class ParquetLevelDecoder {
   int CacheSize() const { return num_cached_levels_; }
   int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
   int CacheCurrIdx() const { return cached_level_idx_; }
+  void CachePrev() {
+    DCHECK_GE(cached_level_idx_, 1);
+    --cached_level_idx_;
+  }
 
  private:
   /// Initializes members associated with the level cache. Allocates memory for
diff --git a/be/src/exec/parquet/parquet-struct-column-reader.cc 
b/be/src/exec/parquet/parquet-struct-column-reader.cc
index 675cb377b..02a3e7ef1 100644
--- a/be/src/exec/parquet/parquet-struct-column-reader.cc
+++ b/be/src/exec/parquet/parquet-struct-column-reader.cc
@@ -23,8 +23,13 @@ bool StructColumnReader::NextLevels() {
   DCHECK(!children_.empty());
   bool result = true;
   for (ParquetColumnReader* child_reader : children_) {
+    if (child_reader->IsComplexReader()
+        && 
static_cast<ComplexColumnReader*>(child_reader)->next_levels_consumed()) {
+      continue;
+    }
     result &= child_reader->NextLevels();
   }
+  next_levels_consumed_ = true;
   def_level_ = children_[0]->def_level();
   rep_level_ = children_[0]->rep_level();
   if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
@@ -46,6 +51,7 @@ bool StructColumnReader::ReadValue(MemPool* pool, Tuple* 
tuple, bool* read_row)
     }
     *read_row = true;
   } else {
+    SetDescendantsNextLevelsConsumed(false);
     if (!HasNullCollectionAncestor<IN_COLLECTION>()) {
       SetNullSlot(tuple);
       *read_row = true;
diff --git a/be/src/exec/scratch-tuple-batch.h 
b/be/src/exec/scratch-tuple-batch.h
index 951792989..3a60922b7 100644
--- a/be/src/exec/scratch-tuple-batch.h
+++ b/be/src/exec/scratch-tuple-batch.h
@@ -25,7 +25,8 @@
 
 namespace impala {
 
-/// Helper struct that represents a micro batch within 'ScratchTupleBatch'.
+/// Helper struct that represents a ['start', 'end'] range of rows that needs 
to be
+/// scanned within a 'ScratchTupleBatch'.
 struct ScratchMicroBatch {
   int start;
   int end;
diff --git a/common/thrift/generate_error_codes.py 
b/common/thrift/generate_error_codes.py
index b2c00b702..7d404d7e8 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -477,7 +477,8 @@ error_codes = (
 
   ("JWT_VERIFY_FAILED", 154, "Error verifying JWT Token: $0."),
 
-  ("PARQUET_ROWS_SKIPPING", 155, "Couldn't skip rows in column '$0' in file 
'$1'."),
+  ("PARQUET_ROWS_SKIPPING", 155, "Couldn't skip rows in column '$0' in file 
'$1'. "
+   "Detail: $2"),
 
   ("QUERY_OPTION_PARSE_FAILED", 156, "Failed to parse query option '$0': $1"),
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index b7eccfd9b..f96be1986 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -811,9 +811,8 @@ public class SelectStmt extends QueryStmt {
         // Do not generate a predicate if the parent tuple is outer joined.
         if 
(analyzer_.isOuterJoined(ref.getResolvedPath().getRootDesc().getId()))
           continue;
-        // Don't push down the "is not empty" predicate for zipping unnests if 
there are
-        // multiple zipping unnests in the FROM clause.
-        if (tblRef.isZippingUnnest() && analyzer_.getNumZippingUnnests() > 1) {
+        // Don't push down the "is not empty" predicate for zipping unnests.
+        if (tblRef.isZippingUnnest()) {
           continue;
         }
         IsNotEmptyPredicate isNotEmptyPred =
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/zipping-unnest.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/zipping-unnest.test
index f7af6d409..a9d3b3ef7 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/zipping-unnest.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/zipping-unnest.test
@@ -20,7 +20,6 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.complextypes_arrays]
    HDFS partitions=1/1 files=1 size=1.06KB
-   predicates: !empty(functional_parquet.complextypes_arrays.arr1)
    predicates on arr1: UNNEST(arr1) < 5, arr1.item < 5
    row-size=16B cardinality=1.35K
 ====
@@ -46,7 +45,6 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.complextypes_arrays a]
    HDFS partitions=1/1 files=1 size=1.06KB
-   predicates: !empty(a.arr1)
    predicates on a.arr1: arr1.item < 5
    row-size=16B cardinality=1.35K
 ====
@@ -72,7 +70,6 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.complextypes_arrays]
    HDFS partitions=1/1 files=1 size=1.06KB
-   predicates: !empty(arr1)
     predicates on functional_parquet.complextypes_arrays.arr1: 
UNNEST(functional_parquet.complextypes_arrays.arr1) < 5, 
functional_parquet.complextypes_arrays.arr1.item < 5
    row-size=16B cardinality=1.35K
 ====
@@ -98,7 +95,6 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.complextypes_arrays]
    HDFS partitions=1/1 files=1 size=1.06KB
-   predicates: !empty(arr1)
    predicates on functional_parquet.complextypes_arrays.arr1: 
functional_parquet.complextypes_arrays.arr1.item < 5
    row-size=16B cardinality=1.35K
 ====
@@ -315,7 +311,6 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.complextypes_arrays]
    HDFS partitions=1/1 files=1 size=1.06KB
-   predicates: !empty(functional_parquet.complextypes_arrays.arr1)
    row-size=16B cardinality=1.35K
 ====
 select id, unnest(arr1), row_number() over (order by id, unnest(arr1))
@@ -347,7 +342,6 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.complextypes_arrays]
    HDFS partitions=1/1 files=1 size=1.06KB
-   predicates: !empty(functional_parquet.complextypes_arrays.arr1)
    row-size=16B cardinality=1.35K
 ====
 select id, item1, item2, row_number() over (order by id, item1, item2)
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization-unique-db.test
 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization-unique-db.test
index 8097bb781..42385efdd 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization-unique-db.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization-unique-db.test
@@ -12,4 +12,31 @@ select i, j from late_mat where j = 0;
 0,0
 ---- TYPES
 INT, INT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumTopLevelValuesSkipped): 3
+====
+---- QUERY
+# Test NumTopLevelValuesSkipped counter when there is more than one page in the
+# column.
+# Table `decimals_1_10` contains 60 rows, in which 8 rows are selected by the
+# query and 44 + 8 == 52 rows are skipped.
+set PARQUET_LATE_MATERIALIZATION_THRESHOLD=1;
+set parquet_read_page_index = false;
+select count(d_10) from decimals_1_10 where d_1 = 1;
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, NumPagesSkippedByLateMaterialization): 44
+aggregation(SUM, NumTopLevelValuesSkipped): 8
+====
+---- QUERY
+# Test if PARQUET_LATE_MATERIALIZATION_THRESHOLD is always 1 if there is any
+# collection that can be skipped.
+set parquet_read_page_index = false;
+select count(unnest(arr)) from nested_decimals n where d_38 = 1;
+---- RESULTS
+4
+---- RUNTIME_PROFILE
+aggregation(SUM, NumPagesSkippedByLateMaterialization): 0
+aggregation(SUM, NumTopLevelValuesSkipped): 17
 ====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization.test
 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization.test
index f2d45c481..2e8b39093 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization.test
@@ -38,8 +38,41 @@ row_regex:.* RF00.\[min_max\] -. .\.l_orderkey.*
 aggregation(SUM, NumPagesSkippedByLateMaterialization)> 0
 ====
 ---- QUERY
-# Test that late materialization on nested columns is disabled.
-select * from tpch_nested_parquet.customer where c_mktsegment = 'COMEDY';
+# Test late materialization for query with one zipping unnest.
+select unnest(arr1)
+from functional_parquet.complextypes_arrays
+where id = 2;
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumPagesSkippedByLateMaterialization): 0
+aggregation(SUM, NumTopLevelValuesSkipped): 10
+====
+---- QUERY
+# Test late materialization for query with multiple zipping unnests.
+select unnest(arr1), unnest(arr2)
+from functional_parquet.complextypes_arrays
+where id = 2;
+---- RUNTIME_PROFILE
+aggregation(SUM, NumTopLevelValuesSkipped): 20
+====
+---- QUERY
+# Test if late materialization for collections works with page filtering.
+# In table tpch_nested_parquet.customer, min(c_phone) is '10-100-106-1617'.
+select count(o_orderkey) > 0
+from tpch_nested_parquet.customer c left outer join c.c_orders
+where c_phone < '10-100-106-16170'
+---- RESULTS
+true
+---- RUNTIME_PROFILE
+aggregation(SUM, NumDictFilteredRowGroups): 0
+aggregation(SUM, NumPagesSkippedByLateMaterialization)> 0
+aggregation(SUM, NumTopLevelValuesSkipped)> 0
+====
+---- QUERY
+# Test if PARQUET_LATE_MATERIALIZATION_THRESHOLD is always 1 if there is any
+# collection that can be skipped.
+set parquet_read_page_index = false;
+set expand_complex_types = true;
+select int_array_array
+from functional_parquet.complextypestbl where id % 2 = 0;
+---- RUNTIME_PROFILE
+aggregation(SUM, NumTopLevelValuesSkipped): 4
 ====
diff --git a/tests/query_test/test_parquet_late_materialization.py 
b/tests/query_test/test_parquet_late_materialization.py
index 10acdbf1d..cac2473e4 100644
--- a/tests/query_test/test_parquet_late_materialization.py
+++ b/tests/query_test/test_parquet_late_materialization.py
@@ -17,6 +17,7 @@
 
 from __future__ import absolute_import, division, print_function
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.file_utils import create_table_from_parquet
 
 
 class TestParquetLateMaterialization(ImpalaTestSuite):
@@ -34,5 +35,7 @@ class TestParquetLateMaterialization(ImpalaTestSuite):
     self.run_test_case('QueryTest/parquet-late-materialization', vector)
 
   def test_parquet_late_materialization_unique_db(self, vector, 
unique_database):
+    create_table_from_parquet(self.client, unique_database, 'decimals_1_10')
+    create_table_from_parquet(self.client, unique_database, 'nested_decimals')
     self.run_test_case('QueryTest/parquet-late-materialization-unique-db', 
vector,
         unique_database)

Reply via email to