zhannngchen commented on code in PR #20153:
URL: https://github.com/apache/doris/pull/20153#discussion_r1209339927


##########
be/src/olap/primary_key_index.cpp:
##########
@@ -103,4 +104,123 @@ Status PrimaryKeyIndexReader::parse_bf(io::FileReaderSPtr 
file_reader,
     return Status::OK();
 }
 
+Status MergeIndexedColumnIteratorContext::init() {
+    RETURN_IF_ERROR(_segment->load_pk_index_and_bf());
+    auto pk_idx = _segment->get_primary_key_index();
+    _index = pk_idx;
+    return Status::OK();
+}
+
+std::pair<Status, Slice> MergeIndexedColumnIteratorContext::get_current_key() {
+    if (_cur_pos >= _cur_size) {
+        if (_cur_row_id >= _index->num_rows()) {
+            return {Status::EndOfFile("Reach the end of file"), {}};
+        }
+        auto st = _index->new_iterator(&_iter);
+        if (!st.ok()) {
+            return {st, {}};
+        }
+        Slice last_key_slice(_last_key);
+        st = _iter->seek_at_or_after(&last_key_slice, &_excat_match);
+        if (!st.ok()) {
+            return {st, {}};
+        }
+        auto current_ordinal = _iter->get_current_ordinal();
+        st = _next_batch(current_ordinal);
+        if (!st.ok()) {
+            return {st, {}};
+        }
+    }
+    return {Status::OK(), Slice(_index_column->get_data_at(_cur_pos).data,
+                                _index_column->get_data_at(_cur_pos).size)};
+}
+
+Status MergeIndexedColumnIteratorContext::advance() {
+    ++_cur_pos;
+    ++_cur_row_id;
+    if (_cur_row_id >= _index->num_rows()) {
+        return Status::EndOfFile(fmt::format("Reach the end of file", 
_segment_id));
+    }
+    return Status::OK();
+}
+
+Status MergeIndexedColumnIteratorContext::jump_to_ge(Slice const& key) {
+    RETURN_IF_ERROR(_index->new_iterator(&_iter));
+    auto st = _iter->seek_at_or_after(&key, &_excat_match);
+    if (st.is<ErrorCode::NOT_FOUND>()) {
+        return Status::EndOfFile("Reach the end of file");
+    }
+    RETURN_IF_ERROR(st);
+    auto current_ordinal = _iter->get_current_ordinal();
+    DCHECK(current_ordinal > _cur_row_id)
+            << fmt::format("current_ordinal: {} should be greater than 
_cur_row_id: {}",
+                           current_ordinal, _cur_row_id);
+    if (current_ordinal + _cur_pos < _cur_size + _cur_row_id) {
+        _cur_pos = _cur_pos + current_ordinal - _cur_row_id;
+        _cur_row_id = current_ordinal;
+        return Status::OK();
+    }
+    return _next_batch(current_ordinal);
+}
+
+Status MergeIndexedColumnIteratorContext::_next_batch(size_t row_id) {
+    auto total = _index->num_rows();
+    if (row_id >= total) {
+        return Status::EndOfFile("Reach the end of file");
+    }
+    auto& pk_idx = _index;
+    _index_type = vectorized::DataTypeFactory::instance().create_data_type(
+            pk_idx->type_info()->type(), 1, 0);
+    _index_column = _index_type->create_column();
+    auto remaining = total - row_id;
+    size_t num_to_read = std::min(_batch_size, remaining);
+    size_t num_read = num_to_read;
+    RETURN_IF_ERROR(_iter->next_batch(&num_read, _index_column));
+    DCHECK(num_to_read == num_read) << "num_to_read: " << num_to_read << ", 
num_read: " << num_read;
+    _last_key = _index_column->get_data_at(num_read - 1).to_string();
+    if (num_read == _batch_size && num_read != remaining) {
+        num_read -= 1;
+    }
+    _cur_size = num_read;
+    _cur_pos = 0;
+    _cur_row_id = row_id;
+    return Status::OK();
+}
+
+bool MergeIndexedColumnIteratorContext::Comparator::operator()(
+        MergeIndexedColumnIteratorContext* node1, 
MergeIndexedColumnIteratorContext* node2) const {
+    auto&& [st1, key1] = node1->get_current_key();
+    RETURN_IF_ERROR(st1);
+    auto&& [st2, key2] = node2->get_current_key();
+    RETURN_IF_ERROR(st2);
+    if (_seq_col_length == 0) {
+        auto cmp_result = key1.compare(key2);
+        return cmp_result ? (cmp_result > 0) : (node1->segment_id() < 
node2->segment_id());
+    }
+    auto key1_without_seq = Slice(key1.get_data(), key1.get_size() - 
_seq_col_length);
+    auto key2_without_seq = Slice(key2.get_data(), key2.get_size() - 
_seq_col_length);
+    auto cmp_result = key1_without_seq.compare(key2_without_seq);
+    if (cmp_result != 0) {
+        return cmp_result > 0;
+    }
+    auto key1_sequence_val =
+            Slice(key1.get_data() + key1.get_size() - _seq_col_length, 
_seq_col_length);
+    auto key2_sequence_val =
+            Slice(key2.get_data() + key2.get_size() - _seq_col_length, 
_seq_col_length);
+    cmp_result = key1_sequence_val.compare(key2_sequence_val);
+    if (cmp_result != 0) {
+        return cmp_result > 0;
+    }
+    return node1->segment_id() < node2->segment_id();
+}
+
+bool MergeIndexedColumnIteratorContext::Comparator::is_key_same(
+        MergeIndexedColumnIteratorContext* node1, Slice const& key2) const {

Review Comment:
   Please rename `key2` to `rhs`.



##########
be/src/olap/primary_key_index.cpp:
##########
@@ -103,4 +104,123 @@ Status PrimaryKeyIndexReader::parse_bf(io::FileReaderSPtr 
file_reader,
     return Status::OK();
 }
 
+Status MergeIndexedColumnIteratorContext::init() {
+    RETURN_IF_ERROR(_segment->load_pk_index_and_bf());
+    auto pk_idx = _segment->get_primary_key_index();
+    _index = pk_idx;

Review Comment:
   Why not simply write `_index = _segment->get_primary_key_index();`? The `pk_idx` temporary is unnecessary.



##########
be/src/olap/primary_key_index.cpp:
##########
@@ -103,4 +104,123 @@ Status PrimaryKeyIndexReader::parse_bf(io::FileReaderSPtr 
file_reader,
     return Status::OK();
 }
 
+Status MergeIndexedColumnIteratorContext::init() {
+    RETURN_IF_ERROR(_segment->load_pk_index_and_bf());

Review Comment:
   Use `load_index()` instead; you don't need to load the bloom filter (bf) here.



##########
be/src/olap/tablet.cpp:
##########
@@ -3264,4 +3240,83 @@ bool Tablet::is_enable_binlog() {
 void Tablet::set_binlog_config(BinlogConfig binlog_config) {
     tablet_meta()->set_binlog_config(std::move(binlog_config));
 }
+
+// caller should hold meta_lock
+Status Tablet::calc_delete_bitmap_between_segments(
+        RowsetSharedPtr rowset, const 
std::vector<segment_v2::SegmentSharedPtr>& segments,
+        DeleteBitmapPtr delete_bitmap) {
+    size_t const num_segments = segments.size();
+
+    if (num_segments < 2) {
+        return Status::OK();
+    }
+
+    OlapStopWatch watch;
+
+    auto const rowset_id = rowset->rowset_id();
+    std::vector<MergeIndexedColumnIteratorContext> nodes;
+    nodes.reserve(num_segments);
+    for (auto& seg : segments) {
+        nodes.emplace_back(seg);
+        RETURN_IF_ERROR(nodes.back().init());
+    }
+
+    size_t seq_col_length = 0;
+    if (_schema->has_sequence_col()) {
+        auto seq_col_idx = _schema->sequence_col_idx();
+        seq_col_length = _schema->column(seq_col_idx).length();
+    }
+
+    MergeIndexedColumnIteratorContext::Comparator cmp(seq_col_length);
+    std::priority_queue<MergeIndexedColumnIteratorContext*,
+                        std::vector<MergeIndexedColumnIteratorContext*>,
+                        MergeIndexedColumnIteratorContext::Comparator>
+            heap(cmp);
+    for (auto& node : nodes) {
+        heap.push(&node);
+    }
+
+    while (heap.size() > 1) {

Review Comment:
   It would be better to move this logic into a dedicated class, such as
   `MergeIndexDeleteBitmapCalculator`, which would be easier to maintain.



##########
be/src/olap/tablet.cpp:
##########
@@ -3264,4 +3240,83 @@ bool Tablet::is_enable_binlog() {
 void Tablet::set_binlog_config(BinlogConfig binlog_config) {
     tablet_meta()->set_binlog_config(std::move(binlog_config));
 }
+
+// caller should hold meta_lock
+Status Tablet::calc_delete_bitmap_between_segments(
+        RowsetSharedPtr rowset, const 
std::vector<segment_v2::SegmentSharedPtr>& segments,
+        DeleteBitmapPtr delete_bitmap) {
+    size_t const num_segments = segments.size();
+
+    if (num_segments < 2) {
+        return Status::OK();
+    }
+
+    OlapStopWatch watch;
+
+    auto const rowset_id = rowset->rowset_id();
+    std::vector<MergeIndexedColumnIteratorContext> nodes;
+    nodes.reserve(num_segments);
+    for (auto& seg : segments) {
+        nodes.emplace_back(seg);
+        RETURN_IF_ERROR(nodes.back().init());
+    }
+
+    size_t seq_col_length = 0;
+    if (_schema->has_sequence_col()) {
+        auto seq_col_idx = _schema->sequence_col_idx();
+        seq_col_length = _schema->column(seq_col_idx).length();
+    }
+
+    MergeIndexedColumnIteratorContext::Comparator cmp(seq_col_length);
+    std::priority_queue<MergeIndexedColumnIteratorContext*,
+                        std::vector<MergeIndexedColumnIteratorContext*>,
+                        MergeIndexedColumnIteratorContext::Comparator>
+            heap(cmp);
+    for (auto& node : nodes) {
+        heap.push(&node);
+    }
+
+    while (heap.size() > 1) {

Review Comment:
   You should also add unit tests (UT) for this class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to