This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-3.4.2
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 82103101826309138d22864d04137da2df15f0c3
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Thu Sep 24 15:28:46 2020 +0200

    IMPALA-9952: Fix page index filtering for empty pages
    
    As IMPALA-4371 and IMPALA-10186 point out, Impala might write
    empty data pages. It usually does that when it has to write a bigger
    page than the current page size. Whether we really need to write empty
    data pages is a different question, but we need to handle them
    correctly as there are already such files out there.
    
    The corresponding Parquet offset index entries to empty data pages
    are invalid PageLocation objects with 'compressed_page_size' = 0.
    Before this commit Impala didn't ignore the empty page locations, but
    generated a warning. Since invalid page index doesn't fail a scan
    by default, Impala continued scanning the file with semi-initialized
    page filtering. This resulted in 'Top level rows aren't in sync'
    error, or a crash in DEBUG builds.
    
    With this commit Impala ignores empty data pages and is still able to
    filter the rest of the pages. Also, if the page index is corrupt
    for some other reason, Impala correctly resets the page filtering
    logic and falls back to regular scanning.
    
    Testing:
    * Added unit test for empty data pages
    * Added e2e test for empty data pages
    * Added e2e test for invalid page index
    
    Change-Id: I4db493fc7c383ed5ef492da29c9b15eeb3d17bb0
    Reviewed-on: http://gerrit.cloudera.org:8080/16503
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  38 ++++++----
 be/src/exec/parquet/hdfs-parquet-scanner.h         |  17 +++--
 be/src/exec/parquet/parquet-common-test.cc         |  10 ++-
 be/src/exec/parquet/parquet-common.cc              |  46 +++++++++---
 testdata/data/README                               |  17 +++++
 testdata/data/alltypes_empty_pages.parquet         | Bin 0 -> 22929 bytes
 testdata/data/alltypes_invalid_pages.parquet       | Bin 0 -> 25288 bytes
 .../queries/QueryTest/parquet-page-index.test      |  83 +++++++++++++++++++++
 tests/query_test/test_parquet_stats.py             |  24 +++---
 9 files changed, 195 insertions(+), 40 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc 
