This is an automated email from the ASF dual-hosted git repository.

liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ebf7b4d58c [opt](memory) optimize row-store memtable flush memory in 
the row-store scenario (#63342)
0ebf7b4d58c is described below

commit 0ebf7b4d58cd414edf6bec0a6480281263145a73
Author: hui lai <[email protected]>
AuthorDate: Tue May 26 15:34:54 2026 +0800

    [opt](memory) optimize row-store memtable flush memory in the row-store 
scenario (#63342)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    Row-store flush previously serialized the full block into the row-store
    column before writing column data. When
    `enable_adaptive_write_buffer_size` is enabled, a memtable may grow up
    to several times `write_buffer_size`, which further amplifies the
    temporary row-store `ColumnString` allocated during flush. For row-store
    tables, this can cause much higher flush-time peak memory than
    non-adaptive mode. This PR serializes row-store data by range and
    appends it directly to the row-store column writer in smaller batches,
    avoiding full-block row-store materialization.
    
    ### Solution
    
    This PR adds range-based row-store serialization in
    `JsonbSerializeUtil::block_to_jsonb`. The new API supports `row_pos` and
    `num_rows`, and also supports a serialized-size limit so the caller can
    stop a range once the temporary row-store `ColumnString` reaches a
    target size. The existing API is kept as a wrapper for compatibility.
    
    In `VerticalSegmentWriter`, the full-block
    `_serialize_block_to_row_column()` path is replaced by
    `_append_row_store_column()`. The new helper:
    - Splits row-store serialization by `_opts.num_rows_per_block`
    - Also limits each temporary row-store `ColumnString` to about `4MB`
    - Always allows at least one row per batch, so a single row larger than
    `4MB` will be written as its own batch
    - Creates a temporary `ColumnString` only for the current range
    - Serializes only that range into row-store format
    - Immediately appends the temporary column to the row-store column
    writer
    - Clears convertor source content after append
    
    The direct/schema-change write path, fixed partial update path, and
    flexible partial update path now all use this helper for row-store
    columns. Normal column writing skips row-store columns to avoid
    duplicate writes.
    
    With this change, row-store flush temporary memory is bounded by both
    row count and serialized byte size, instead of the full memtable/block
    size. This reduces peak memory especially when adaptive write buffer
    size is enabled or when individual rows are large.
---
 be/src/storage/segment/vertical_segment_writer.cpp | 121 ++++++++++++++-------
 be/src/storage/segment/vertical_segment_writer.h   |   4 +-
 be/src/util/jsonb/serialize.cpp                    |  28 ++++-
 be/src/util/jsonb/serialize.h                      |   8 ++
 4 files changed, 120 insertions(+), 41 deletions(-)

diff --git a/be/src/storage/segment/vertical_segment_writer.cpp 
b/be/src/storage/segment/vertical_segment_writer.cpp
index 68fa8acd1ed..97821a7d1a4 100644
--- a/be/src/storage/segment/vertical_segment_writer.cpp
+++ b/be/src/storage/segment/vertical_segment_writer.cpp
@@ -22,6 +22,7 @@
 #include <gen_cpp/segment_v2.pb.h>
 #include <parallel_hashmap/phmap.h>
 
+#include <algorithm>
 #include <cassert>
 #include <memory>
 #include <ostream>
@@ -40,6 +41,7 @@
 #include "core/block/block.h"
 #include "core/block/column_with_type_and_name.h"
 #include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
 #include "core/column/column_vector.h"
 #include "core/data_type/data_type.h"
 #include "core/data_type/data_type_factory.hpp"
@@ -370,31 +372,43 @@ void 
VerticalSegmentWriter::_maybe_invalid_row_cache(const std::string& key) con
     }
 }
 
