yixiutt commented on code in PR #16731:
URL: https://github.com/apache/doris/pull/16731#discussion_r1113903511


##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -133,18 +137,21 @@ RowwiseIteratorUPtr BetaRowsetWriter::_get_segcompaction_reader(
         }
         seg_iterators.push_back(std::move(iter));
     }
-    bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
-    bool is_reverse = false;
-    auto merge_itr = vectorized::new_merge_iterator(std::move(seg_iterators), -1, is_unique,
-                                                    is_reverse, merged_row_stat);
-    DCHECK(merge_itr);
-    auto s = merge_itr->init(read_options);
-    if (!s.ok()) {
-        LOG(WARNING) << "failed to init iterator: " << s.to_string();
-        return nullptr;
-    }
 
-    return merge_itr;
+    auto reader = std::unique_ptr<vectorized::VerticalBlockReader> {
+            new vectorized::VerticalBlockReader(&row_sources_buf)};
+
+    TabletReader::ReaderParams reader_params;
+    reader_params.is_segcompaction = true;
+    reader_params.segment_iters_ptr = &seg_iterators;
+    // no reader_params.version shouldn't break segcompaction
+    reader_params.tablet_schema = _context.tablet_schema;
+    reader_params.tablet = tablet;
+    reader_params.return_columns = return_columns;
+    reader_params.is_key_column_group = is_key;
+    reader->init(reader_params);
+
+    return reader;

Review Comment:
   It would be better to return a Status here instead of a bare reader pointer.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -116,9 +118,11 @@ Status BetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _add_block(block, &_segment_writer);
 }
 
