This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7368eb1045f [fix](block-reader) Make rowsets union iterating work (#40877) (#43178) 7368eb1045f is described below commit 7368eb1045fb9daed41ebbe717ec99a6d602b23d Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Wed Nov 6 09:54:47 2024 +0800 [fix](block-reader) Make rowsets union iterating work (#40877) (#43178) pick: #40877 --- be/src/olap/compaction.cpp | 4 +-- be/src/olap/merger.cpp | 27 ++++--------------- be/src/olap/rowset/beta_rowset_reader.cpp | 6 +++++ be/src/olap/rowset/rowset.h | 8 ++++-- be/src/olap/rowset/rowset_reader_context.h | 3 +++ be/src/olap/tablet_reader.cpp | 1 + be/src/olap/tablet_reader.h | 2 ++ be/src/vec/olap/block_reader.cpp | 43 +++++++----------------------- be/src/vec/olap/block_reader.h | 3 ++- be/src/vec/olap/vcollect_iterator.cpp | 1 + be/test/olap/rowid_conversion_test.cpp | 7 ++++- 11 files changed, 44 insertions(+), 61 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 236e5d4ac7b..0fd9b57faf8 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -100,14 +100,14 @@ bool is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs) { } } std::string min_key; - auto ret = rhs->min_key(&min_key); + auto ret = rhs->first_key(&min_key); if (!ret) { return false; } if (min_key <= pre_max_key) { return false; } - CHECK(rhs->max_key(&pre_max_key)); + CHECK(rhs->last_key(&pre_max_key)); return true; } diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index ab034123ac8..a79434551b5 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -20,6 +20,7 @@ #include <gen_cpp/olap_file.pb.h> #include <gen_cpp/types.pb.h> #include <stddef.h> +#include <unistd.h> #include <algorithm> #include <iterator> @@ -91,6 +92,8 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, if (stats_output && stats_output->rowid_conversion) { reader_params.record_rowids = true; + reader_params.rowid_conversion = stats_output->rowid_conversion; + stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id()); } reader_params.return_columns.resize(cur_tablet_schema.num_columns()); @@ -98,17 +101,6 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type, reader_params.origin_return_columns = &reader_params.return_columns; RETURN_IF_ERROR(reader.init(reader_params)); - if (reader_params.record_rowids) { - stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id()); - // init segment rowid map for rowid conversion - std::vector<uint32_t> segment_num_rows; - for (auto& rs_split : reader_params.rs_splits) { - RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows)); - stats_output->rowid_conversion->init_segment_map( - rs_split.rs_reader->rowset()->rowset_id(), segment_num_rows); - } - } - vectorized::Block block = cur_tablet_schema.create_block(reader_params.return_columns); size_t output_rows = 0; bool eof = false; @@ -274,6 +266,8 @@ Status Merger::vertical_compact_one_group( if (is_key && stats_output && stats_output->rowid_conversion) { reader_params.record_rowids = true; + reader_params.rowid_conversion = stats_output->rowid_conversion; + stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id()); } reader_params.return_columns = column_group; @@ -281,17 +275,6 @@ Status Merger::vertical_compact_one_group( reader_params.batch_size = batch_size; RETURN_IF_ERROR(reader.init(reader_params, sample_info)); - if (reader_params.record_rowids) { - stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id()); - // init segment rowid map for rowid conversion - std::vector<uint32_t> segment_num_rows; - for (auto& rs_split : reader_params.rs_splits) { - RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows)); - stats_output->rowid_conversion->init_segment_map( - rs_split.rs_reader->rowset()->rowset_id(), segment_num_rows); - } - } - vectorized::Block block = tablet_schema.create_block(reader_params.return_columns); size_t output_rows = 0; bool eof = false; diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index d2c7023f659..042893f1374 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -235,6 +235,12 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context for (size_t i = 0; i < segments.size(); i++) { _segments_rows[i] = segments[i]->num_rows(); } + if (_read_context->record_rowids) { + // init segment rowid map for rowid conversion + std::vector<uint32_t> segment_num_rows; + RETURN_IF_ERROR(get_segment_num_rows(&segment_num_rows)); + _read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(), segment_num_rows); + } auto [seg_start, seg_end] = _segment_offsets; if (seg_start == seg_end) { diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 6050a33bfc2..24e660cd2f7 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -269,7 +269,9 @@ public: _rowset_meta->get_segments_key_bounds(segments_key_bounds); return Status::OK(); } - bool min_key(std::string* min_key) { + + // min key of the first segment + bool first_key(std::string* min_key) { KeyBoundsPB key_bounds; bool ret = _rowset_meta->get_first_segment_key_bound(&key_bounds); if (!ret) { @@ -278,7 +280,9 @@ public: *min_key = key_bounds.min_key(); return true; } - bool max_key(std::string* max_key) { + + // max key of the last segment + bool last_key(std::string* max_key) { KeyBoundsPB key_bounds; bool ret = _rowset_meta->get_last_segment_key_bound(&key_bounds); if (!ret) { diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index 7af1ce7c047..fd3b4fed56f 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -21,6 +21,7 @@ #include "io/io_common.h" #include "olap/column_predicate.h" #include "olap/olap_common.h" +#include "olap/rowid_conversion.h" #include "runtime/runtime_state.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" @@ -75,6 +76,8 @@ struct RowsetReaderContext { bool enable_unique_key_merge_on_write = false; const DeleteBitmap* delete_bitmap = nullptr; bool record_rowids = false; + RowIdConversion* rowid_conversion; + bool is_vertical_compaction = false; bool is_key_column_group = false; const std::set<int32_t>* output_columns = nullptr; RowsetId rowset_id; diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index 9ab9e4b1b36..7410b70f4aa 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -254,6 +254,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { _reader_context.delete_bitmap = read_params.delete_bitmap; _reader_context.enable_unique_key_merge_on_write = tablet()->enable_unique_key_merge_on_write(); _reader_context.record_rowids = read_params.record_rowids; + _reader_context.rowid_conversion = read_params.rowid_conversion; _reader_context.is_key_column_group = read_params.is_key_column_group; _reader_context.remaining_conjunct_roots = read_params.remaining_conjunct_roots; _reader_context.common_expr_ctxs_push_down = read_params.common_expr_ctxs_push_down; diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index 50517e047ba..87af3bb08eb 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -39,6 +39,7 @@ #include "olap/olap_common.h" #include "olap/olap_tuple.h" #include "olap/row_cursor.h" +#include "olap/rowid_conversion.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_meta.h" #include "olap/rowset/rowset_reader.h" @@ -166,6 +167,7 @@ public: // used for compaction to record row ids bool record_rowids = false; + RowIdConversion* rowid_conversion; std::vector<int> topn_filter_source_node_ids; int topn_filter_target_node_id = -1; // used for special optimization for query : ORDER BY key LIMIT n diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 9d79b51975c..c46ff330f2b 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -72,54 +72,31 @@ Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) { return res; } -bool BlockReader::_rowsets_overlapping(const ReaderParams& read_params) { - std::string cur_max_key; +bool BlockReader::_rowsets_mono_asc_disjoint(const ReaderParams& read_params) { + std::string cur_rs_last_key; const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits; for (const auto& rs_split : rs_splits) { - // version 0-1 of every tablet is empty, just skip this rowset - if (rs_split.rs_reader->rowset()->version().second == 1) { - continue; - } if (rs_split.rs_reader->rowset()->num_rows() == 0) { continue; } if (rs_split.rs_reader->rowset()->is_segments_overlapping()) { return true; } - std::string min_key; - bool has_min_key = rs_split.rs_reader->rowset()->min_key(&min_key); - if (!has_min_key) { + std::string rs_first_key; + bool has_first_key = rs_split.rs_reader->rowset()->first_key(&rs_first_key); + if (!has_first_key) { return true; } - if (min_key <= cur_max_key) { + if (rs_first_key <= cur_rs_last_key) { return true; } - CHECK(rs_split.rs_reader->rowset()->max_key(&cur_max_key)); + bool has_last_key = rs_split.rs_reader->rowset()->last_key(&cur_rs_last_key); + CHECK(has_last_key); } - for (const auto& rs_reader : rs_splits) { - // version 0-1 of every tablet is empty, just skip this rowset - if (rs_reader.rs_reader->rowset()->version().second == 1) { - continue; - } - if (rs_reader.rs_reader->rowset()->num_rows() == 0) { - continue; - } - if (rs_reader.rs_reader->rowset()->is_segments_overlapping()) { - return true; - } - std::string min_key; - bool has_min_key = rs_reader.rs_reader->rowset()->min_key(&min_key); - if (!has_min_key) { - return true; - } - if (min_key <= cur_max_key) { - return true; - } - CHECK(rs_reader.rs_reader->rowset()->max_key(&cur_max_key)); - } return false; } + Status BlockReader::_init_collect_iter(const ReaderParams& read_params) { auto res = _capture_rs_readers(read_params); if (!res.ok()) { @@ -131,7 +108,7 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) { return res; } // check if rowsets are noneoverlapping - _is_rowsets_overlapping = _rowsets_overlapping(read_params); + _is_rowsets_overlapping = _rowsets_mono_asc_disjoint(read_params); _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key, read_params.read_orderby_key_reverse); diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index 6f9792929db..f33fe743109 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -86,7 +86,8 @@ private: bool _get_next_row_same(); - bool _rowsets_overlapping(const ReaderParams& read_params); + // return true if keys of rowsets are mono ascending and disjoint + bool _rowsets_mono_asc_disjoint(const ReaderParams& read_params); VCollectIterator _vcollect_iter; IteratorRowRef _next_row {{}, -1, false}; diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index f7017a058df..8c910656837 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -499,6 +499,7 @@ Status VCollectIterator::Level0Iterator::refresh_current_row() { if (_block == nullptr && !_get_data_by_ref) { _block = std::make_shared<Block>(_schema.create_block( _reader->_return_columns, _reader->_tablet_columns_convert_to_null_set)); + _ref.block = _block; } if (!_is_empty() && _current_valid()) { diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index 5ae80398afb..d48d4150ad3 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -447,7 +447,12 @@ protected: int64_t c1 = j * rows_per_segment + n; // There are 500 rows of data overlap between rowsets if (i > 0) { - c1 += i * num_segments * rows_per_segment - 500; + if (is_overlap) { + // There are 500 rows of data overlap between rowsets + c1 -= 500; + } else { + ++c1; + } } if (is_overlap && j > 0) { // There are 10 rows of data overlap between segments --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org