This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7368eb1045f [fix](block-reader) Make rowsets union iterating work (#40877) (#43178)
7368eb1045f is described below

commit 7368eb1045fb9daed41ebbe717ec99a6d602b23d
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Wed Nov 6 09:54:47 2024 +0800

    [fix](block-reader) Make rowsets union iterating work (#40877) (#43178)
    
    pick: #40877
---
 be/src/olap/compaction.cpp                 |  4 +--
 be/src/olap/merger.cpp                     | 27 ++++---------------
 be/src/olap/rowset/beta_rowset_reader.cpp  |  6 +++++
 be/src/olap/rowset/rowset.h                |  8 ++++--
 be/src/olap/rowset/rowset_reader_context.h |  3 +++
 be/src/olap/tablet_reader.cpp              |  1 +
 be/src/olap/tablet_reader.h                |  2 ++
 be/src/vec/olap/block_reader.cpp           | 43 +++++++-----------------------
 be/src/vec/olap/block_reader.h             |  3 ++-
 be/src/vec/olap/vcollect_iterator.cpp      |  1 +
 be/test/olap/rowid_conversion_test.cpp     |  7 ++++-
 11 files changed, 44 insertions(+), 61 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 236e5d4ac7b..0fd9b57faf8 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -100,14 +100,14 @@ bool is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs) {
         }
     }
     std::string min_key;
-    auto ret = rhs->min_key(&min_key);
+    auto ret = rhs->first_key(&min_key);
     if (!ret) {
         return false;
     }
     if (min_key <= pre_max_key) {
         return false;
     }
-    CHECK(rhs->max_key(&pre_max_key));
+    CHECK(rhs->last_key(&pre_max_key));
 
     return true;
 }
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index ab034123ac8..a79434551b5 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -20,6 +20,7 @@
 #include <gen_cpp/olap_file.pb.h>
 #include <gen_cpp/types.pb.h>
 #include <stddef.h>
+#include <unistd.h>
 
 #include <algorithm>
 #include <iterator>
@@ -91,6 +92,8 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
 
     if (stats_output && stats_output->rowid_conversion) {
         reader_params.record_rowids = true;
+        reader_params.rowid_conversion = stats_output->rowid_conversion;
+        
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
     }
 
     reader_params.return_columns.resize(cur_tablet_schema.num_columns());
@@ -98,17 +101,6 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
     reader_params.origin_return_columns = &reader_params.return_columns;
     RETURN_IF_ERROR(reader.init(reader_params));
 
-    if (reader_params.record_rowids) {
-        
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
-        // init segment rowid map for rowid conversion
-        std::vector<uint32_t> segment_num_rows;
-        for (auto& rs_split : reader_params.rs_splits) {
-            
RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows));
-            stats_output->rowid_conversion->init_segment_map(
-                    rs_split.rs_reader->rowset()->rowset_id(), 
segment_num_rows);
-        }
-    }
-
     vectorized::Block block = 
cur_tablet_schema.create_block(reader_params.return_columns);
     size_t output_rows = 0;
     bool eof = false;
@@ -274,6 +266,8 @@ Status Merger::vertical_compact_one_group(
 
     if (is_key && stats_output && stats_output->rowid_conversion) {
         reader_params.record_rowids = true;
+        reader_params.rowid_conversion = stats_output->rowid_conversion;
+        
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
     }
 
     reader_params.return_columns = column_group;
@@ -281,17 +275,6 @@ Status Merger::vertical_compact_one_group(
     reader_params.batch_size = batch_size;
     RETURN_IF_ERROR(reader.init(reader_params, sample_info));
 
-    if (reader_params.record_rowids) {
-        
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
-        // init segment rowid map for rowid conversion
-        std::vector<uint32_t> segment_num_rows;
-        for (auto& rs_split : reader_params.rs_splits) {
-            
RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows));
-            stats_output->rowid_conversion->init_segment_map(
-                    rs_split.rs_reader->rowset()->rowset_id(), 
segment_num_rows);
-        }
-    }
-
     vectorized::Block block = 
