This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 7b56b3394be branch-4.1: [opt](memory) optimize row-store memtable flush memory in the row-store scenario (#64057)
7b56b3394be is described below

commit 7b56b3394be9c43afa7ddbe1260eddd3a8f3f4a7
Author: hui lai <[email protected]>
AuthorDate: Wed Jun 3 19:26:59 2026 +0800

    branch-4.1: [opt](memory) optimize row-store memtable flush memory in the row-store scenario (#64057)
    
    pick https://github.com/apache/doris/pull/63342
---
 be/src/storage/segment/vertical_segment_writer.cpp | 121 ++++++++++++++-------
 be/src/storage/segment/vertical_segment_writer.h   |   4 +-
 be/src/util/jsonb/serialize.cpp                    |  28 ++++-
 be/src/util/jsonb/serialize.h                      |   8 ++
 4 files changed, 120 insertions(+), 41 deletions(-)

diff --git a/be/src/storage/segment/vertical_segment_writer.cpp 
b/be/src/storage/segment/vertical_segment_writer.cpp
index a95dc380c55..670119dcba9 100644
--- a/be/src/storage/segment/vertical_segment_writer.cpp
+++ b/be/src/storage/segment/vertical_segment_writer.cpp
@@ -22,6 +22,7 @@
 #include <gen_cpp/segment_v2.pb.h>
 #include <parallel_hashmap/phmap.h>
 
+#include <algorithm>
 #include <cassert>
 #include <memory>
 #include <ostream>
@@ -40,6 +41,7 @@
 #include "core/block/block.h"
 #include "core/block/column_with_type_and_name.h"
 #include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
 #include "core/column/column_vector.h"
 #include "core/data_type/data_type.h"
 #include "core/data_type/data_type_factory.hpp"
@@ -364,31 +366,43 @@ void 
VerticalSegmentWriter::_maybe_invalid_row_cache(const std::string& key) con
     }
 }
 
-void VerticalSegmentWriter::_serialize_block_to_row_column(const Block& block) 
{
-    if (block.rows() == 0) {
-        return;
+Status VerticalSegmentWriter::_append_row_store_column(const Block& block, 
size_t row_pos,
+                                                       size_t num_rows, 
uint32_t cid) {
+    DCHECK(_tablet_schema->column(cid).is_row_store_column());
+    if (num_rows == 0) {
+        return Status::OK();
     }
-    MonotonicStopWatch watch;
-    watch.start();
-    int row_column_id = 0;
-    for (int i = 0; i < _tablet_schema->num_columns(); ++i) {
-        if (_tablet_schema->column(i).is_row_store_column()) {
-            auto* row_store_column = static_cast<ColumnString*>(
-                    
block.get_by_position(i).column->assume_mutable_ref().assume_mutable().get());
-            row_store_column->clear();
-            DataTypeSerDeSPtrs serdes = 
create_data_type_serdes(block.get_data_types());
-            std::unordered_set<int> 
row_store_cids_set(_tablet_schema->row_columns_uids().begin(),
-                                                       
_tablet_schema->row_columns_uids().end());
-            JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, 
*row_store_column,
-                                               
cast_set<int>(_tablet_schema->num_columns()), serdes,
-                                               row_store_cids_set);
-            break;
-        }
+    DCHECK_LE(row_pos + num_rows, block.rows());
+
+    auto serdes = create_data_type_serdes(block.get_data_types());
+    std::unordered_set<int32_t> 
row_store_cids_set(_tablet_schema->row_columns_uids().begin(),
+                                                   
_tablet_schema->row_columns_uids().end());
+    size_t end_pos = row_pos + num_rows;
+    size_t batch_rows = _opts.num_rows_per_block;
+    static constexpr size_t kRowStoreBatchBytes = 4 * 1024 * 1024;
+    DCHECK_GT(batch_rows, 0);
+    for (size_t pos = row_pos; pos < end_pos;) {
+        size_t max_rows = std::min(batch_rows, end_pos - pos);
+        auto row_column = ColumnString::create();
+        auto* row_store_column = row_column.get();
+        size_t rows = JsonbSerializeUtil::block_to_jsonb(
+                *_tablet_schema, block, *row_store_column,
+                cast_set<int>(_tablet_schema->num_columns()), serdes, 
row_store_cids_set, pos,
+                max_rows, kRowStoreBatchBytes);
+        DCHECK_GT(rows, 0);
+
+        auto typed_column = block.get_by_position(cid);
+        typed_column.column = std::move(row_column);
+        
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                typed_column, 0, rows, cid));
+        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+        RETURN_IF_ERROR(status);
+        RETURN_IF_ERROR(
+                _column_writers[cid]->append(column->get_nullmap(), 
column->get_data(), rows));
+        _olap_data_convertor->clear_source_content(cid);
+        pos += rows;
     }
-
-    VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", 
row_column_id:" << row_column_id
-               << ", total_byte_size:" << block.allocated_bytes() << ", 
serialize_cost(us)"
-               << watch.elapsed_time() / 1000;
+    return Status::OK();
 }
 
 Status VerticalSegmentWriter::_probe_key_for_mow(
@@ -456,6 +470,15 @@ Status VerticalSegmentWriter::_probe_key_for_mow(
     return Status::OK();
 }
 
+Status VerticalSegmentWriter::_check_column_writer_disk_capacity(size_t cid) {
+    if (_data_dir != nullptr &&
+        
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
+        return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed 
capacity limit.",
+                                                        
_data_dir->path_hash());
+    }
+    return Status::OK();
+}
+
 Status VerticalSegmentWriter::_finalize_column_writer_and_update_meta(size_t 
cid) {
     RETURN_IF_ERROR(_column_writers[cid]->finish());
     RETURN_IF_ERROR(_column_writers[cid]->write_data());
@@ -648,14 +671,15 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
                 full_block, *_tablet_schema, 
_opts.rowset_ctx->partial_update_info->missing_cids));
     }
 
