This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0-var in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0-var by this push: new f8e3264d032 [Pick-var](inverted index) pick read && seek optimization to variant branch #26689 (#27600) f8e3264d032 is described below commit f8e3264d032c6cde0b40116b169e98e4480145d0 Author: airborne12 <airborn...@gmail.com> AuthorDate: Mon Nov 27 17:54:12 2023 +0800 [Pick-var](inverted index) pick read && seek optimization to variant branch #26689 (#27600) --- be/src/olap/rowset/segment_v2/segment_iterator.cpp | 193 ++++++++++++++------- be/src/olap/rowset/segment_v2/segment_iterator.h | 3 +- 2 files changed, 133 insertions(+), 63 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 65ca6f7e61d..a36536745d7 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -104,11 +104,12 @@ public: explicit BitmapRangeIterator(const roaring::Roaring& bitmap) { roaring_init_iterator(&bitmap.roaring, &_iter); - _read_next_batch(); } bool has_more_range() const { return !_eof; } + [[nodiscard]] static uint32_t get_batch_size() { return kBatchSize; } + // read next range into [*from, *to) whose size <= max_range_size. // return false when there is no more range. virtual bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) { @@ -147,6 +148,11 @@ public: return true; } + // read batch_size of rowids from roaring bitmap into buf array + virtual uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) { + return roaring::api::roaring_read_uint32_iterator(&_iter, buf, batch_size); + } + private: void _read_next_batch() { _buf_pos = 0; @@ -171,6 +177,8 @@ class SegmentIterator::BackwardBitmapRangeIterator : public SegmentIterator::Bit public: explicit BackwardBitmapRangeIterator(const roaring::Roaring& bitmap) { roaring_init_iterator_last(&bitmap.roaring, &_riter); + _rowid_count = roaring_bitmap_get_cardinality(&bitmap.roaring); + _rowid_left = _rowid_count; } bool has_more_range() const { return !_riter.has_value; } @@ -194,9 +202,51 @@ public: return true; } + /** + * Reads a batch of row IDs from a roaring bitmap, starting from the end and moving backwards. + * This function retrieves the last `batch_size` row IDs from the bitmap and stores them in the provided buffer. + * It updates the internal state to track how many row IDs are left to read in subsequent calls. + * + * The row IDs are read in reverse order, but stored in the buffer maintaining their original order in the bitmap. + * + * Example: + * input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19] + * If the bitmap has 12 elements and batch_size is set to 5, the function will first read [15, 16, 17, 18, 19] + * into the buffer, leaving 7 elements left. In the next call with batch_size 5, it will read [4, 5, 6, 7, 10]. + * + */ + uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) override { + if (!_riter.has_value || _rowid_left == 0) { + return 0; + } + + if (_rowid_count <= batch_size) { + roaring_bitmap_to_uint32_array(_riter.parent, + buf); // Fill 'buf' with '_rowid_count' elements. + uint32_t num_read = _rowid_left; // Save the number of row IDs read. + _rowid_left = 0; // No row IDs left after this operation. + return num_read; // Return the number of row IDs read. + } + + uint32_t read_size = std::min(batch_size, _rowid_left); + uint32_t num_read = 0; // Counter for the number of row IDs read. + + // Read row IDs into the buffer in reverse order. + while (num_read < read_size && _riter.has_value) { + buf[read_size - num_read - 1] = _riter.current_value; + num_read++; + _rowid_left--; // Decrement the count of remaining row IDs. + roaring_previous_uint32_iterator(&_riter); + } + + // Return the actual number of row IDs read. + return num_read; + } private: roaring::api::roaring_uint32_iterator_t _riter; + uint32_t _rowid_count; + uint32_t _rowid_left; }; SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema) @@ -1694,56 +1744,86 @@ void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) { } } +/** + * Reads columns by their index, handling both continuous and discontinuous rowid scenarios. + * + * This function is designed to read a specified number of rows (up to nrows_read_limit) + * from the segment iterator, dealing with both continuous and discontinuous rowid arrays. + * It operates as follows: + * + * 1. Reads a batch of rowids (up to the specified limit), and checks if they are continuous. + * Continuous here means that the rowids form an unbroken sequence (e.g., 1, 2, 3, 4...). + * + * 2. For each column that needs to be read (identified by _first_read_column_ids): + * - If the rowids are continuous, the function uses seek_to_ordinal and next_batch + * for efficient reading. + * - If the rowids are not continuous, the function processes them in smaller batches + * (each of size up to 256). Each batch is checked for internal continuity: + * a. If a batch is continuous, uses seek_to_ordinal and next_batch for that batch. + * b. If a batch is not continuous, uses read_by_rowids for individual rowids in the batch. + * + * This approach optimizes reading performance by leveraging batch processing for continuous + * rowid sequences and handling discontinuities gracefully in smaller chunks. + */ Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint32_t& nrows_read, bool set_block_rowid) { SCOPED_RAW_TIMER(&_opts.stats->first_read_ns); - do { - uint32_t range_from = 0; - uint32_t range_to = 0; - bool has_next_range = - _range_iter->next_range(nrows_read_limit - nrows_read, &range_from, &range_to); - if (!has_next_range) { - break; - } - - size_t rows_to_read = range_to - range_from; - _cur_rowid = range_to; - - if (set_block_rowid) { - // Here use std::iota is better performance than for-loop, maybe for-loop is not vectorized - auto start = _block_rowids.data() + nrows_read; - auto end = start + rows_to_read; - std::iota(start, end, range_from); - nrows_read += rows_to_read; - } else { - nrows_read += rows_to_read; - } - - _split_row_ranges.emplace_back(std::pair {range_from, range_to}); - } while (nrows_read < nrows_read_limit && !_opts.read_orderby_key_reverse); + nrows_read = _range_iter->read_batch_rowids(_block_rowids.data(), nrows_read_limit); + bool is_continuous = (nrows_read > 1) && + (_block_rowids[nrows_read - 1] - _block_rowids[0] == nrows_read - 1); for (auto cid : _first_read_column_ids) { auto& column = _current_return_columns[cid]; if (_prune_column(cid, column, true, nrows_read)) { continue; } - for (auto& range : _split_row_ranges) { - size_t nrows = range.second - range.first; - { - _opts.stats->block_first_read_seek_num += 1; - if (_opts.runtime_state && _opts.runtime_state->enable_profile()) { - SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns); - RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(range.first)); - } else { - RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(range.first)); - } + + if (is_continuous) { + size_t rows_read = nrows_read; + _opts.stats->block_first_read_seek_num += 1; + if (_opts.runtime_state && _opts.runtime_state->enable_profile()) { + SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns); + RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0])); + } else { + RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0])); } - size_t rows_read = nrows; RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column)); - if (rows_read != nrows) { - return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})", nrows, - rows_read); + if (rows_read != nrows_read) { + return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})", + nrows_read, rows_read); + } + } else { + const uint32_t batch_size = _range_iter->get_batch_size(); + uint32_t processed = 0; + while (processed < nrows_read) { + uint32_t current_batch_size = std::min(batch_size, nrows_read - processed); + bool batch_continuous = (current_batch_size > 1) && + (_block_rowids[processed + current_batch_size - 1] - + _block_rowids[processed] == + current_batch_size - 1); + + if (batch_continuous) { + size_t rows_read = current_batch_size; + _opts.stats->block_first_read_seek_num += 1; + if (_opts.runtime_state && _opts.runtime_state->enable_profile()) { + SCOPED_RAW_TIMER(&_opts.stats->block_first_read_seek_ns); + RETURN_IF_ERROR( + _column_iterators[cid]->seek_to_ordinal(_block_rowids[processed])); + } else { + RETURN_IF_ERROR( + _column_iterators[cid]->seek_to_ordinal(_block_rowids[processed])); + } + RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column)); + if (rows_read != current_batch_size) { + return Status::Error<ErrorCode::INTERNAL_ERROR>( + "batch nrows({}) != rows_read({})", current_batch_size, rows_read); + } + } else { + RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids( + &_block_rowids[processed], current_batch_size, column)); + } + processed += current_batch_size; } } } @@ -1984,8 +2064,6 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) { nrows_read_limit = std::min(nrows_read_limit, (uint32_t)100); _wait_times_estimate_row_size--; } - _split_row_ranges.clear(); - _split_row_ranges.reserve(nrows_read_limit / 2); RETURN_IF_ERROR(_read_columns_by_index( nrows_read_limit, _current_batch_rows_read, _lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval)); @@ -2279,35 +2357,28 @@ void SegmentIterator::_output_index_result_column(uint16_t* sel_rowid_idx, uint1 } } -void SegmentIterator::_build_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size, - vectorized::Block* block, +void SegmentIterator::_build_index_result_column(const uint16_t* sel_rowid_idx, + uint16_t select_size, vectorized::Block* block, const std::string& pred_result_sign, const roaring::Roaring& index_result) { auto index_result_column = vectorized::ColumnUInt8::create(); vectorized::ColumnUInt8::Container& vec_match_pred = index_result_column->get_data(); vec_match_pred.resize(block->rows()); - size_t idx_in_block = 0; - size_t idx_in_row_range = 0; size_t idx_in_selected = 0; - // _split_row_ranges store multiple ranges which split in function _read_columns_by_index(), - // index_result is a column predicate apply result in a whole segement, - // but a scanner thread one time can read max rows limit by block_row_max, - // so split _row_bitmap by one time scan range, in order to match size of one scanner thread read rows. - for (auto origin_row_range : _split_row_ranges) { - for (size_t rowid = origin_row_range.first; rowid < origin_row_range.second; ++rowid) { - if (sel_rowid_idx == nullptr || (idx_in_selected < select_size && - idx_in_row_range == sel_rowid_idx[idx_in_selected])) { - if (index_result.contains(rowid)) { - vec_match_pred[idx_in_block++] = true; - } else { - vec_match_pred[idx_in_block++] = false; - } - idx_in_selected++; + + for (uint32_t i = 0; i < _current_batch_rows_read; i++) { + auto rowid = _block_rowids[i]; + if (sel_rowid_idx == nullptr || + (idx_in_selected < select_size && i == sel_rowid_idx[idx_in_selected])) { + if (index_result.contains(rowid)) { + vec_match_pred[idx_in_selected] = true; + } else { + vec_match_pred[idx_in_selected] = false; } - idx_in_row_range++; + idx_in_selected++; } } - assert(block->rows() == vec_match_pred.size()); + DCHECK(block->rows() == vec_match_pred.size()); auto index_result_position = block->get_position_by_name(pred_result_sign); block->replace_by_position(index_result_position, std::move(index_result_column)); } diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index 579d250dfec..739b172ad0d 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -260,7 +260,7 @@ private: std::string _gen_predicate_result_sign(ColumnPredicate* predicate); std::string _gen_predicate_result_sign(ColumnPredicateInfo* predicate_info); - void _build_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size, + void _build_index_result_column(const uint16_t* sel_rowid_idx, uint16_t select_size, vectorized::Block* block, const std::string& pred_result_sign, const roaring::Roaring& index_result); void _output_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size, @@ -343,7 +343,6 @@ private: roaring::Roaring _row_bitmap; // "column_name+operator+value-> <in_compound_query, rowid_result> std::unordered_map<std::string, std::pair<bool, roaring::Roaring>> _rowid_result_for_index; - std::vector<std::pair<uint32_t, uint32_t>> _split_row_ranges; // an iterator for `_row_bitmap` that can be used to extract row range to scan std::unique_ptr<BitmapRangeIterator> _range_iter; // the next rowid to read --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org