zhannngchen commented on code in PR #20153: URL: https://github.com/apache/doris/pull/20153#discussion_r1209339927
########## be/src/olap/primary_key_index.cpp: ########## @@ -103,4 +104,123 @@ Status PrimaryKeyIndexReader::parse_bf(io::FileReaderSPtr file_reader, return Status::OK(); } +Status MergeIndexedColumnIteratorContext::init() { + RETURN_IF_ERROR(_segment->load_pk_index_and_bf()); + auto pk_idx = _segment->get_primary_key_index(); + _index = pk_idx; + return Status::OK(); +} + +std::pair<Status, Slice> MergeIndexedColumnIteratorContext::get_current_key() { + if (_cur_pos >= _cur_size) { + if (_cur_row_id >= _index->num_rows()) { + return {Status::EndOfFile("Reach the end of file"), {}}; + } + auto st = _index->new_iterator(&_iter); + if (!st.ok()) { + return {st, {}}; + } + Slice last_key_slice(_last_key); + st = _iter->seek_at_or_after(&last_key_slice, &_excat_match); + if (!st.ok()) { + return {st, {}}; + } + auto current_ordinal = _iter->get_current_ordinal(); + st = _next_batch(current_ordinal); + if (!st.ok()) { + return {st, {}}; + } + } + return {Status::OK(), Slice(_index_column->get_data_at(_cur_pos).data, + _index_column->get_data_at(_cur_pos).size)}; +} + +Status MergeIndexedColumnIteratorContext::advance() { + ++_cur_pos; + ++_cur_row_id; + if (_cur_row_id >= _index->num_rows()) { + return Status::EndOfFile(fmt::format("Reach the end of file", _segment_id)); + } + return Status::OK(); +} + +Status MergeIndexedColumnIteratorContext::jump_to_ge(Slice const& key) { + RETURN_IF_ERROR(_index->new_iterator(&_iter)); + auto st = _iter->seek_at_or_after(&key, &_excat_match); + if (st.is<ErrorCode::NOT_FOUND>()) { + return Status::EndOfFile("Reach the end of file"); + } + RETURN_IF_ERROR(st); + auto current_ordinal = _iter->get_current_ordinal(); + DCHECK(current_ordinal > _cur_row_id) + << fmt::format("current_ordinal: {} should be greater than _cur_row_id: {}", + current_ordinal, _cur_row_id); + if (current_ordinal + _cur_pos < _cur_size + _cur_row_id) { + _cur_pos = _cur_pos + current_ordinal - _cur_row_id; + _cur_row_id = current_ordinal; + return Status::OK(); 
+ } + return _next_batch(current_ordinal); +} + +Status MergeIndexedColumnIteratorContext::_next_batch(size_t row_id) { + auto total = _index->num_rows(); + if (row_id >= total) { + return Status::EndOfFile("Reach the end of file"); + } + auto& pk_idx = _index; + _index_type = vectorized::DataTypeFactory::instance().create_data_type( + pk_idx->type_info()->type(), 1, 0); + _index_column = _index_type->create_column(); + auto remaining = total - row_id; + size_t num_to_read = std::min(_batch_size, remaining); + size_t num_read = num_to_read; + RETURN_IF_ERROR(_iter->next_batch(&num_read, _index_column)); + DCHECK(num_to_read == num_read) << "num_to_read: " << num_to_read << ", num_read: " << num_read; + _last_key = _index_column->get_data_at(num_read - 1).to_string(); + if (num_read == _batch_size && num_read != remaining) { + num_read -= 1; + } + _cur_size = num_read; + _cur_pos = 0; + _cur_row_id = row_id; + return Status::OK(); +} + +bool MergeIndexedColumnIteratorContext::Comparator::operator()( + MergeIndexedColumnIteratorContext* node1, MergeIndexedColumnIteratorContext* node2) const { + auto&& [st1, key1] = node1->get_current_key(); + RETURN_IF_ERROR(st1); + auto&& [st2, key2] = node2->get_current_key(); + RETURN_IF_ERROR(st2); + if (_seq_col_length == 0) { + auto cmp_result = key1.compare(key2); + return cmp_result ? 
(cmp_result > 0) : (node1->segment_id() < node2->segment_id()); + } + auto key1_without_seq = Slice(key1.get_data(), key1.get_size() - _seq_col_length); + auto key2_without_seq = Slice(key2.get_data(), key2.get_size() - _seq_col_length); + auto cmp_result = key1_without_seq.compare(key2_without_seq); + if (cmp_result != 0) { + return cmp_result > 0; + } + auto key1_sequence_val = + Slice(key1.get_data() + key1.get_size() - _seq_col_length, _seq_col_length); + auto key2_sequence_val = + Slice(key2.get_data() + key2.get_size() - _seq_col_length, _seq_col_length); + cmp_result = key1_sequence_val.compare(key2_sequence_val); + if (cmp_result != 0) { + return cmp_result > 0; + } + return node1->segment_id() < node2->segment_id(); +} + +bool MergeIndexedColumnIteratorContext::Comparator::is_key_same( + MergeIndexedColumnIteratorContext* node1, Slice const& key2) const { Review Comment: key2 -> rhs ########## be/src/olap/primary_key_index.cpp: ########## @@ -103,4 +104,123 @@ Status PrimaryKeyIndexReader::parse_bf(io::FileReaderSPtr file_reader, return Status::OK(); } +Status MergeIndexedColumnIteratorContext::init() { + RETURN_IF_ERROR(_segment->load_pk_index_and_bf()); + auto pk_idx = _segment->get_primary_key_index(); + _index = pk_idx; Review Comment: why not `_index = _segment->get_primary_key_index();` ? 
########## be/src/olap/primary_key_index.cpp: ########## @@ -103,4 +104,123 @@ Status PrimaryKeyIndexReader::parse_bf(io::FileReaderSPtr file_reader, return Status::OK(); } +Status MergeIndexedColumnIteratorContext::init() { + RETURN_IF_ERROR(_segment->load_pk_index_and_bf()); Review Comment: Use `load_index()` instead; you don't need to load the bf (bloom filter) here. ########## be/src/olap/tablet.cpp: ########## @@ -3264,4 +3240,83 @@ bool Tablet::is_enable_binlog() { void Tablet::set_binlog_config(BinlogConfig binlog_config) { tablet_meta()->set_binlog_config(std::move(binlog_config)); } + +// caller should hold meta_lock +Status Tablet::calc_delete_bitmap_between_segments( + RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments, + DeleteBitmapPtr delete_bitmap) { + size_t const num_segments = segments.size(); + + if (num_segments < 2) { + return Status::OK(); + } + + OlapStopWatch watch; + + auto const rowset_id = rowset->rowset_id(); + std::vector<MergeIndexedColumnIteratorContext> nodes; + nodes.reserve(num_segments); + for (auto& seg : segments) { + nodes.emplace_back(seg); + RETURN_IF_ERROR(nodes.back().init()); + } + + size_t seq_col_length = 0; + if (_schema->has_sequence_col()) { + auto seq_col_idx = _schema->sequence_col_idx(); + seq_col_length = _schema->column(seq_col_idx).length(); + } + + MergeIndexedColumnIteratorContext::Comparator cmp(seq_col_length); + std::priority_queue<MergeIndexedColumnIteratorContext*, + std::vector<MergeIndexedColumnIteratorContext*>, + MergeIndexedColumnIteratorContext::Comparator> + heap(cmp); + for (auto& node : nodes) { + heap.push(&node); + } + + while (heap.size() > 1) { Review Comment: It would be better to move this function into a new class, such as `MergeIndexDeleteBitmapCalculator`; that will be easier to maintain. ########## be/src/olap/tablet.cpp: ########## @@ -3264,4 +3240,83 @@ bool Tablet::is_enable_binlog() { void Tablet::set_binlog_config(BinlogConfig binlog_config) {
tablet_meta()->set_binlog_config(std::move(binlog_config)); } + +// caller should hold meta_lock +Status Tablet::calc_delete_bitmap_between_segments( + RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments, + DeleteBitmapPtr delete_bitmap) { + size_t const num_segments = segments.size(); + + if (num_segments < 2) { + return Status::OK(); + } + + OlapStopWatch watch; + + auto const rowset_id = rowset->rowset_id(); + std::vector<MergeIndexedColumnIteratorContext> nodes; + nodes.reserve(num_segments); + for (auto& seg : segments) { + nodes.emplace_back(seg); + RETURN_IF_ERROR(nodes.back().init()); + } + + size_t seq_col_length = 0; + if (_schema->has_sequence_col()) { + auto seq_col_idx = _schema->sequence_col_idx(); + seq_col_length = _schema->column(seq_col_idx).length(); + } + + MergeIndexedColumnIteratorContext::Comparator cmp(seq_col_length); + std::priority_queue<MergeIndexedColumnIteratorContext*, + std::vector<MergeIndexedColumnIteratorContext*>, + MergeIndexedColumnIteratorContext::Comparator> + heap(cmp); + for (auto& node : nodes) { + heap.push(&node); + } + + while (heap.size() > 1) { Review Comment: You should also add unit tests (UT) for this class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org