-    // row column should be filled here
-    // convert block to row store format
-    _serialize_block_to_row_column(full_block);
-
     // convert missing columns and send to column writer
     const auto& missing_cids = 
_opts.rowset_ctx->partial_update_info->missing_cids;
     for (auto cid : missing_cids) {
         RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid), _tablet_schema));
+        if (_tablet_schema->column(cid).is_row_store_column()) {
+            RETURN_IF_ERROR(_append_row_store_column(full_block, data.row_pos, 
data.num_rows, cid));
+            RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
+            continue;
+        }
         
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
                 &full_block, data.row_pos, data.num_rows, 
std::vector<uint32_t> {cid}));
         auto [status, column] = _olap_data_convertor->convert_column_data(cid);
@@ -813,7 +837,16 @@ Status 
VerticalSegmentWriter::_append_block_with_flexible_partial_content(RowsIn
     // this column is not needed in read path for merge-on-write table
 
     // 7. fill row store column
-    _serialize_block_to_row_column(full_block);
+    for (auto cid = _tablet_schema->num_key_columns(); cid < 
_tablet_schema->num_columns(); cid++) {
+        if (!_tablet_schema->column(cid).is_row_store_column()) {
+            continue;
+        }
+        RETURN_IF_ERROR(_create_column_writer(cast_set<uint32_t>(cid), 
_tablet_schema->column(cid),
+                                              _tablet_schema));
+        RETURN_IF_ERROR(_append_row_store_column(full_block, data.row_pos, 
data.num_rows,
+                                                 cast_set<uint32_t>(cid)));
+        RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
+    }
 
     std::vector<uint32_t> column_ids;
     for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) {
@@ -827,6 +860,9 @@ Status 
VerticalSegmentWriter::_append_block_with_flexible_partial_content(RowsIn
 
     // 8. encode and write all non-primary key columns(including sequence 
column if exists)
     for (auto cid = _tablet_schema->num_key_columns(); cid < 
_tablet_schema->num_columns(); cid++) {
+        if (_tablet_schema->column(cid).is_row_store_column()) {
+            continue;
+        }
         if (cid != _tablet_schema->sequence_col_idx()) {
             RETURN_IF_ERROR(_create_column_writer(cast_set<uint32_t>(cid),
                                                   _tablet_schema->column(cid), 
_tablet_schema));
@@ -1007,11 +1043,21 @@ Status VerticalSegmentWriter::write_batch() {
     }
     // Row column should be filled here when it's a directly write from 
memtable
     // or it's schema change write(since column data type maybe changed, so we 
should reubild)
-    if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
-        _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
-        for (auto& data : _batched_blocks) {
-            // TODO: maybe we should pass range to this method
-            _serialize_block_to_row_column(*data.block);
+    bool should_write_row_store_column = _opts.write_type == 
DataWriteType::TYPE_DIRECT ||
+                                         _opts.write_type == 
DataWriteType::TYPE_SCHEMA_CHANGE;
+    if (should_write_row_store_column) {
+        for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
+            if (!_tablet_schema->column(cid).is_row_store_column()) {
+                continue;
+            }
+            RETURN_IF_ERROR(
+                    _create_column_writer(cid, _tablet_schema->column(cid), 
_tablet_schema));
+            for (auto& data : _batched_blocks) {
+                RETURN_IF_ERROR(
+                        _append_row_store_column(*data.block, data.row_pos, 
data.num_rows, cid));
+            }
+            RETURN_IF_ERROR(_check_column_writer_disk_capacity(cid));
+            RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
         }
     }
 
@@ -1032,6 +1078,9 @@ Status VerticalSegmentWriter::write_batch() {
     // the key is cluster key column unique id
     std::map<uint32_t, IOlapColumnDataAccessor*> cid_to_column;
     for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
+        if (should_write_row_store_column && 
_tablet_schema->column(cid).is_row_store_column()) {
+            continue;
+        }
         RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid), _tablet_schema));
         for (auto& data : _batched_blocks) {
             
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
@@ -1059,11 +1108,7 @@ Status VerticalSegmentWriter::write_batch() {
                                                          data.num_rows));
             _olap_data_convertor->clear_source_content();
         }
