This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-4.1.1 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 0b2f6b7f35ab8de31d27631d2b693915848c99a5 Author: Daniel Becker <[email protected]> AuthorDate: Mon Jul 25 16:22:27 2022 +0200 IMPALA-11345: Parquet Bloom filtering failure if column is added to the schema If a new column was added to an existing table with existing data and Parquet Bloom filtering was turned ON, queries having an equality conjunct on the new column failed. This was because the old Parquet data files did not have the new column in their schema, so no column could be found for the conjunct. This was treated as an error and the query failed. After this patch this situation is no longer treated as an error and the conjunct is simply disregarded for Bloom filtering in the files that lack the new column. Testing: - added the test TestParquetBloomFilter::test_parquet_bloom_filtering_schema_change in tests/query_test/test_parquet_bloom_filter.py that checks that a query as described above does not fail. Merge conflicts: - hdfs-parquet-scanner.cc removes usage of NeedDataInFile(). 
Change-Id: Ief3e6b6358d3dff3abe5beeda752033a7e8e16a6 Reviewed-on: http://gerrit.cloudera.org:8080/18779 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Reviewed-on: http://gerrit.cloudera.org:8080/18888 Tested-by: Quanlong Huang <[email protected]> Reviewed-by: Csaba Ringhofer <[email protected]> --- be/src/exec/parquet/hdfs-parquet-scanner.cc | 72 ++++++++++++++++++++++++--- tests/query_test/test_parquet_bloom_filter.py | 21 ++++++++ 2 files changed, 87 insertions(+), 6 deletions(-) diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc index d8058c875..04b1fddaa 100644 --- a/be/src/exec/parquet/hdfs-parquet-scanner.cc +++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc @@ -1924,14 +1924,40 @@ Status HdfsParquetScanner::ReadToBuffer(uint64_t offset, uint8_t* buffer, uint64 return Status::OK(); } +void LogMissingFields(google::LogSeverity log_level, const std::string& text_before, + const std::string& text_after, const std::unordered_set<std::string>& paths) { + stringstream s; + s << text_before; + s << "["; + size_t i = 0; + for (const std::string& path : paths) { + s << path; + if (i + 1 < paths.size()) { + s << ", "; + } + i++; + } + + s << "]. "; + s << text_after; + VLOG(log_level) << s.str(); +} + // Create a map from column index to EQ conjuncts for Bloom filtering. Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() { // EQ conjuncts are represented as a LE and a GE conjunct with the same // value. This map is used to pair them to form EQ conjuncts. - // The value is a vector because there may be multiple GE or LE conjuncts on a column. + // The value is a set because there may be multiple GE or LE conjuncts on a column. unordered_map<int, std::unordered_set<std::pair<std::string, const Literal*>>> conjunct_halves; + // Slot paths for which no data is found in the file. 
It is expected for example if it + // is a partition column and unexpected for example if the column was added to the table + // schema after the current file was written and therefore the current file does + // not have the column. + std::unordered_set<std::string> unexpected_missing_fields; + std::unordered_set<std::string> expected_missing_fields; + for (ScalarExprEvaluator* eval : stats_conjunct_evals_) { const ScalarExpr& expr = eval->root(); const string& function_name = expr.function_name(); @@ -1983,13 +2009,24 @@ Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() { } if (missing_field) { - if (file_metadata_utils_.IsValuePartitionCol(slot_desc)) continue; - - return Status(Substitute( - "Unable to find SchemaNode for path '$0' in the schema of file '$1'.", - PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()), filename())); + if (!file_metadata_utils_.IsValuePartitionCol(slot_desc)) { + // If a column is added to the schema of an existing table, the schemas of the + // old parquet data files do not contain the new column: see IMPALA-11345. This + // is not an error, we simply disregard this column in Bloom filtering in this + // scanner. + unexpected_missing_fields.emplace( + PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path())); + } else { + // If the data is not expected to be in the file, we disregard the conjuncts for + // the purposes of Bloom filtering. + expected_missing_fields.emplace( + PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path())); + } + continue; } + DCHECK(node != nullptr); + if (!IsParquetBloomFilterSupported(node->element->type, child_slot_ref->type())) { continue; } @@ -2029,6 +2066,29 @@ Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() { } } + // Log expected and unexpected missing fields. 
+ if (!unexpected_missing_fields.empty()) { + LogMissingFields(google::WARNING, + Substitute( + "Unable to find SchemaNode for the following paths in the schema of " + "file '$0': ", + filename()), + "This may be because the column may have been added to the table schema after " + "writing this file. Disregarding conjuncts on this path for the purpose of " + "Parquet Bloom filtering in this file.", + unexpected_missing_fields); + } + + if (!expected_missing_fields.empty()) { + LogMissingFields(google::INFO, + Substitute( + "Data for the following paths is not expected to be present in file '$0': ", + filename()), + "Disregarding conjuncts on this path for the purpose of Parquet Bloom filtering " + "in this file.", + expected_missing_fields); + } + return Status::OK(); } diff --git a/tests/query_test/test_parquet_bloom_filter.py b/tests/query_test/test_parquet_bloom_filter.py index eda93999b..11af6d28d 100644 --- a/tests/query_test/test_parquet_bloom_filter.py +++ b/tests/query_test/test_parquet_bloom_filter.py @@ -95,6 +95,27 @@ class TestParquetBloomFilter(ImpalaTestSuite): vector.get_value('exec_option')['parquet_bloom_filtering'] = False self.run_test_case('QueryTest/parquet-bloom-filter-disabled', vector, unique_database) + def test_parquet_bloom_filtering_schema_change(self, vector, unique_database): + """ Regression test for IMPALA-11345. Tests that the query does not fail when a new + column is added to the table schema but the old Parquet files do not contain it and + therefore no column is found for a conjunct while preparing Bloom filtering. 
""" + vector.get_value('exec_option')['parquet_bloom_filtering'] = True + + tbl_name = 'changed_schema' + + stmts = [ + 'create table {db}.{tbl} (id INT) stored as parquet', + 'insert into {db}.{tbl} values (1),(2),(3)', + 'alter table {db}.{tbl} add columns (name STRING)', + 'insert into {db}.{tbl} values (4, "James")', + 'select * from {db}.{tbl} where name in ("Lily")' + ] + + for stmt in stmts: + self.execute_query_expect_success(self.client, + stmt.format(db=str(unique_database), tbl=tbl_name), + vector.get_value('exec_option')) + def test_write_parquet_bloom_filter(self, vector, unique_database, tmpdir): # Get Bloom filters from the first row group of file PARQUET_TEST_FILE. reference_col_to_bloom_filter = self._get_first_row_group_bloom_filters(
