This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 84ce2a1e98 [feature-wip](multi-catalog)(fix) partition value error when a block contains multiple splits (#11260) 84ce2a1e98 is described below commit 84ce2a1e983a118feae614166d1d1d905deeb4bb Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Fri Jul 29 18:48:59 2022 +0800 [feature-wip](multi-catalog)(fix) partition value error when a block contains multiple splits (#11260) `FileArrowScanner::get_next` returns a block once it is full, so a block may contain multiple splits (for small files) or span two splits (for large files). However, a block could only be filled with the partition values of a single file. Because different splits may come from different files, this produced incorrect embedded partition values. --- be/src/vec/exec/file_arrow_scanner.cpp | 2 +- be/src/vec/exec/file_scanner.cpp | 5 +---- be/src/vec/exec/file_scanner.h | 2 +- be/src/vec/exec/file_text_scanner.cpp | 5 +++++ 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp index 30511b2392..e6c4fa7597 100644 --- a/be/src/vec/exec/file_arrow_scanner.cpp +++ b/be/src/vec/exec/file_arrow_scanner.cpp @@ -194,7 +194,7 @@ Status FileArrowScanner::_append_batch_to_block(Block* block) { } _rows += num_elements; _arrow_batch_cur_idx += num_elements; - return Status::OK(); + return _fill_columns_from_path(block, num_elements); } void VFileParquetScanner::_update_profile(std::shared_ptr<Statistics>& statistics) { diff --git a/be/src/vec/exec/file_scanner.cpp b/be/src/vec/exec/file_scanner.cpp index a0f473ffc9..bb1ba21924 100644 --- a/be/src/vec/exec/file_scanner.cpp +++ b/be/src/vec/exec/file_scanner.cpp @@ -164,7 +164,6 @@ Status FileScanner::_filter_block(vectorized::Block* _block) { Status FileScanner::finalize_block(vectorized::Block* _block, bool* eof) { *eof = _scanner_eof; _read_row_counter += _block->rows(); - RETURN_IF_ERROR(_fill_columns_from_path(_block)); if
(LIKELY(_rows > 0)) { RETURN_IF_ERROR(_filter_block(_block)); } @@ -172,11 +171,9 @@ Status FileScanner::finalize_block(vectorized::Block* _block, bool* eof) { return Status::OK(); } -Status FileScanner::_fill_columns_from_path(vectorized::Block* _block) { +Status FileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t rows) { const TFileRangeDesc& range = _ranges.at(_next_range - 1); if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) { - size_t rows = _rows; - for (const auto& slot_desc : _partition_slot_descs) { if (slot_desc == nullptr) continue; auto it = _partition_slot_index_map.find(slot_desc->id()); diff --git a/be/src/vec/exec/file_scanner.h b/be/src/vec/exec/file_scanner.h index 16e75aefc0..df4c1d4ef6 100644 --- a/be/src/vec/exec/file_scanner.h +++ b/be/src/vec/exec/file_scanner.h @@ -55,6 +55,7 @@ protected: virtual void _init_profiles(RuntimeProfile* profile) = 0; Status finalize_block(vectorized::Block* dest_block, bool* eof); + Status _fill_columns_from_path(vectorized::Block* output_block, size_t rows); Status init_block(vectorized::Block* block); std::unique_ptr<TextConverter> _text_converter; @@ -106,7 +107,6 @@ protected: private: Status _init_expr_ctxes(); Status _filter_block(vectorized::Block* output_block); - Status _fill_columns_from_path(vectorized::Block* output_block); }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp index 593b78867f..02da0bca2c 100644 --- a/be/src/vec/exec/file_text_scanner.cpp +++ b/be/src/vec/exec/file_text_scanner.cpp @@ -91,6 +91,7 @@ Status FileTextScanner::get_next(Block* block, bool* eof) { const int batch_size = _state->batch_size(); + int current_rows = _rows; while (_rows < batch_size && !_scanner_eof) { if (_cur_line_reader == nullptr || _cur_line_reader_eof) { RETURN_IF_ERROR(_open_next_reader()); @@ -114,6 +115,10 @@ Status FileTextScanner::get_next(Block* block, bool* eof) { 
COUNTER_UPDATE(_rows_read_counter, 1); RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block)); } + if (_cur_line_reader_eof) { + RETURN_IF_ERROR(_fill_columns_from_path(block, _rows - current_rows)); + current_rows = _rows; + } } return finalize_block(block, eof); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org