-        if (_data_dir != nullptr &&
-            
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
-            return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed 
capacity limit.",
-                                                            
_data_dir->path_hash());
-        }
+        RETURN_IF_ERROR(_check_column_writer_disk_capacity(cid));
         RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
     }
 
diff --git a/be/src/storage/segment/vertical_segment_writer.h 
b/be/src/storage/segment/vertical_segment_writer.h
index 5c0ec0930e5..8aa8a24a7c4 100644
--- a/be/src/storage/segment/vertical_segment_writer.h
+++ b/be/src/storage/segment/vertical_segment_writer.h
@@ -158,7 +158,8 @@ private:
     void _set_min_max_key(const Slice& key);
     void _set_min_key(const Slice& key);
     void _set_max_key(const Slice& key);
-    void _serialize_block_to_row_column(const Block& block);
+    Status _append_row_store_column(const Block& block, size_t row_pos, size_t 
num_rows,
+                                    uint32_t cid);
     Status _probe_key_for_mow(std::string key, std::size_t segment_pos, bool 
have_input_seq_column,
                               bool have_delete_sign,
                               const std::vector<RowsetSharedPtr>& 
specified_rowsets,
@@ -194,6 +195,7 @@ private:
             IOlapColumnDataAccessor* seq_column, size_t num_rows, bool 
need_sort);
     Status _generate_short_key_index(std::vector<IOlapColumnDataAccessor*>& 
key_columns,
                                      size_t num_rows, const 
std::vector<size_t>& short_key_pos);
+    Status _check_column_writer_disk_capacity(size_t cid);
     Status _finalize_column_writer_and_update_meta(size_t cid);
 
     bool _is_mow();