b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 7b3f4d197..dbbe6c583 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -622,12 +622,11 @@ Status HdfsParquetScanner::NextRowGroup() {
     // Evaluate page index.
     if (!min_max_conjunct_evals_.empty() &&
         state_->query_options().parquet_read_page_index) {
-      bool filter_pages;
-      Status page_index_status = ProcessPageIndex(&filter_pages);
+      Status page_index_status = ProcessPageIndex();
       if (!page_index_status.ok()) {
         RETURN_IF_ERROR(state_->LogOrReturnError(page_index_status.msg()));
       }
-      if (filter_pages && candidate_ranges_.empty()) {
+      if (filter_pages_ && candidate_ranges_.empty()) {
         // Page level statistics filtered the whole row group. It can happen 
when there
         // is a gap in the data between the pages and the user's predicate hit 
that gap.
         // E.g. column chunk 'A' has two pages with statistics {min: 0, max: 
5},
@@ -704,24 +703,28 @@ bool HdfsParquetScanner::ReadStatFromIndex(const 
ColumnStatsReader& stats_reader
   return false;
 }
 
-Status HdfsParquetScanner::ProcessPageIndex(bool* filter_pages) {
-  MonotonicStopWatch single_process_page_index_timer;
-  single_process_page_index_timer.Start();
+void HdfsParquetScanner::ResetPageFiltering() {
+  filter_pages_ = false;
   candidate_ranges_.clear();
-  *filter_pages = false;
   for (auto& scalar_reader : scalar_readers_) 
scalar_reader->ResetPageFiltering();
+}
+
+Status HdfsParquetScanner::ProcessPageIndex() {
+  MonotonicStopWatch single_process_page_index_timer;
+  single_process_page_index_timer.Start();
+  ResetPageFiltering();
   RETURN_IF_ERROR(page_index_.ReadAll(row_group_idx_));
   if (page_index_.IsEmpty()) return Status::OK();
   // We can release the raw page index buffer when we exit this function.
   const auto scope_exit = 
MakeScopeExitTrigger([this](){page_index_.Release();});
-  RETURN_IF_ERROR(EvaluatePageIndex(filter_pages));
-  RETURN_IF_ERROR(ComputeCandidatePagesForColumns(filter_pages));
+  RETURN_IF_ERROR(EvaluatePageIndex());
+  RETURN_IF_ERROR(ComputeCandidatePagesForColumns());
   single_process_page_index_timer.Stop();
   
process_page_index_stats_->UpdateCounter(single_process_page_index_timer.ElapsedTime());
   return Status::OK();
 }
 
-Status HdfsParquetScanner::EvaluatePageIndex(bool* filter_pages) {
+Status HdfsParquetScanner::EvaluatePageIndex() {
   parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
   vector<RowRange> skip_ranges;
 
@@ -792,13 +795,16 @@ Status HdfsParquetScanner::EvaluatePageIndex(bool* 
filter_pages) {
     }
   }
   if (!ComputeCandidateRanges(row_group.num_rows, &skip_ranges, 
&candidate_ranges_)) {
-    return Status(Substitute("Invalid offset index in Parquet file $0.", 
filename()));
+    ResetPageFiltering();
+    return Status(Substitute(
+        "Invalid offset index in Parquet file $0. Page index filtering is 
disabled.",
+        filename()));
   }
-  *filter_pages = true;
+  filter_pages_ = true;
   return Status::OK();
 }
 
-Status HdfsParquetScanner::ComputeCandidatePagesForColumns(bool* filter_pages) 
{
+Status HdfsParquetScanner::ComputeCandidatePagesForColumns() {
   if (candidate_ranges_.empty()) return Status::OK();
 
   parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
@@ -806,8 +812,10 @@ Status 
HdfsParquetScanner::ComputeCandidatePagesForColumns(bool* filter_pages) {
     const auto& page_locations = scalar_reader->offset_index_.page_locations;
     if (!ComputeCandidatePages(page_locations, candidate_ranges_, 
row_group.num_rows,
         &scalar_reader->candidate_data_pages_)) {
-      *filter_pages = false;
-      return Status(Substitute("Invalid offset index in Parquet file $0.", 
filename()));
+      ResetPageFiltering();
+      return Status(Substitute(
+          "Invalid offset index in Parquet file $0. Page index filtering is 
disabled.",
+          filename()));
     }
   }
   for (BaseScalarColumnReader* scalar_reader : scalar_readers_) {
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h 
b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 9ed3cd2c7..3d99afdae 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -413,6 +413,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// pages in a column chunk.
   boost::scoped_ptr<MemPool> dictionary_pool_;
 
+  /// True, if we filter pages based on the Parquet page index.
+  bool filter_pages_ = false;
+
   /// Contains the leftover ranges after evaluating the page index.
   /// If all rows were eliminated, then the row group is skipped immediately 
after
   /// evaluating the page index.
@@ -515,18 +518,22 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// to be OK as well.
   Status NextRowGroup() WARN_UNUSED_RESULT;
 
+  /// Resets page index filtering state, i.e. clears 'candidate_ranges_' and 
resets
+  /// scalar readers' page filtering as well.
+  void ResetPageFiltering();
+
   /// High-level function for initializing page filtering for the scalar 
readers.
-  /// Sets 'filter_pages' to true if found any page to filter out.
-  Status ProcessPageIndex(bool* filter_pages);
+  /// Sets 'filter_pages_' to true if found any page to filter out.
+  Status ProcessPageIndex();
 
   /// Evaluates 'min_max_conjunct_evals_' against the column index and 
determines the row
   /// ranges that might contain data we are looking for.
-  /// Sets 'filter_pages' to true if found any page to filter out.
-  Status EvaluatePageIndex(bool* filter_pages);
+  /// Sets 'filter_pages_' to true if found any page to filter out.
+  Status EvaluatePageIndex();
 
   /// Based on 'candidate_ranges_' it determines the candidate pages for each
   /// scalar reader.
-  Status ComputeCandidatePagesForColumns(bool* filter_pages);
+  Status ComputeCandidatePagesForColumns();
 
   /// Check that the scalar readers agree on the top-level row being scanned.
   Status CheckPageFiltering();
diff --git a/be/src/exec/parquet/parquet-common-test.cc 
b/be/src/exec/parquet/parquet-common-test.cc
index b67603a3f..8a266ebc6 100644
--- a/be/src/exec/parquet/parquet-common-test.cc
+++ b/be/src/exec/parquet/parquet-common-test.cc
@@ -70,6 +70,10 @@ void ValidatePages(const vector<int64_t>& first_row_indexes, 
const RangeVec& ran
   for (int64_t first_row_index : first_row_indexes) {
     parquet::PageLocation page_loc;
     page_loc.first_row_index = first_row_index;
+    if (first_row_index != -1) {
+      // first_row_index == -1 means empty page.
+      page_loc.compressed_page_size = 42;
+    }
     page_locations.push_back(page_loc);
   }
   vector<int> candidate_pages;
@@ -106,9 +110,13 @@ TEST(ParquetCommon, ComputeCandidatePages) {
       {{0LL, 9LL + INT_MAX}}, 100LL + INT_MAX, {0});
   ValidatePages({0LL, 10LL + UINT_MAX, 20LL + UINT_MAX, 50LL + UINT_MAX, 70LL 
+ UINT_MAX},
         {{0LL, 9LL + UINT_MAX}}, 100LL + UINT_MAX, {0});
+  // Empty pages are ignored
+  ValidatePages({0, -1}, {{0, 10}}, 15, {0});
+  ValidatePages({-1, 0}, {{0, 10}}, 15, {1});
+  ValidatePages({-1, 0, -1, -1, 10, -1}, {{0, 10}}, 15, {1, 4});
   // Error cases:
   // Negative first row index.
-  ValidatePagesError({-1, 0, 10}, {{0, 10}}, 10, {0});
+  ValidatePagesError({-2, 0, 10}, {{0, 10}}, 15, {0});
   // First row index greater then number of rows.
   ValidatePagesError({5, 10, 15}, {{0, 10}}, 10, {0});
   // First row indexes are not in order.
diff --git a/be/src/exec/parquet/parquet-common.cc 
b/be/src/exec/parquet/parquet-common.cc
index 285ef0d63..c5db93591 100644
--- a/be/src/exec/parquet/parquet-common.cc
+++ b/be/src/exec/parquet/parquet-common.cc
@@ -118,17 +118,32 @@ bool ComputeCandidateRanges(const int64_t num_rows, 
vector<RowRange>* skip_range
   return true;
 }
 
+inline bool IsValidPageLocation(const parquet::PageLocation& page_loc,
+    const int64_t num_rows) {
+  return page_loc.offset >= 0 &&
+         page_loc.first_row_index >= 0 &&
+         page_loc.first_row_index < num_rows;
+}
+
 static bool ValidatePageLocations(const vector<parquet::PageLocation>& 
page_locations,
     const int64_t num_rows) {
+  int last_valid_idx = -1;
   for (int i = 0; i < page_locations.size(); ++i) {
     auto& page_loc = page_locations[i];
-    if (page_loc.first_row_index < 0 || page_loc.first_row_index >= num_rows) {
-      return false;
+    if (!IsValidPageLocation(page_loc, num_rows)) {
+      // Skip page locations for empty pages.
+      if (page_loc.compressed_page_size == 0) {
+        continue;
+      } else {
+        return false;
+      }
     }
-    if (i + 1 < page_locations.size()) {
-      auto& next_page_loc = page_locations[i+1];
-      if (page_loc.first_row_index >= next_page_loc.first_row_index) return 
false;
+    if (last_valid_idx != -1) {
+      auto& last_valid_page = page_locations[last_valid_idx];
+      // 'first_row_index' must have progressed in a non-empty page.
+      if (page_loc.first_row_index <= last_valid_page.first_row_index) return 
false;
     }
+    last_valid_idx = i;
   }
   return true;
 }
@@ -147,11 +162,21 @@ bool ComputeCandidatePages(
   if (!ValidatePageLocations(page_locations, num_rows)) return false;
 
   int range_idx = 0;
-  for (int i = 0; i < page_locations.size(); ++i) {
-    auto& page_location = page_locations[i];
+  int page_idx = 0;
+  while (page_idx < page_locations.size()) {
+    auto& page_location = page_locations[page_idx];
+    if (page_location.compressed_page_size == 0) {
+      ++page_idx;
+      continue;
+    }
+    int next_page_idx = page_idx + 1;
+    while (next_page_idx < page_locations.size() &&
+           page_locations[next_page_idx].compressed_page_size == 0) {
+      ++next_page_idx;
+    }
     int64_t page_start = page_location.first_row_index;
-    int64_t page_end = i != page_locations.size() - 1 ?
-                       page_locations[i + 1].first_row_index - 1 :
+    int64_t page_end = next_page_idx < page_locations.size() ?
+                       page_locations[next_page_idx].first_row_index - 1 :
                        num_rows - 1;
     while (range_idx < candidate_ranges.size() &&
         candidate_ranges[range_idx].last < page_start) {
@@ -159,8 +184,9 @@ bool ComputeCandidatePages(
     }
     if (range_idx >= candidate_ranges.size()) break;
     if (RangesIntersect(candidate_ranges[range_idx], {page_start, page_end})) {
-      candidate_pages->push_back(i);
+      candidate_pages->push_back(page_idx);
     }
+    page_idx = next_page_idx;
   }
   // When there are candidate ranges, then we should have at least one 
candidate page.
   if (!candidate_ranges.empty() && candidate_pages->empty()) return false;
diff --git a/testdata/data/README b/testdata/data/README
index 7dca8b0db..ae139c0d8 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -494,3 +494,20 @@ the same indexing hash but a newer version depends on the 
time of writing.
 `ca51fa17-681b-4497-85b7-4f68e7a63ee7-0_1-38-282_20200112194529.parquet`
 If the impala table was refreshed after this file was written, impala will
 only query on the file with latest version.
+
+alltypes_empty_pages.parquet
+Parquet file that contains empty data pages. Needed to test IMPALA-9952.
+Generated by a modified Impala (git hash e038db44 (between 3.4 and 4.0)). I 
modified
+HdfsParquetTableWriter::ShouldStartNewPage() to randomly start a new page:
+int64_t r = random(); if (r % 2 + r % 3 + r % 5 == 0) return true;
+Also modified HdfsParquetTableWriter::NewPage() to randomly insert empty pages:
+if (r ... ) pages_.push_back(DataPage());
+
+alltypes_invalid_pages.parquet
+Parquet file that contains invalid data pages. Needed to test IMPALA-9952.
+Generated by a modified Impala (git hash e038db44 (between 3.4 and 4.0)). I 
modified
+HdfsParquetTableWriter::ShouldStartNewPage() to randomly start a new page:
+int64_t r = random(); if (r % 2 + r % 3 + r % 5 == 0) return true;
+Also modified HdfsParquetTableWriter::BaseColumnWriter::Flush to randomly 
invalidate
+the offset index:
+if (r ... ) location.offset = -1;
diff --git a/testdata/data/alltypes_empty_pages.parquet 
b/testdata/data/alltypes_empty_pages.parquet
new file mode 100644
index 000000000..289ac1b4d
Binary files /dev/null and b/testdata/data/alltypes_empty_pages.parquet differ
diff --git a/testdata/data/alltypes_invalid_pages.parquet 
b/testdata/data/alltypes_invalid_pages.parquet
new file mode 100644
index 000000000..8fa8e649d
Binary files /dev/null and b/testdata/data/alltypes_invalid_pages.parquet differ
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test
index 47c4eaa1e..7c44e214c 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test
@@ -217,3 +217,86 @@ DECIMAL, DECIMAL
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumStatsFilteredRowGroups): 1
 ====
+---- QUERY
+# Query table with empty data pages
+select * from alltypes_empty_pages where id = 109
+---- RESULTS
+109,false,9,9,9,90,9.899999618530273,90.89999999999999,'01/11/09','9',2009-01-11
 01:49:04.860000000,2009,1
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, 
TIMESTAMP, INT, INT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumStatsFilteredPages): 186
+====
+---- QUERY
+# Query table with empty data pages
+select * from alltypes_empty_pages where id = 51
+---- RESULTS
+51,false,1,1,1,10,1.100000023841858,10.1,'01/06/09','1',2009-01-06 
00:51:02.250000000,2009,1
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, 
TIMESTAMP, INT, INT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumStatsFilteredPages): 186
+====
+---- QUERY
+# Query table with empty data pages
+select * from alltypes_empty_pages where id = 491
+---- RESULTS
+491,false,1,1,1,10,1.100000023841858,10.1,'02/19/09','1',2009-02-19 
03:01:08.100000000,2009,2
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, 
TIMESTAMP, INT, INT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumStatsFilteredPages): 203
+====
+---- QUERY
+# Query table with invalid offset index.
+set abort_on_error=1;
+select * from alltypes_invalid_pages where id = 109
+---- CATCH
+Invalid offset index in Parquet file
+====
+---- QUERY
+# Only query columns with valid offset index.
+set abort_on_error=1;
+select id, bool_col, tinyint_col from alltypes_invalid_pages where id > 51 and 
id < 55
+---- RESULTS
+52,true,2
+53,false,3
+54,true,4
+---- TYPES
+INT, BOOLEAN, TINYINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumStatsFilteredPages): 60
+====
+---- QUERY
+# Query single column with invalid offset index.
+set abort_on_error=1;
+select sum(smallint_col) from alltypes_invalid_pages where smallint_col = 9;
+---- CATCH
+Invalid offset index in Parquet file
+====
+---- QUERY
+# Query table with invalid offset index.
+set abort_on_error=0;
+select * from alltypes_invalid_pages where id = 109
+---- ERRORS
+Invalid offset index in Parquet file __HDFS_FILENAME__ Page index filtering is 
disabled.
+---- RESULTS
+109,false,9,9,9,90,9.899999618530273,90.89999999999999,'01/11/09','9',2009-01-11
 01:49:04.860000000,2009,1
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, 
TIMESTAMP, INT, INT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumStatsFilteredPages): 0
+====
+---- QUERY
+# Query single column with invalid offset index.
+set abort_on_error=0;
+select sum(smallint_col) from alltypes_invalid_pages where smallint_col = 9;
+---- ERRORS
+Invalid offset index in Parquet file __HDFS_FILENAME__ Page index filtering is 
disabled.
+---- RESULTS
+450
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumStatsFilteredPages): 0
+====
diff --git a/tests/query_test/test_parquet_stats.py 
b/tests/query_test/test_parquet_stats.py
index a64cb88a7..5087fa204 100644
--- a/tests/query_test/test_parquet_stats.py
+++ b/tests/query_test/test_parquet_stats.py
@@ -18,6 +18,7 @@
 import os
 import pytest
 import shlex
+from copy import deepcopy
 from subprocess import check_call
 
 from tests.common.file_utils import (
@@ -78,22 +79,27 @@ class TestParquetStats(ImpalaTestSuite):
     """Test that using the Parquet page index works well. The various test 
files
     contain queries that exercise the page selection and value-skipping logic 
against
     columns with different types and encodings."""
+    new_vector = deepcopy(vector)
+    del new_vector.get_value('exec_option')['abort_on_error']
     create_table_from_parquet(self.client, unique_database, 'decimals_1_10')
     create_table_from_parquet(self.client, unique_database, 'nested_decimals')
     create_table_from_parquet(self.client, unique_database, 
'double_nested_decimals')
     create_table_from_parquet(self.client, unique_database, 
'alltypes_tiny_pages')
     create_table_from_parquet(self.client, unique_database, 
'alltypes_tiny_pages_plain')
+    create_table_from_parquet(self.client, unique_database, 
'alltypes_empty_pages')
+    create_table_from_parquet(self.client, unique_database, 
'alltypes_invalid_pages')
 
-    for batch_size in [0, 1]:
-      vector.get_value('exec_option')['batch_size'] = batch_size
-      self.run_test_case('QueryTest/parquet-page-index', vector, 
unique_database)
-      self.run_test_case('QueryTest/nested-types-parquet-page-index', vector,
+    for batch_size in [1]:
+      new_vector.get_value('exec_option')['batch_size'] = batch_size
+      self.run_test_case('QueryTest/parquet-page-index', new_vector, 
unique_database)
+      self.run_test_case('QueryTest/nested-types-parquet-page-index', 
new_vector,
                          unique_database)
-      self.run_test_case('QueryTest/parquet-page-index-alltypes-tiny-pages', 
vector,
-                         unique_database)
-      
self.run_test_case('QueryTest/parquet-page-index-alltypes-tiny-pages-plain', 
vector,
+      self.run_test_case('QueryTest/parquet-page-index-alltypes-tiny-pages', 
new_vector,
                          unique_database)
+      
self.run_test_case('QueryTest/parquet-page-index-alltypes-tiny-pages-plain',
+                         new_vector, unique_database)
 
     for batch_size in [0, 32]:
-      vector.get_value('exec_option')['batch_size'] = batch_size
-      self.run_test_case('QueryTest/parquet-page-index-large', vector, 
unique_database)
+      new_vector.get_value('exec_option')['batch_size'] = batch_size
+      self.run_test_case('QueryTest/parquet-page-index-large', new_vector,
+                         unique_database)

Reply via email to