-RowwiseIteratorUPtr BetaRowsetWriter::_get_segcompaction_reader(
-        SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema,
-        OlapReaderStatistics* stat, uint64_t* merged_row_stat) {
+std::unique_ptr<vectorized::VerticalBlockReader> BetaRowsetWriter::_get_segcompaction_reader(
+        SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet,
+        std::shared_ptr<Schema> schema, OlapReaderStatistics* stat, uint64_t* merged_row_stat,
+        vectorized::RowSourcesBuffer& row_sources_buf, bool is_key,
+        std::vector<uint32_t>& return_columns) {

Review Comment:
   It's weird to create a reader inside the writer.



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -267,49 +274,69 @@ Status 
BetaRowsetWriter::_do_compact_segments(SegCompactionCandidatesSharedPtr s
         LOG(WARNING) << "skip segcompaction due to memory shortage";
         return Status::Error<FETCH_MEMORY_EXCEEDED>();
     }
+
     uint64_t begin = (*(segments->begin()))->id();
     uint64_t end = (*(segments->end() - 1))->id();
     uint64_t begin_time = GetCurrentTimeMicros();
 
-    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
-                                           
_context.tablet_schema->columns().size());
     std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
     uint64_t merged_row_stat = 0;
-    auto reader_ptr = _get_segcompaction_reader(segments, schema, stat.get(), 
&merged_row_stat);
-    if (UNLIKELY(reader_ptr == nullptr)) {
-        LOG(WARNING) << "failed to get segcompaction reader";
-        return Status::Error<SEGCOMPACTION_INIT_READER>();
-    }
+    uint64_t index_size = 0;
+    uint64_t total_index_size = 0;
+
+    // ================ begin vcompaction ==================
     auto writer = _create_segcompaction_writer(begin, end);
     if (UNLIKELY(writer == nullptr)) {
         LOG(WARNING) << "failed to get segcompaction writer";
         return Status::Error<SEGCOMPACTION_INIT_WRITER>();
     }
-    uint64_t row_count = 0;
-    vectorized::Block block = _context.tablet_schema->create_block();
-    while (true) {
-        auto status = reader_ptr->next_batch(&block);
-        row_count += block.rows();
-        if (status != Status::OK()) {
-            if (LIKELY(status.is<END_OF_FILE>())) {
-                RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, 
&writer),
-                                  "write block failed");
-                break;
-            } else {
-                LOG(WARNING) << "read block failed: " << status.to_string();
-                return status;
-            }
+

Review Comment:
   It would be better to move this logic out of this function.



##########
be/src/olap/merger.cpp:
##########
@@ -112,7 +110,7 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, 
ReaderType reader_type,
     }
 
     RETURN_NOT_OK_LOG(
-            dst_rowset_writer->flush(),
+            dst_rowset_writer->flush_by_merger(),

Review Comment:
   Is `flush()` not OK here? Why is `flush_by_merger()` needed?



##########
be/src/vec/olap/vertical_block_reader.cpp:
##########
@@ -86,9 +86,25 @@ Status VerticalBlockReader::_init_collect_iter(const 
ReaderParams& read_params)
     std::vector<RowwiseIteratorUPtr> segment_iters;
     std::vector<bool> iterator_init_flag;
     std::vector<RowsetId> rowset_ids;
-    RETURN_IF_ERROR(
-            _get_segment_iterators(read_params, &segment_iters, 
&iterator_init_flag, &rowset_ids));
-    CHECK(segment_iters.size() == iterator_init_flag.size());
+    std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = 
read_params.segment_iters_ptr;
+
+    if (!segment_iters_ptr) {
+        RETURN_IF_ERROR(_get_segment_iterators(read_params, &segment_iters, 
&iterator_init_flag,
+                                               &rowset_ids));
+        CHECK(segment_iters.size() == iterator_init_flag.size());
+        segment_iters_ptr = &segment_iters;

Review Comment:
   Could we pass `segment_iters_ptr` to `_get_segment_iterators` instead?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -267,49 +274,69 @@ Status 
BetaRowsetWriter::_do_compact_segments(SegCompactionCandidatesSharedPtr s
         LOG(WARNING) << "skip segcompaction due to memory shortage";
         return Status::Error<FETCH_MEMORY_EXCEEDED>();
     }
+
     uint64_t begin = (*(segments->begin()))->id();
     uint64_t end = (*(segments->end() - 1))->id();
     uint64_t begin_time = GetCurrentTimeMicros();
 
-    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
-                                           
_context.tablet_schema->columns().size());
     std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
     uint64_t merged_row_stat = 0;
-    auto reader_ptr = _get_segcompaction_reader(segments, schema, stat.get(), 
&merged_row_stat);
-    if (UNLIKELY(reader_ptr == nullptr)) {
-        LOG(WARNING) << "failed to get segcompaction reader";
-        return Status::Error<SEGCOMPACTION_INIT_READER>();
-    }
+    uint64_t index_size = 0;
+    uint64_t total_index_size = 0;
+
+    // ================ begin vcompaction ==================
     auto writer = _create_segcompaction_writer(begin, end);
     if (UNLIKELY(writer == nullptr)) {
         LOG(WARNING) << "failed to get segcompaction writer";
         return Status::Error<SEGCOMPACTION_INIT_WRITER>();
     }
-    uint64_t row_count = 0;
-    vectorized::Block block = _context.tablet_schema->create_block();
-    while (true) {
-        auto status = reader_ptr->next_batch(&block);
-        row_count += block.rows();
-        if (status != Status::OK()) {
-            if (LIKELY(status.is<END_OF_FILE>())) {
-                RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, 
&writer),
-                                  "write block failed");
-                break;
-            } else {
-                LOG(WARNING) << "read block failed: " << status.to_string();
-                return status;
-            }
+
+    DCHECK(_context.tablet);
+    auto tablet = _context.tablet;
+
+    std::vector<std::vector<uint32_t>> column_groups;
+    Merger::vertical_split_columns(_context.tablet_schema, &column_groups);
+    vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), 
tablet->tablet_path(),
+                                                 READER_SEGMENT_COMPACTION);
+
+    KeyBoundsPB key_bounds;
+    // compact group one by one
+    for (auto i = 0; i < column_groups.size(); ++i) {
+        VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
+        bool is_key = (i == 0);
+        std::vector<uint32_t> column_ids = column_groups[i];
+
+        writer->clear();
+        writer->init(column_ids, is_key);
+        auto schema = 
std::make_shared<Schema>(_context.tablet_schema->columns(), column_ids);
+        auto reader =
+                _get_segcompaction_reader(segments, tablet, schema, 
stat.get(), &merged_row_stat,
+                                          row_sources_buf, is_key, column_ids);
+        if (UNLIKELY(reader == nullptr)) {
+            LOG(WARNING) << "failed to get segcompaction reader";
+            return Status::Error<SEGCOMPACTION_INIT_READER>();
+        }
+
+        // ========= Merger Compaction
+        Merger::Statistics stats;

Review Comment:
   Is `stats` unused?



##########
be/src/olap/rowset/beta_rowset_writer.cpp:
##########
@@ -267,49 +274,69 @@ Status 
BetaRowsetWriter::_do_compact_segments(SegCompactionCandidatesSharedPtr s
         LOG(WARNING) << "skip segcompaction due to memory shortage";
         return Status::Error<FETCH_MEMORY_EXCEEDED>();
     }
+
     uint64_t begin = (*(segments->begin()))->id();
     uint64_t end = (*(segments->end() - 1))->id();
     uint64_t begin_time = GetCurrentTimeMicros();
 
-    auto schema = std::make_shared<Schema>(_context.tablet_schema->columns(),
-                                           
_context.tablet_schema->columns().size());
     std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
     uint64_t merged_row_stat = 0;
-    auto reader_ptr = _get_segcompaction_reader(segments, schema, stat.get(), 
&merged_row_stat);
-    if (UNLIKELY(reader_ptr == nullptr)) {
-        LOG(WARNING) << "failed to get segcompaction reader";
-        return Status::Error<SEGCOMPACTION_INIT_READER>();
-    }
+    uint64_t index_size = 0;
+    uint64_t total_index_size = 0;
+
+    // ================ begin vcompaction ==================
     auto writer = _create_segcompaction_writer(begin, end);
     if (UNLIKELY(writer == nullptr)) {
         LOG(WARNING) << "failed to get segcompaction writer";
         return Status::Error<SEGCOMPACTION_INIT_WRITER>();
     }
-    uint64_t row_count = 0;
-    vectorized::Block block = _context.tablet_schema->create_block();
-    while (true) {
-        auto status = reader_ptr->next_batch(&block);
-        row_count += block.rows();
-        if (status != Status::OK()) {
-            if (LIKELY(status.is<END_OF_FILE>())) {
-                RETURN_NOT_OK_LOG(_add_block_for_segcompaction(&block, 
&writer),
-                                  "write block failed");
-                break;
-            } else {
-                LOG(WARNING) << "read block failed: " << status.to_string();
-                return status;
-            }
+
+    DCHECK(_context.tablet);
+    auto tablet = _context.tablet;
+
+    std::vector<std::vector<uint32_t>> column_groups;
+    Merger::vertical_split_columns(_context.tablet_schema, &column_groups);
+    vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), 
tablet->tablet_path(),
+                                                 READER_SEGMENT_COMPACTION);
+
+    KeyBoundsPB key_bounds;
+    // compact group one by one
+    for (auto i = 0; i < column_groups.size(); ++i) {
+        VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
+        bool is_key = (i == 0);
+        std::vector<uint32_t> column_ids = column_groups[i];
+
+        writer->clear();
+        writer->init(column_ids, is_key);
+        auto schema = 
std::make_shared<Schema>(_context.tablet_schema->columns(), column_ids);
+        auto reader =
+                _get_segcompaction_reader(segments, tablet, schema, 
stat.get(), &merged_row_stat,
+                                          row_sources_buf, is_key, column_ids);
+        if (UNLIKELY(reader == nullptr)) {
+            LOG(WARNING) << "failed to get segcompaction reader";
+            return Status::Error<SEGCOMPACTION_INIT_READER>();
+        }
+
+        // ========= Merger Compaction
+        Merger::Statistics stats;
+
+        RETURN_IF_ERROR(Merger::vertical_compact_one_group(
+                tablet, READER_SEGMENT_COMPACTION, _context.tablet_schema, 
is_key, column_ids,
+                &row_sources_buf, *reader, *writer, INT_MAX, &stats, 
&index_size, key_bounds));

Review Comment:
   If segment compaction only produces one destination segment, that segment may be too large.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to