This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 44c9163b3c [Fix](multi-catalog)Fix partition external table query bug. (#13535) 44c9163b3c is described below commit 44c9163b3c3144701dfa1bc7b91f20fcedb4b510 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Wed Oct 26 12:47:37 2022 +0800 [Fix](multi-catalog)Fix partition external table query bug. (#13535) The index used for external-table columns parsed from the file path is incorrect in the new scanner; this commit fixes it. For example, in the following query the nation and city columns come from the path: ``` mysql> select nation, city, count(*) from parquet_two_part group by nation, city; +--------+------------+----------+ | nation | city | count(*) | +--------+------------+----------+ | cn | beijing | 1199969 | | cn | shanghai | 1199771 | | jp | tokyo | 599715 | | rus | moscow | 600659 | | us | chicago | 1199805 | | us | washington | 1201296 | +--------+------------+----------+ 6 rows in set (0.39 sec) ``` --- .../exec/format/parquet/vparquet_group_reader.cpp | 22 ++++++++++++++++++ .../exec/format/parquet/vparquet_group_reader.h | 4 ++++ be/src/vec/exec/format/parquet/vparquet_reader.cpp | 5 +++-- be/src/vec/exec/scan/vfile_scanner.cpp | 26 +++++++++++++++++++--- .../doris/planner/external/HiveScanProvider.java | 9 +++++--- gensrc/thrift/PlanNodes.thrift | 2 ++ 6 files changed, 60 insertions(+), 8 deletions(-) diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index a46813cbf2..3b50d32ef3 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -30,6 +30,7 @@ RowGroupReader::RowGroupReader(doris::FileReader* file_reader, _read_columns(read_columns), _row_group_id(row_group_id), _row_group_meta(row_group), + _remaining_rows(row_group.num_rows), _ctz(ctz) {} RowGroupReader::~RowGroupReader() { @@ -38,6 +39,10 @@ RowGroupReader::~RowGroupReader() { Status
RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges, std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) { + if (_read_columns.size() == 0) { + // Query task that only select columns in path. + return Status::OK(); + } const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20; const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20; size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size()); @@ -62,6 +67,10 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange> Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* _batch_eof) { + // Process external table query task that select columns are all from path. + if (_read_columns.empty()) { + return _read_empty_batch(batch_size, read_rows, _batch_eof); + } size_t batch_read_rows = 0; bool has_eof = false; int col_idx = 0; @@ -94,6 +103,19 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ return Status::OK(); } +Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof) { + if (batch_size < _remaining_rows) { + *read_rows = batch_size; + _remaining_rows -= batch_size; + *_batch_eof = false; + } else { + *read_rows = _remaining_rows; + _remaining_rows = 0; + *_batch_eof = true; + } + return Status::OK(); +} + ParquetColumnReader::Statistics RowGroupReader::statistics() { ParquetColumnReader::Statistics st; for (auto& reader : _column_readers) { diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index e1b54bb529..0a18514c39 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -35,12 +35,16 @@ public: ParquetColumnReader::Statistics statistics(); +private: + Status _read_empty_batch(size_t batch_size, size_t* 
read_rows, bool* _batch_eof); + private: doris::FileReader* _file_reader; std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers; const std::vector<ParquetReadColumn>& _read_columns; const int32_t _row_group_id; const tparquet::RowGroup& _row_group_meta; + int64_t _remaining_rows; int64_t _read_rows = 0; cctz::time_zone* _ctz; }; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 7ff57ac05b..dc33f321d7 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -143,6 +143,7 @@ Status ParquetReader::init_reader( _colname_to_value_range = colname_to_value_range; RETURN_IF_ERROR(_init_read_columns()); RETURN_IF_ERROR(_init_row_group_readers()); + return Status::OK(); } @@ -156,12 +157,12 @@ Status ParquetReader::_init_read_columns() { _missing_cols.push_back(file_col_name); } } + // It is legal to get empty include_column_ids in query task. 
if (include_column_ids.empty()) { - return Status::InternalError("No columns found in parquet file"); + return Status::OK(); } // The same order as physical columns std::sort(include_column_ids.begin(), include_column_ids.end()); - _read_columns.clear(); for (int& parquet_col_id : include_column_ids) { _read_columns.emplace_back(parquet_col_id, _file_metadata->schema().get_column(parquet_col_id)->name); diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index c9ccfbf46c..3503e8c468 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -257,7 +257,6 @@ Status VFileScanner::_fill_columns_from_path(size_t rows) { return Status::InternalError(ss.str()); } const std::string& column_from_path = range.columns_from_path[it->second]; - auto doris_column = _src_block_ptr->get_by_name(slot_desc->col_name()).column; IColumn* col_ptr = const_cast<IColumn*>(doris_column.get()); @@ -528,12 +527,28 @@ Status VFileScanner::_init_expr_ctxes() { std::map<SlotId, int> full_src_index_map; std::map<SlotId, SlotDescriptor*> full_src_slot_map; + std::map<std::string, int> partition_name_to_key_index_map; int index = 0; for (const auto& slot_desc : _real_tuple_desc->slots()) { full_src_slot_map.emplace(slot_desc->id(), slot_desc); full_src_index_map.emplace(slot_desc->id(), index++); } + // For external table query, find the index of column in path. + // Because query doesn't always search for all columns in a table + // and the order of selected columns is random. + // All ranges in _ranges vector should have identical columns_from_path_keys + // because they are all file splits for the same external table. 
+ // So here use the first element of _ranges to fill the partition_name_to_key_index_map + if (_ranges[0].__isset.columns_from_path_keys) { + std::vector<std::string> key_map = _ranges[0].columns_from_path_keys; + if (!key_map.empty()) { + for (size_t i = 0; i < key_map.size(); i++) { + partition_name_to_key_index_map.emplace(key_map[i], i); + } + } + } + _num_of_columns_from_file = _params.num_of_columns_from_file; for (const auto& slot_info : _params.required_slots) { auto slot_id = slot_info.slot_id; @@ -551,8 +566,13 @@ Status VFileScanner::_init_expr_ctxes() { _file_col_names.push_back(it->second->col_name()); } else { _partition_slot_descs.emplace_back(it->second); - auto iti = full_src_index_map.find(slot_id); - _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file); + if (_is_load) { + auto iti = full_src_index_map.find(slot_id); + _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file); + } else { + auto kit = partition_name_to_key_index_map.find(it->second->col_name()); + _partition_slot_index_map.emplace(slot_id, kit->second); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java index 04916fb105..8de5b5d312 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java @@ -304,10 +304,11 @@ public class HiveScanProvider implements HMSTableScanProviderIf { for (InputSplit split : inputSplits) { FileSplit fileSplit = (FileSplit) split; + List<String> pathPartitionKeys = getPathPartitionKeys(); List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), - getPathPartitionKeys(), false); + pathPartitionKeys, false); - TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath); + 
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys); curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); LOG.info( @@ -366,12 +367,14 @@ public class HiveScanProvider implements HMSTableScanProviderIf { return locations; } - private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath) + private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath, + List<String> columnsFromPathKeys) throws DdlException, MetaNotFoundException { TFileRangeDesc rangeDesc = new TFileRangeDesc(); rangeDesc.setStartOffset(fileSplit.getStart()); rangeDesc.setSize(fileSplit.getLength()); rangeDesc.setColumnsFromPath(columnsFromPath); + rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys); if (getLocationType() == TFileType.FILE_HDFS) { rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 498e8d2082..6dd774ea4a 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -306,6 +306,8 @@ struct TFileRangeDesc { 5: optional i64 file_size; // columns parsed from file path should be after the columns read from file 6: optional list<string> columns_from_path; + // column names from file path, in the same order with columns_from_path + 7: optional list<string> columns_from_path_keys; } // TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org