-void VerticalSegmentWriter::_serialize_block_to_row_column(Block& block) {
-    if (block.rows() == 0) {
-        return;
+Status VerticalSegmentWriter::_append_row_store_column(const Block& block, 
size_t row_pos,
+                                                       size_t num_rows, 
uint32_t cid) {
+    DCHECK(_tablet_schema->column(cid).is_row_store_column());
+    if (num_rows == 0) {
+        return Status::OK();
     }
-    MonotonicStopWatch watch;
-    watch.start();
-    int row_column_id = 0;
-    for (int i = 0; i < _tablet_schema->num_columns(); ++i) {
-        if (_tablet_schema->column(i).is_row_store_column()) {
-            auto row_store_column_ptr = 
block.get_by_position(i).column->clone_empty();
-            auto* row_store_column = 
static_cast<ColumnString*>(row_store_column_ptr.get());
-            DataTypeSerDeSPtrs serdes = 
create_data_type_serdes(block.get_data_types());
-            std::unordered_set<int> 
row_store_cids_set(_tablet_schema->row_columns_uids().begin(),
-                                                       
_tablet_schema->row_columns_uids().end());
-            JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, 
*row_store_column,
-                                               
cast_set<int>(_tablet_schema->num_columns()), serdes,
-                                               row_store_cids_set);
-            block.replace_by_position(i, std::move(row_store_column_ptr));
-            break;
-        }
+    DCHECK_LE(row_pos + num_rows, block.rows());
+
+    auto serdes = create_data_type_serdes(block.get_data_types());
+    std::unordered_set<int32_t> 
row_store_cids_set(_tablet_schema->row_columns_uids().begin(),
+                                                   
_tablet_schema->row_columns_uids().end());
+    size_t end_pos = row_pos + num_rows;
+    size_t batch_rows = _opts.num_rows_per_block;
+    static constexpr size_t kRowStoreBatchBytes = 4 * 1024 * 1024;
+    DCHECK_GT(batch_rows, 0);
+    for (size_t pos = row_pos; pos < end_pos;) {
+        size_t max_rows = std::min(batch_rows, end_pos - pos);
+        auto row_column = ColumnString::create();
+        auto* row_store_column = row_column.get();
+        size_t rows = JsonbSerializeUtil::block_to_jsonb(
+                *_tablet_schema, block, *row_store_column,
+                cast_set<int>(_tablet_schema->num_columns()), serdes, 
row_store_cids_set, pos,
+                max_rows, kRowStoreBatchBytes);
+        DCHECK_GT(rows, 0);
+
+        auto typed_column = block.get_by_position(cid);
+        typed_column.column = std::move(row_column);
+        
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                typed_column, 0, rows, cid));
+        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+        RETURN_IF_ERROR(status);
+        RETURN_IF_ERROR(
+                _column_writers[cid]->append(column->get_nullmap(), 
column->get_data(), rows));
+        _olap_data_convertor->clear_source_content(cid);
+        pos += rows;
     }
-
-    VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", 
row_column_id:" << row_column_id
-               << ", total_byte_size:" << block.allocated_bytes() << ", 
serialize_cost(us)"
-               << watch.elapsed_time() / 1000;
+    return Status::OK();
 }
 
 Status VerticalSegmentWriter::_probe_key_for_mow(
@@ -462,6 +476,15 @@ Status VerticalSegmentWriter::_probe_key_for_mow(
     return Status::OK();
 }
 
+Status VerticalSegmentWriter::_check_column_writer_disk_capacity(size_t cid) {
+    if (_data_dir != nullptr &&
+        
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
+        return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed 
capacity limit.",
+                                                        
_data_dir->path_hash());
+    }
+    return Status::OK();
+}
+
 Status VerticalSegmentWriter::_finalize_column_writer_and_update_meta(size_t 
cid) {
     RETURN_IF_ERROR(_column_writers[cid]->finish());
     RETURN_IF_ERROR(_column_writers[cid]->write_data());
@@ -655,14 +678,15 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
                 full_block, *_tablet_schema, 
_opts.rowset_ctx->partial_update_info->missing_cids));
     }
 
-    // row column should be filled here
-    // convert block to row store format
-    _serialize_block_to_row_column(full_block);
-
     // convert missing columns and send to column writer
     const auto& missing_cids = 
_opts.rowset_ctx->partial_update_info->missing_cids;
     for (auto cid : missing_cids) {
         RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid), _tablet_schema));
+        if (_tablet_schema->column(cid).is_row_store_column()) {
+            RETURN_IF_ERROR(_append_row_store_column(full_block, data.row_pos, 
data.num_rows, cid));
+            RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
+            continue;
+        }
         
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
                 &full_block, data.row_pos, data.num_rows, 
std::vector<uint32_t> {cid}));
         auto [status, column] = _olap_data_convertor->convert_column_data(cid);