diff --git a/be/src/util/jsonb/serialize.cpp b/be/src/util/jsonb/serialize.cpp
index 0088c6249f0..254123bd31a 100644
--- a/be/src/util/jsonb/serialize.cpp
+++ b/be/src/util/jsonb/serialize.cpp
@@ -20,6 +20,7 @@
 #include <assert.h>
 
 #include <algorithm>
+#include <limits>
 #include <memory>
 #include <unordered_set>
 #include <vector>
@@ -46,13 +47,31 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& 
schema, const Block&
                                         ColumnString& dst, int num_cols,
                                         const DataTypeSerDeSPtrs& serdes,
                                         const std::unordered_set<int32_t>& 
row_store_cids) {
-    auto num_rows = block.rows();
+    block_to_jsonb(schema, block, dst, num_cols, serdes, row_store_cids, 0, 
block.rows());
+}
+
+void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const 
Block& block,
+                                        ColumnString& dst, int num_cols,
+                                        const DataTypeSerDeSPtrs& serdes,
+                                        const std::unordered_set<int32_t>& 
row_store_cids,
+                                        size_t row_pos, size_t num_rows) {
+    static_cast<void>(block_to_jsonb(schema, block, dst, num_cols, serdes, 
row_store_cids, row_pos,
+                                     num_rows, 
std::numeric_limits<size_t>::max()));
+}
+
+size_t JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const 
Block& block,
+                                          ColumnString& dst, int num_cols,
+                                          const DataTypeSerDeSPtrs& serdes,
+                                          const std::unordered_set<int32_t>& 
row_store_cids,
+                                          size_t row_pos, size_t num_rows, 
size_t max_bytes) {
     Arena arena;
     assert(num_cols <= block.columns());
+    assert(row_pos + num_rows <= block.rows());
     DataTypeSerDe::FormatOptions options;
     auto tz = cctz::utc_time_zone();
     options.timezone = &tz;
-    for (int i = 0; i < num_rows; ++i) {
+    size_t written_rows = 0;
+    for (size_t i = row_pos; i < row_pos + num_rows; ++i) {
         JsonbWriterT<JsonbOutStream> jsonb_writer;
         jsonb_writer.writeStartObject();
         for (int j = 0; j < num_cols; ++j) {
@@ -70,7 +89,12 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& 
schema, const Block&
         }
         jsonb_writer.writeEndObject();
         dst.insert_data(jsonb_writer.getOutput()->getBuffer(), 
jsonb_writer.getOutput()->getSize());
+        ++written_rows;
+        if (dst.byte_size() >= max_bytes) {
+            break;
+        }
     }
+    return written_rows;
 }
 
 // batch rows
diff --git a/be/src/util/jsonb/serialize.h b/be/src/util/jsonb/serialize.h
index f19474abe93..3c33373936e 100644
--- a/be/src/util/jsonb/serialize.h
+++ b/be/src/util/jsonb/serialize.h
@@ -41,6 +41,14 @@ public:
     static void block_to_jsonb(const TabletSchema& schema, const Block& block, 
ColumnString& dst,
                                int num_cols, const DataTypeSerDeSPtrs& serdes,
                                const std::unordered_set<int32_t>& 
row_store_cids);
+    static void block_to_jsonb(const TabletSchema& schema, const Block& block, 
ColumnString& dst,
+                               int num_cols, const DataTypeSerDeSPtrs& serdes,
+                               const std::unordered_set<int32_t>& 
row_store_cids, size_t row_pos,
+                               size_t num_rows);
+    static size_t block_to_jsonb(const TabletSchema& schema, const Block& 
block, ColumnString& dst,
+                                 int num_cols, const DataTypeSerDeSPtrs& 
serdes,
+                                 const std::unordered_set<int32_t>& 
row_store_cids, size_t row_pos,
+                                 size_t num_rows, size_t max_bytes);
     // batch rows
     static Status jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const 
ColumnString& jsonb_column,
                                  const std::unordered_map<uint32_t, uint32_t>& 
col_id_to_idx,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to