tablet_schema.create_block(reader_params.return_columns);
     size_t output_rows = 0;
     bool eof = false;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp
index d2c7023f659..042893f1374 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -235,6 +235,12 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
     for (size_t i = 0; i < segments.size(); i++) {
         _segments_rows[i] = segments[i]->num_rows();
     }
+    if (_read_context->record_rowids) {
+        // init segment rowid map for rowid conversion
+        std::vector<uint32_t> segment_num_rows;
+        RETURN_IF_ERROR(get_segment_num_rows(&segment_num_rows));
+        
_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(), 
segment_num_rows);
+    }
 
     auto [seg_start, seg_end] = _segment_offsets;
     if (seg_start == seg_end) {
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 6050a33bfc2..24e660cd2f7 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -269,7 +269,9 @@ public:
         _rowset_meta->get_segments_key_bounds(segments_key_bounds);
         return Status::OK();
     }
-    bool min_key(std::string* min_key) {
+
+    // min key of the first segment
+    bool first_key(std::string* min_key) {
         KeyBoundsPB key_bounds;
         bool ret = _rowset_meta->get_first_segment_key_bound(&key_bounds);
         if (!ret) {
@@ -278,7 +280,9 @@ public:
         *min_key = key_bounds.min_key();
         return true;
     }
-    bool max_key(std::string* max_key) {
+
+    // max key of the last segment
+    bool last_key(std::string* max_key) {
         KeyBoundsPB key_bounds;
         bool ret = _rowset_meta->get_last_segment_key_bound(&key_bounds);
         if (!ret) {
diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h
index 7af1ce7c047..fd3b4fed56f 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -21,6 +21,7 @@
 #include "io/io_common.h"
 #include "olap/column_predicate.h"
 #include "olap/olap_common.h"
+#include "olap/rowid_conversion.h"
 #include "runtime/runtime_state.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
@@ -75,6 +76,8 @@ struct RowsetReaderContext {
     bool enable_unique_key_merge_on_write = false;
     const DeleteBitmap* delete_bitmap = nullptr;
     bool record_rowids = false;
+    RowIdConversion* rowid_conversion;
+    bool is_vertical_compaction = false;
     bool is_key_column_group = false;
     const std::set<int32_t>* output_columns = nullptr;
     RowsetId rowset_id;
diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index 9ab9e4b1b36..7410b70f4aa 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -254,6 +254,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
     _reader_context.delete_bitmap = read_params.delete_bitmap;
     _reader_context.enable_unique_key_merge_on_write = 
tablet()->enable_unique_key_merge_on_write();
     _reader_context.record_rowids = read_params.record_rowids;
+    _reader_context.rowid_conversion = read_params.rowid_conversion;
     _reader_context.is_key_column_group = read_params.is_key_column_group;
     _reader_context.remaining_conjunct_roots = 
read_params.remaining_conjunct_roots;
     _reader_context.common_expr_ctxs_push_down = 
read_params.common_expr_ctxs_push_down;
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index 50517e047ba..87af3bb08eb 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -39,6 +39,7 @@
 #include "olap/olap_common.h"
 #include "olap/olap_tuple.h"
 #include "olap/row_cursor.h"
+#include "olap/rowid_conversion.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/rowset/rowset_reader.h"
@@ -166,6 +167,7 @@ public:
 
         // used for compaction to record row ids
         bool record_rowids = false;
+        RowIdConversion* rowid_conversion;
         std::vector<int> topn_filter_source_node_ids;
         int topn_filter_target_node_id = -1;
         // used for special optimization for query : ORDER BY key LIMIT n
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 9d79b51975c..c46ff330f2b 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -72,54 +72,31 @@ Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) {
     return res;
 }
 
-bool BlockReader::_rowsets_overlapping(const ReaderParams& read_params) {
-    std::string cur_max_key;
+bool BlockReader::_rowsets_mono_asc_disjoint(const ReaderParams& read_params) {
+    std::string cur_rs_last_key;
     const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits;
     for (const auto& rs_split : rs_splits) {
-        // version 0-1 of every tablet is empty, just skip this rowset
-        if (rs_split.rs_reader->rowset()->version().second == 1) {
-            continue;
-        }
         if (rs_split.rs_reader->rowset()->num_rows() == 0) {
             continue;
         }
         if (rs_split.rs_reader->rowset()->is_segments_overlapping()) {
             return true;
         }
-        std::string min_key;
-        bool has_min_key = rs_split.rs_reader->rowset()->min_key(&min_key);
-        if (!has_min_key) {
+        std::string rs_first_key;
+        bool has_first_key = 
rs_split.rs_reader->rowset()->first_key(&rs_first_key);
+        if (!has_first_key) {
             return true;
         }
-        if (min_key <= cur_max_key) {
+        if (rs_first_key <= cur_rs_last_key) {
             return true;
         }
-        CHECK(rs_split.rs_reader->rowset()->max_key(&cur_max_key));
+        bool has_last_key = 
rs_split.rs_reader->rowset()->last_key(&cur_rs_last_key);
+        CHECK(has_last_key);
     }
 
-    for (const auto& rs_reader : rs_splits) {
-        // version 0-1 of every tablet is empty, just skip this rowset
-        if (rs_reader.rs_reader->rowset()->version().second == 1) {
-            continue;
-        }
-        if (rs_reader.rs_reader->rowset()->num_rows() == 0) {
-            continue;
-        }
-        if (rs_reader.rs_reader->rowset()->is_segments_overlapping()) {
-            return true;
-        }
-        std::string min_key;
-        bool has_min_key = rs_reader.rs_reader->rowset()->min_key(&min_key);
-        if (!has_min_key) {
-            return true;
-        }
-        if (min_key <= cur_max_key) {
-            return true;
-        }
-        CHECK(rs_reader.rs_reader->rowset()->max_key(&cur_max_key));
-    }
     return false;
 }
+
 Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
     auto res = _capture_rs_readers(read_params);
     if (!res.ok()) {
@@ -131,7 +108,7 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
         return res;
     }
     // check if rowsets are noneoverlapping
-    _is_rowsets_overlapping = _rowsets_overlapping(read_params);
+    _is_rowsets_overlapping = _rowsets_mono_asc_disjoint(read_params);
     _vcollect_iter.init(this, _is_rowsets_overlapping, 
read_params.read_orderby_key,
                         read_params.read_orderby_key_reverse);
 
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index 6f9792929db..f33fe743109 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -86,7 +86,8 @@ private:
 
     bool _get_next_row_same();
 
-    bool _rowsets_overlapping(const ReaderParams& read_params);
+    // return true if keys of rowsets are mono ascending and disjoint
+    bool _rowsets_mono_asc_disjoint(const ReaderParams& read_params);
 
     VCollectIterator _vcollect_iter;
     IteratorRowRef _next_row {{}, -1, false};
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index f7017a058df..8c910656837 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -499,6 +499,7 @@ Status VCollectIterator::Level0Iterator::refresh_current_row() {
         if (_block == nullptr && !_get_data_by_ref) {
             _block = std::make_shared<Block>(_schema.create_block(
                     _reader->_return_columns, 
_reader->_tablet_columns_convert_to_null_set));
+            _ref.block = _block;
         }
 
         if (!_is_empty() && _current_valid()) {
diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp
index 5ae80398afb..d48d4150ad3 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -447,7 +447,12 @@ protected:
                     int64_t c1 = j * rows_per_segment + n;
                     // There are 500 rows of data overlap between rowsets
                     if (i > 0) {
-                        c1 += i * num_segments * rows_per_segment - 500;
+                        if (is_overlap) {
+                            // There are 500 rows of data overlap between 
rowsets
+                            c1 -= 500;
+                        } else {
+                            ++c1;
+                        }
                     }
                     if (is_overlap && j > 0) {
                         // There are 10 rows of data overlap between segments


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to