@@ -819,7 +843,16 @@ Status 
VerticalSegmentWriter::_append_block_with_flexible_partial_content(RowsIn
     // this column is not needed in read path for merge-on-write table
 
     // 7. fill row store column
-    _serialize_block_to_row_column(full_block);
+    for (auto cid = _tablet_schema->num_key_columns(); cid < 
_tablet_schema->num_columns(); cid++) {
+        if (!_tablet_schema->column(cid).is_row_store_column()) {
+            continue;
+        }
+        RETURN_IF_ERROR(_create_column_writer(cast_set<uint32_t>(cid), 
_tablet_schema->column(cid),
+                                              _tablet_schema));
+        RETURN_IF_ERROR(_append_row_store_column(full_block, data.row_pos, 
data.num_rows,
+                                                 cast_set<uint32_t>(cid)));
+        RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
+    }
 
     std::vector<uint32_t> column_ids;
     for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) {
@@ -833,6 +866,9 @@ Status 
VerticalSegmentWriter::_append_block_with_flexible_partial_content(RowsIn
 
     // 8. encode and write all non-primary key columns(including sequence 
column if exists)
     for (auto cid = _tablet_schema->num_key_columns(); cid < 
_tablet_schema->num_columns(); cid++) {
+        if (_tablet_schema->column(cid).is_row_store_column()) {
+            continue;
+        }
         if (cid != _tablet_schema->sequence_col_idx()) {
             RETURN_IF_ERROR(_create_column_writer(cast_set<uint32_t>(cid),
                                                   _tablet_schema->column(cid), 
_tablet_schema));
@@ -1013,11 +1049,21 @@ Status VerticalSegmentWriter::write_batch() {
     }
     // Row column should be filled here when it's a directly write from 
memtable
     // or it's schema change write(since column data type maybe changed, so we 
should reubild)
-    if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
-        _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
-        for (auto& data : _batched_blocks) {
-            // TODO: maybe we should pass range to this method
-            _serialize_block_to_row_column(*const_cast<Block*>(data.block));
+    bool should_write_row_store_column = _opts.write_type == 
DataWriteType::TYPE_DIRECT ||
+                                         _opts.write_type == 
DataWriteType::TYPE_SCHEMA_CHANGE;
+    if (should_write_row_store_column) {
+        for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
+            if (!_tablet_schema->column(cid).is_row_store_column()) {
+                continue;
+            }
+            RETURN_IF_ERROR(
+                    _create_column_writer(cid, _tablet_schema->column(cid), 
_tablet_schema));
+            for (auto& data : _batched_blocks) {
+                RETURN_IF_ERROR(
+                        _append_row_store_column(*data.block, data.row_pos, 
data.num_rows, cid));
+            }
+            RETURN_IF_ERROR(_check_column_writer_disk_capacity(cid));
+            RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
         }
     }
 
@@ -1038,6 +1084,9 @@ Status VerticalSegmentWriter::write_batch() {
     // the key is cluster key column unique id
     std::map<uint32_t, IOlapColumnDataAccessor*> cid_to_column;
     for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
+        if (should_write_row_store_column && 
_tablet_schema->column(cid).is_row_store_column()) {
+            continue;
+        }
         RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid), _tablet_schema));
         for (auto& data : _batched_blocks) {
             
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
@@ -1065,11 +1114,7 @@ Status VerticalSegmentWriter::write_batch() {
                                                          data.num_rows));
             _olap_data_convertor->clear_source_content();
         }
-        if (_data_dir != nullptr &&
-            
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
-            return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed 
capacity limit.",
-                                                            
_data_dir->path_hash());
-        }
+        RETURN_IF_ERROR(_check_column_writer_disk_capacity(cid));
         RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
     }
 
diff --git a/be/src/storage/segment/vertical_segment_writer.h 
b/be/src/storage/segment/vertical_segment_writer.h
index 39235811c07..8aa8a24a7c4 100644
--- a/be/src/storage/segment/vertical_segment_writer.h
+++ b/be/src/storage/segment/vertical_segment_writer.h
@@ -158,7 +158,8 @@ private:
     void _set_min_max_key(const Slice& key);
     void _set_min_key(const Slice& key);
     void _set_max_key(const Slice& key);
-    void _serialize_block_to_row_column(Block& block);
+    Status _append_row_store_column(const Block& block, size_t row_pos, size_t 
num_rows,
+                                    uint32_t cid);
     Status _probe_key_for_mow(std::string key, std::size_t segment_pos, bool 
have_input_seq_column,
                               bool have_delete_sign,
                               const std::vector<RowsetSharedPtr>& 
specified_rowsets,
@@ -194,6 +195,7 @@ private:
             IOlapColumnDataAccessor* seq_column, size_t num_rows, bool 
need_sort);
     Status _generate_short_key_index(std::vector<IOlapColumnDataAccessor*>& 
key_columns,
                                      size_t num_rows, const 
std::vector<size_t>& short_key_pos);
+    Status _check_column_writer_disk_capacity(size_t cid);
     Status _finalize_column_writer_and_update_meta(size_t cid);
 
     bool _is_mow();
