This is an automated email from the ASF dual-hosted git repository.
liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0ebf7b4d58c [opt](memory) optimize row-store memtable flush memory in
the row-store scenario (#63342)
0ebf7b4d58c is described below
commit 0ebf7b4d58cd414edf6bec0a6480281263145a73
Author: hui lai <[email protected]>
AuthorDate: Tue May 26 15:34:54 2026 +0800
[opt](memory) optimize row-store memtable flush memory in the row-store
scenario (#63342)
### What problem does this PR solve?
Problem Summary:
Previously, row-store flush serialized the entire block into the
row-store column before writing any column data. When
`enable_adaptive_write_buffer_size` is enabled, a memtable may grow to
several times `write_buffer_size`, which further enlarges the temporary
row-store `ColumnString` allocated during flush. For row-store tables,
this can make flush-time peak memory much higher than in non-adaptive
mode. This PR serializes row-store data by range and appends it directly
to the row-store column writer in smaller batches, avoiding full-block
row-store materialization.
### Solution
This PR adds range-based row-store serialization to
`JsonbSerializeUtil::block_to_jsonb`. New overloads accept `row_pos` and
`num_rows`. One of them also takes a serialized-size limit (`max_bytes`)
and returns the number of rows written, so the caller can stop a range
once the temporary row-store `ColumnString` reaches a target size. The
existing signature is kept as a thin wrapper over the new overloads for
compatibility.
In `VerticalSegmentWriter`, the full-block
`_serialize_block_to_row_column()` path is replaced by
`_append_row_store_column()`. The new helper:
- Splits row-store serialization into batches of at most
`_opts.num_rows_per_block` rows
- Ends a batch once its temporary row-store `ColumnString` reaches about
`4MB` (a soft limit: the size check runs after each row is appended, so
the last row of a batch may push it past `4MB`)
- Always writes at least one row per batch, so a single row larger than
`4MB` is written as its own batch
- Creates a temporary `ColumnString` only for the current range
- Serializes only that range into row-store format
- Immediately appends the temporary column to the row-store column
writer
- Clears the data convertor's source content after each append
The direct/schema-change write path, the fixed partial update path, and
the flexible partial update path now all use this helper for row-store
columns. The regular per-column write loops skip row-store columns so
they are not written twice.
With this change, the temporary memory used for row-store flush is
bounded by both row count and serialized byte size instead of by the full
memtable/block size. This reduces peak memory, especially when adaptive
write buffer size is enabled or when individual rows are large.
---
be/src/storage/segment/vertical_segment_writer.cpp | 121 ++++++++++++++-------
be/src/storage/segment/vertical_segment_writer.h | 4 +-
be/src/util/jsonb/serialize.cpp | 28 ++++-
be/src/util/jsonb/serialize.h | 8 ++
4 files changed, 120 insertions(+), 41 deletions(-)
diff --git a/be/src/storage/segment/vertical_segment_writer.cpp
b/be/src/storage/segment/vertical_segment_writer.cpp
index 68fa8acd1ed..97821a7d1a4 100644
--- a/be/src/storage/segment/vertical_segment_writer.cpp
+++ b/be/src/storage/segment/vertical_segment_writer.cpp
@@ -22,6 +22,7 @@
#include <gen_cpp/segment_v2.pb.h>
#include <parallel_hashmap/phmap.h>
+#include <algorithm>
#include <cassert>
#include <memory>
#include <ostream>
@@ -40,6 +41,7 @@
#include "core/block/block.h"
#include "core/block/column_with_type_and_name.h"
#include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type.h"
#include "core/data_type/data_type_factory.hpp"
@@ -370,31 +372,43 @@ void
VerticalSegmentWriter::_maybe_invalid_row_cache(const std::string& key) con
}
}
-void VerticalSegmentWriter::_serialize_block_to_row_column(Block& block) {
- if (block.rows() == 0) {
- return;
+Status VerticalSegmentWriter::_append_row_store_column(const Block& block,
size_t row_pos,
+ size_t num_rows,
uint32_t cid) {
+ DCHECK(_tablet_schema->column(cid).is_row_store_column());
+ if (num_rows == 0) {
+ return Status::OK();
}
- MonotonicStopWatch watch;
- watch.start();
- int row_column_id = 0;
- for (int i = 0; i < _tablet_schema->num_columns(); ++i) {
- if (_tablet_schema->column(i).is_row_store_column()) {
- auto row_store_column_ptr =
block.get_by_position(i).column->clone_empty();
- auto* row_store_column =
static_cast<ColumnString*>(row_store_column_ptr.get());
- DataTypeSerDeSPtrs serdes =
create_data_type_serdes(block.get_data_types());
- std::unordered_set<int>
row_store_cids_set(_tablet_schema->row_columns_uids().begin(),
-
_tablet_schema->row_columns_uids().end());
- JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block,
*row_store_column,
-
cast_set<int>(_tablet_schema->num_columns()), serdes,
- row_store_cids_set);
- block.replace_by_position(i, std::move(row_store_column_ptr));
- break;
- }
+ DCHECK_LE(row_pos + num_rows, block.rows());
+
+ auto serdes = create_data_type_serdes(block.get_data_types());
+ std::unordered_set<int32_t>
row_store_cids_set(_tablet_schema->row_columns_uids().begin(),
+
_tablet_schema->row_columns_uids().end());
+ size_t end_pos = row_pos + num_rows;
+ size_t batch_rows = _opts.num_rows_per_block;
+ static constexpr size_t kRowStoreBatchBytes = 4 * 1024 * 1024;
+ DCHECK_GT(batch_rows, 0);
+ for (size_t pos = row_pos; pos < end_pos;) {
+ size_t max_rows = std::min(batch_rows, end_pos - pos);
+ auto row_column = ColumnString::create();
+ auto* row_store_column = row_column.get();
+ size_t rows = JsonbSerializeUtil::block_to_jsonb(
+ *_tablet_schema, block, *row_store_column,
+ cast_set<int>(_tablet_schema->num_columns()), serdes,
row_store_cids_set, pos,
+ max_rows, kRowStoreBatchBytes);
+ DCHECK_GT(rows, 0);
+
+ auto typed_column = block.get_by_position(cid);
+ typed_column.column = std::move(row_column);
+
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+ typed_column, 0, rows, cid));
+ auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+ RETURN_IF_ERROR(status);
+ RETURN_IF_ERROR(
+ _column_writers[cid]->append(column->get_nullmap(),
column->get_data(), rows));
+ _olap_data_convertor->clear_source_content(cid);
+ pos += rows;
}
-
- VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ",
row_column_id:" << row_column_id
- << ", total_byte_size:" << block.allocated_bytes() << ",
serialize_cost(us)"
- << watch.elapsed_time() / 1000;
+ return Status::OK();
}
Status VerticalSegmentWriter::_probe_key_for_mow(
@@ -462,6 +476,15 @@ Status VerticalSegmentWriter::_probe_key_for_mow(
return Status::OK();
}
+Status VerticalSegmentWriter::_check_column_writer_disk_capacity(size_t cid) {
+ if (_data_dir != nullptr &&
+
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
+ return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed
capacity limit.",
+
_data_dir->path_hash());
+ }
+ return Status::OK();
+}
+
Status VerticalSegmentWriter::_finalize_column_writer_and_update_meta(size_t
cid) {
RETURN_IF_ERROR(_column_writers[cid]->finish());
RETURN_IF_ERROR(_column_writers[cid]->write_data());
@@ -655,14 +678,15 @@ Status
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
full_block, *_tablet_schema,
_opts.rowset_ctx->partial_update_info->missing_cids));
}
- // row column should be filled here
- // convert block to row store format
- _serialize_block_to_row_column(full_block);
-
// convert missing columns and send to column writer
const auto& missing_cids =
_opts.rowset_ctx->partial_update_info->missing_cids;
for (auto cid : missing_cids) {
RETURN_IF_ERROR(_create_column_writer(cid,
_tablet_schema->column(cid), _tablet_schema));
+ if (_tablet_schema->column(cid).is_row_store_column()) {
+ RETURN_IF_ERROR(_append_row_store_column(full_block, data.row_pos,
data.num_rows, cid));
+ RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
+ continue;
+ }
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
&full_block, data.row_pos, data.num_rows,
std::vector<uint32_t> {cid}));
auto [status, column] = _olap_data_convertor->convert_column_data(cid);
@@ -819,7 +843,16 @@ Status
VerticalSegmentWriter::_append_block_with_flexible_partial_content(RowsIn
// this column is not needed in read path for merge-on-write table
// 7. fill row store column
- _serialize_block_to_row_column(full_block);
+ for (auto cid = _tablet_schema->num_key_columns(); cid <
_tablet_schema->num_columns(); cid++) {
+ if (!_tablet_schema->column(cid).is_row_store_column()) {
+ continue;
+ }
+ RETURN_IF_ERROR(_create_column_writer(cast_set<uint32_t>(cid),
_tablet_schema->column(cid),
+ _tablet_schema));
+ RETURN_IF_ERROR(_append_row_store_column(full_block, data.row_pos,
data.num_rows,
+ cast_set<uint32_t>(cid)));
+ RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
+ }
std::vector<uint32_t> column_ids;
for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) {
@@ -833,6 +866,9 @@ Status
VerticalSegmentWriter::_append_block_with_flexible_partial_content(RowsIn
// 8. encode and write all non-primary key columns(including sequence
column if exists)
for (auto cid = _tablet_schema->num_key_columns(); cid <
_tablet_schema->num_columns(); cid++) {
+ if (_tablet_schema->column(cid).is_row_store_column()) {
+ continue;
+ }
if (cid != _tablet_schema->sequence_col_idx()) {
RETURN_IF_ERROR(_create_column_writer(cast_set<uint32_t>(cid),
_tablet_schema->column(cid),
_tablet_schema));
@@ -1013,11 +1049,21 @@ Status VerticalSegmentWriter::write_batch() {
}
// Row column should be filled here when it's a directly write from
memtable
// or it's schema change write(since column data type maybe changed, so we
should reubild)
- if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
- _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
- for (auto& data : _batched_blocks) {
- // TODO: maybe we should pass range to this method
- _serialize_block_to_row_column(*const_cast<Block*>(data.block));
+ bool should_write_row_store_column = _opts.write_type ==
DataWriteType::TYPE_DIRECT ||
+ _opts.write_type ==
DataWriteType::TYPE_SCHEMA_CHANGE;
+ if (should_write_row_store_column) {
+ for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
+ if (!_tablet_schema->column(cid).is_row_store_column()) {
+ continue;
+ }
+ RETURN_IF_ERROR(
+ _create_column_writer(cid, _tablet_schema->column(cid),
_tablet_schema));
+ for (auto& data : _batched_blocks) {
+ RETURN_IF_ERROR(
+ _append_row_store_column(*data.block, data.row_pos,
data.num_rows, cid));
+ }
+ RETURN_IF_ERROR(_check_column_writer_disk_capacity(cid));
+ RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
}
}
@@ -1038,6 +1084,9 @@ Status VerticalSegmentWriter::write_batch() {
// the key is cluster key column unique id
std::map<uint32_t, IOlapColumnDataAccessor*> cid_to_column;
for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
+ if (should_write_row_store_column &&
_tablet_schema->column(cid).is_row_store_column()) {
+ continue;
+ }
RETURN_IF_ERROR(_create_column_writer(cid,
_tablet_schema->column(cid), _tablet_schema));
for (auto& data : _batched_blocks) {
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
@@ -1065,11 +1114,7 @@ Status VerticalSegmentWriter::write_batch() {
data.num_rows));
_olap_data_convertor->clear_source_content();
}
- if (_data_dir != nullptr &&
-
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
- return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed
capacity limit.",
-
_data_dir->path_hash());
- }
+ RETURN_IF_ERROR(_check_column_writer_disk_capacity(cid));
RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
}
diff --git a/be/src/storage/segment/vertical_segment_writer.h
b/be/src/storage/segment/vertical_segment_writer.h
index 39235811c07..8aa8a24a7c4 100644
--- a/be/src/storage/segment/vertical_segment_writer.h
+++ b/be/src/storage/segment/vertical_segment_writer.h
@@ -158,7 +158,8 @@ private:
void _set_min_max_key(const Slice& key);
void _set_min_key(const Slice& key);
void _set_max_key(const Slice& key);
- void _serialize_block_to_row_column(Block& block);
+ Status _append_row_store_column(const Block& block, size_t row_pos, size_t
num_rows,
+ uint32_t cid);
Status _probe_key_for_mow(std::string key, std::size_t segment_pos, bool
have_input_seq_column,
bool have_delete_sign,
const std::vector<RowsetSharedPtr>&
specified_rowsets,
@@ -194,6 +195,7 @@ private:
IOlapColumnDataAccessor* seq_column, size_t num_rows, bool
need_sort);
Status _generate_short_key_index(std::vector<IOlapColumnDataAccessor*>&
key_columns,
size_t num_rows, const
std::vector<size_t>& short_key_pos);
+ Status _check_column_writer_disk_capacity(size_t cid);
Status _finalize_column_writer_and_update_meta(size_t cid);
bool _is_mow();
diff --git a/be/src/util/jsonb/serialize.cpp b/be/src/util/jsonb/serialize.cpp
index 6ff4a076f89..1b449da515c 100644
--- a/be/src/util/jsonb/serialize.cpp
+++ b/be/src/util/jsonb/serialize.cpp
@@ -20,6 +20,7 @@
#include <assert.h>
#include <algorithm>
+#include <limits>
#include <memory>
#include <unordered_set>
#include <vector>
@@ -47,13 +48,31 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema&
schema, const Block&
ColumnString& dst, int num_cols,
const DataTypeSerDeSPtrs& serdes,
const std::unordered_set<int32_t>&
row_store_cids) {
- auto num_rows = block.rows();
+ block_to_jsonb(schema, block, dst, num_cols, serdes, row_store_cids, 0,
block.rows());
+}
+
+void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const
Block& block,
+ ColumnString& dst, int num_cols,
+ const DataTypeSerDeSPtrs& serdes,
+ const std::unordered_set<int32_t>&
row_store_cids,
+ size_t row_pos, size_t num_rows) {
+ static_cast<void>(block_to_jsonb(schema, block, dst, num_cols, serdes,
row_store_cids, row_pos,
+ num_rows,
std::numeric_limits<size_t>::max()));
+}
+
+size_t JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const
Block& block,
+ ColumnString& dst, int num_cols,
+ const DataTypeSerDeSPtrs& serdes,
+ const std::unordered_set<int32_t>&
row_store_cids,
+ size_t row_pos, size_t num_rows,
size_t max_bytes) {
Arena arena;
assert(num_cols <= block.columns());
+ assert(row_pos + num_rows <= block.rows());
DataTypeSerDe::FormatOptions options;
auto tz = cctz::utc_time_zone();
options.timezone = &tz;
- for (int i = 0; i < num_rows; ++i) {
+ size_t written_rows = 0;
+ for (size_t i = row_pos; i < row_pos + num_rows; ++i) {
JsonbWriterT<JsonbOutStream> jsonb_writer;
jsonb_writer.writeStartObject();
for (int j = 0; j < num_cols; ++j) {
@@ -71,7 +90,12 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema&
schema, const Block&
}
jsonb_writer.writeEndObject();
dst.insert_data(jsonb_writer.getOutput()->getBuffer(),
jsonb_writer.getOutput()->getSize());
+ ++written_rows;
+ if (dst.byte_size() >= max_bytes) {
+ break;
+ }
}
+ return written_rows;
}
// batch rows
diff --git a/be/src/util/jsonb/serialize.h b/be/src/util/jsonb/serialize.h
index e25ecc00af4..36627e0b5ea 100644
--- a/be/src/util/jsonb/serialize.h
+++ b/be/src/util/jsonb/serialize.h
@@ -41,6 +41,14 @@ public:
static void block_to_jsonb(const TabletSchema& schema, const Block& block,
ColumnString& dst,
int num_cols, const DataTypeSerDeSPtrs& serdes,
const std::unordered_set<int32_t>&
row_store_cids);
+ static void block_to_jsonb(const TabletSchema& schema, const Block& block,
ColumnString& dst,
+ int num_cols, const DataTypeSerDeSPtrs& serdes,
+ const std::unordered_set<int32_t>&
row_store_cids, size_t row_pos,
+ size_t num_rows);
+ static size_t block_to_jsonb(const TabletSchema& schema, const Block&
block, ColumnString& dst,
+ int num_cols, const DataTypeSerDeSPtrs&
serdes,
+ const std::unordered_set<int32_t>&
row_store_cids, size_t row_pos,
+ size_t num_rows, size_t max_bytes);
// batch rows
static Status jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const
ColumnString& jsonb_column,
const std::unordered_map<uint32_t, uint32_t>&
col_id_to_idx,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]