diff --git a/be/src/util/jsonb/serialize.cpp b/be/src/util/jsonb/serialize.cpp
index 6ff4a076f89..1b449da515c 100644
--- a/be/src/util/jsonb/serialize.cpp
+++ b/be/src/util/jsonb/serialize.cpp
@@ -20,6 +20,7 @@
 #include <assert.h>
 
 #include <algorithm>
+#include <limits>
 #include <memory>
 #include <unordered_set>
 #include <vector>
@@ -47,13 +48,31 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& 
schema, const Block&
                                         ColumnString& dst, int num_cols,
                                         const DataTypeSerDeSPtrs& serdes,
                                         const std::unordered_set<int32_t>& 
row_store_cids) {
-    auto num_rows = block.rows();
+    block_to_jsonb(schema, block, dst, num_cols, serdes, row_store_cids, 0, 
block.rows());
+}
+
+void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const 
Block& block,
+                                        ColumnString& dst, int num_cols,
+                                        const DataTypeSerDeSPtrs& serdes,
+                                        const std::unordered_set<int32_t>& 
row_store_cids,
+                                        size_t row_pos, size_t num_rows) {
+    static_cast<void>(block_to_jsonb(schema, block, dst, num_cols, serdes, 
row_store_cids, row_pos,
+                                     num_rows, 
std::numeric_limits<size_t>::max()));
+}
+
+size_t JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const 
Block& block,
+                                          ColumnString& dst, int num_cols,
+                                          const DataTypeSerDeSPtrs& serdes,
+                                          const std::unordered_set<int32_t>& 
row_store_cids,
+                                          size_t row_pos, size_t num_rows, 
size_t max_bytes) {
     Arena arena;
     assert(num_cols <= block.columns());
+    assert(row_pos + num_rows <= block.rows());
     DataTypeSerDe::FormatOptions options;
     auto tz = cctz::utc_time_zone();
     options.timezone = &tz;
-    for (int i = 0; i < num_rows; ++i) {
+    size_t written_rows = 0;
+    for (size_t i = row_pos; i < row_pos + num_rows; ++i) {
         JsonbWriterT<JsonbOutStream> jsonb_writer;
         jsonb_writer.writeStartObject();
         for (int j = 0; j < num_cols; ++j) {
@@ -71,7 +90,12 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& 
schema, const Block&
         }
         jsonb_writer.writeEndObject();
         dst.insert_data(jsonb_writer.getOutput()->getBuffer(), 
jsonb_writer.getOutput()->getSize());
+        ++written_rows;
+        if (dst.byte_size() >= max_bytes) {
+            break;
+        }
     }
+    return written_rows;
 }
 
 // batch rows
diff --git a/be/src/util/jsonb/serialize.h b/be/src/util/jsonb/serialize.h
index e25ecc00af4..36627e0b5ea 100644
--- a/be/src/util/jsonb/serialize.h
+++ b/be/src/util/jsonb/serialize.h
@@ -41,6 +41,14 @@ public:
     static void block_to_jsonb(const TabletSchema& schema, const Block& block, 
ColumnString& dst,
                                int num_cols, const DataTypeSerDeSPtrs& serdes,
                                const std::unordered_set<int32_t>& 
row_store_cids);
+    static void block_to_jsonb(const TabletSchema& schema, const Block& block, 
ColumnString& dst,
+                               int num_cols, const DataTypeSerDeSPtrs& serdes,
+                               const std::unordered_set<int32_t>& 
row_store_cids, size_t row_pos,
+                               size_t num_rows);
+    static size_t block_to_jsonb(const TabletSchema& schema, const Block& 
block, ColumnString& dst,
+                                 int num_cols, const DataTypeSerDeSPtrs& 
serdes,
+                                 const std::unordered_set<int32_t>& 
row_store_cids, size_t row_pos,
+                                 size_t num_rows, size_t max_bytes);
     // batch rows
     static Status jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const 
ColumnString& jsonb_column,
                                  const std::unordered_map<uint32_t, uint32_t>& 
col_id_to_idx,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to