This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 096afe663d7 branch-4.1: [improvement](iceberg) Improve
VIcebergSortWriter code quality (#60978) (#61468)
096afe663d7 is described below
commit 096afe663d781748138bbe207de8034da88a577a
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Wed Mar 18 10:59:18 2026 -0700
branch-4.1: [improvement](iceberg) Improve VIcebergSortWriter code quality
(#60978) (#61468)
bp #60978
---
be/src/pipeline/exec/operator.h | 6 +-
.../exec/spill_iceberg_table_sink_operator.cpp | 14 +-
.../sink/writer/iceberg/viceberg_sort_writer.cpp | 374 +++++++++++++++++++++
.../vec/sink/writer/iceberg/viceberg_sort_writer.h | 342 +++++--------------
.../sink/writer/iceberg/viceberg_table_writer.h | 8 +-
be/src/vec/spill/spill_reader.h | 2 +-
.../exec/multi_cast_data_streamer_test.cpp | 7 +-
.../operator/spillable_operator_test_helper.cpp | 2 +-
8 files changed, 488 insertions(+), 267 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index a003ac1316d..080b8647a36 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -418,8 +418,8 @@ public:
_spill_read_file_time =
ADD_TIMER_WITH_LEVEL(Base::custom_profile(),
"SpillReadFileTime", 1);
- _spill_read_derialize_block_timer =
- ADD_TIMER_WITH_LEVEL(Base::custom_profile(),
"SpillReadDerializeBlockTime", 1);
+ _spill_read_deserialize_block_timer =
+ ADD_TIMER_WITH_LEVEL(Base::custom_profile(),
"SpillReadDeserializeBlockTime", 1);
_spill_read_block_count =
ADD_COUNTER_WITH_LEVEL(Base::custom_profile(),
"SpillReadBlockCount", TUnit::UNIT, 1);
@@ -494,7 +494,7 @@ public:
RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;
RuntimeProfile::Counter* _spill_read_file_time = nullptr;
- RuntimeProfile::Counter* _spill_read_derialize_block_timer = nullptr;
+ RuntimeProfile::Counter* _spill_read_deserialize_block_timer = nullptr;
RuntimeProfile::Counter* _spill_read_block_count = nullptr;
// Total bytes of read data in Block format(in memory format)
RuntimeProfile::Counter* _spill_read_block_data_size = nullptr;
diff --git a/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
b/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
index d8f577af648..46240c20ffe 100644
--- a/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
@@ -54,12 +54,12 @@ bool SpillIcebergTableSinkLocalState::is_blockable() const {
}
size_t SpillIcebergTableSinkLocalState::get_reserve_mem_size(RuntimeState*
state, bool eos) {
- if (!_writer || !_writer->_current_writer) {
+ if (!_writer || !_writer->current_writer()) {
return 0;
}
auto* sort_writer =
-
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
+
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->current_writer().get());
if (!sort_writer || !sort_writer->sorter()) {
return 0;
}
@@ -68,12 +68,12 @@ size_t
SpillIcebergTableSinkLocalState::get_reserve_mem_size(RuntimeState* state
}
size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState*
state) const {
- if (!_writer || !_writer->_current_writer) {
+ if (!_writer || !_writer->current_writer()) {
return 0;
}
auto* sort_writer =
-
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
+
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->current_writer().get());
if (!sort_writer || !sort_writer->sorter()) {
return 0;
}
@@ -83,7 +83,7 @@ size_t
SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* sta
Status SpillIcebergTableSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
- if (!_writer || !_writer->_current_writer) {
+ if (!_writer || !_writer->current_writer()) {
if (spill_context) {
spill_context->on_task_finished();
}
@@ -91,7 +91,7 @@ Status SpillIcebergTableSinkLocalState::revoke_memory(
}
auto* sort_writer =
-
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
+
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->current_writer().get());
if (!sort_writer || !sort_writer->sorter()) {
if (spill_context) {
@@ -182,7 +182,7 @@ void
SpillIcebergTableSinkLocalState::_init_spill_counters() {
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadTaskCount", TUnit::UNIT, 1);
ADD_TIMER_WITH_LEVEL(profile, "SpillReadTaskWaitInQueueTime", 1);
ADD_TIMER_WITH_LEVEL(profile, "SpillReadFileTime", 1);
- ADD_TIMER_WITH_LEVEL(profile, "SpillReadDerializeBlockTime", 1);
+ ADD_TIMER_WITH_LEVEL(profile, "SpillReadDeserializeBlockTime", 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadBlockCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadBlockBytes", TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(profile, "SpillReadFileBytes", TUnit::BYTES, 1);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.cpp
new file mode 100644
index 00000000000..f3dfda3f766
--- /dev/null
+++ b/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.cpp
@@ -0,0 +1,374 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/writer/iceberg/viceberg_sort_writer.h"
+
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "vec/spill/spill_stream.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+Status VIcebergSortWriter::open(RuntimeState* state, RuntimeProfile* profile,
+ const RowDescriptor* row_desc) {
+ // row_desc is required for initializing sort expressions
+ DCHECK(row_desc != nullptr);
+ _runtime_state = state;
+ _profile = profile;
+ _row_desc = row_desc;
+
+ // Initialize sort expressions from sort_info (contains ordering columns,
asc/desc, nulls first/last)
+ RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool));
+ RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc));
+ RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
+
+ // Create FullSorter for in-memory sorting with spill support enabled.
+ // Parameters: limit=-1 (no limit), offset=0 (no offset)
+ _sorter = vectorized::FullSorter::create_unique(_vsort_exec_exprs, -1, 0,
&_pool,
+ _sort_info.is_asc_order,
_sort_info.nulls_first,
+ *row_desc, state,
_profile);
+ _sorter->init_profile(_profile);
+ // Enable spill support so the sorter can be used with the spill framework
+ _sorter->set_enable_spill();
+ _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount",
TUnit::UNIT);
+
+ // Open the underlying partition writer that handles actual file I/O
+ RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc));
+ return Status::OK();
+}
+
+Status VIcebergSortWriter::write(vectorized::Block& block) {
+ // Append incoming block data to the sorter's internal buffer
+ RETURN_IF_ERROR(_sorter->append_block(&block));
+ _update_spill_block_batch_row_count(block);
+
+ // When accumulated data size reaches the target file size threshold,
+ // sort the data in memory and flush it directly to a Parquet/ORC file.
+ // This avoids holding too much data in memory before writing.
+ if (_sorter->data_size() >= _target_file_size_bytes) {
+ return _flush_to_file();
+ }
+
+ // If data size is below threshold, wait for more data.
+ // Note: trigger_spill() may be called externally by the memory management
+ // system if memory pressure is high.
+ return Status::OK();
+}
+
+Status VIcebergSortWriter::close(const Status& status) {
+ // Track the actual internal status of operations performed during close.
+ // This is important because if intermediate operations (like do_sort())
fail,
+ // we need to propagate the actual error status to the underlying
partition writer's
+ // close() call, rather than the original status parameter which could be
OK.
+ Status internal_status = Status::OK();
+ // Track the close status of the underlying partition writer.
+ // If _iceberg_partition_writer->close() fails (e.g., Parquet file flush
error),
+ // we must propagate this error to the caller to avoid silent data loss.
+ Status close_status = Status::OK();
+
+ // Defer ensures the underlying partition writer is always closed and
+ // spill streams are cleaned up, regardless of whether intermediate
operations succeed.
+ // Uses internal_status to propagate any errors that occurred during close
operations.
+ Defer defer {[&]() {
+ // If any intermediate operation failed, pass that error to the
partition writer;
+ // otherwise, pass the original status from the caller.
+ close_status =
+ _iceberg_partition_writer->close(internal_status.ok() ? status
: internal_status);
+ if (!close_status.ok()) {
+ LOG(WARNING) << fmt::format("_iceberg_partition_writer close
failed, reason: {}",
+ close_status.to_string());
+ }
+ _cleanup_spill_streams();
+ }};
+
+ // If the original status is already an error or the query is cancelled,
+ // skip all close operations and propagate the original error
+ if (!status.ok() || _runtime_state->is_cancelled()) {
+ return status;
+ }
+
+ // If sorter was never initialized (e.g., no data was written), nothing to
do
+ if (_sorter == nullptr) {
+ return Status::OK();
+ }
+
+ // Check if there is any remaining data in the sorter (either unsorted or
already sorted blocks)
+ if (!_sorter->merge_sort_state()->unsorted_block()->empty() ||
+ !_sorter->merge_sort_state()->get_sorted_block().empty()) {
+ if (_sorted_streams.empty()) {
+ // No spill has occurred, all data is in memory.
+ // Sort the remaining data, prepare for reading, and write to file.
+ internal_status = _sorter->do_sort();
+ if (!internal_status.ok()) {
+ return internal_status;
+ }
+ internal_status = _sorter->prepare_for_read(false);
+ if (!internal_status.ok()) {
+ return internal_status;
+ }
+ internal_status = _write_sorted_data();
+ return internal_status;
+ }
+
+ // Some data has already been spilled to disk.
+ // Spill the remaining in-memory data to a new spill stream.
+ internal_status = _do_spill();
+ if (!internal_status.ok()) {
+ return internal_status;
+ }
+ }
+
+ // Merge all spilled streams using multi-way merge sort and output final
sorted data to files
+ if (!_sorted_streams.empty()) {
+ internal_status = _combine_files_output();
+ if (!internal_status.ok()) {
+ return internal_status;
+ }
+ }
+
+ // Return close_status if internal operations succeeded but the underlying
+ // partition writer's close() failed (e.g., file flush error).
+ // This prevents silent data loss where the caller thinks the write
succeeded
+ // but the file was not properly closed.
+ return close_status;
+}
+
+void VIcebergSortWriter::_update_spill_block_batch_row_count(const
vectorized::Block& block) {
+ auto rows = block.rows();
+ // Calculate average row size from the first non-empty block to determine
+ // the optimal batch size for spill operations
+ if (rows > 0 && 0 == _avg_row_bytes) {
+ _avg_row_bytes = std::max(1UL, block.bytes() / rows);
+ int64_t spill_batch_bytes = _runtime_state->spill_sort_batch_bytes();
// default 8MB
+ // Calculate how many rows fit in one spill batch (ceiling division)
+ _spill_block_batch_row_count = (spill_batch_bytes + _avg_row_bytes -
1) / _avg_row_bytes;
+ }
+}
+
+Status VIcebergSortWriter::_flush_to_file() {
+ // Sort the accumulated data in memory
+ RETURN_IF_ERROR(_sorter->do_sort());
+ // Prepare the sorted data for sequential reading (builds merge tree if
needed)
+ RETURN_IF_ERROR(_sorter->prepare_for_read(false));
+ // Write the sorted data to the current Parquet/ORC file
+ RETURN_IF_ERROR(_write_sorted_data());
+ // Close the current file (it has reached the target size) and open a new
writer
+ RETURN_IF_ERROR(_close_current_writer_and_open_next());
+ // Reset the sorter state to accept new data for the next file
+ _sorter->reset();
+ return Status::OK();
+}
+
+Status VIcebergSortWriter::_write_sorted_data() {
+ // Read sorted blocks from the sorter one by one and write them
+ // to the underlying partition writer (Parquet/ORC file)
+ bool eos = false;
+ Block block;
+ while (!eos && !_runtime_state->is_cancelled()) {
+ RETURN_IF_ERROR(_sorter->get_next(_runtime_state, &block, &eos));
+ RETURN_IF_ERROR(_iceberg_partition_writer->write(block));
+ block.clear_column_data();
+ }
+ return Status::OK();
+}
+
+Status VIcebergSortWriter::_close_current_writer_and_open_next() {
+ // Save the current file name and index before closing, so the next file
+ // can use an incremented index (e.g., file_0, file_1, file_2, ...)
+ std::string current_file_name = _iceberg_partition_writer->file_name();
+ int current_file_index = _iceberg_partition_writer->file_name_index();
+ RETURN_IF_ERROR(_iceberg_partition_writer->close(Status::OK()));
+
+ // Use the lambda to create a new partition writer with the next file index
+    _iceberg_partition_writer = _create_writer_lambda(&current_file_name,
current_file_index + 1);
+ if (!_iceberg_partition_writer) {
+ return Status::InternalError("Failed to create new partition writer");
+ }
+
+ RETURN_IF_ERROR(_iceberg_partition_writer->open(_runtime_state, _profile,
_row_desc));
+ return Status::OK();
+}
+
+int32_t VIcebergSortWriter::_get_spill_batch_size() const {
+ // Clamp the batch row count to int32_t max to prevent overflow
+ if (_spill_block_batch_row_count > std::numeric_limits<int32_t>::max()) {
+ return std::numeric_limits<int32_t>::max();
+ }
+ return static_cast<int32_t>(_spill_block_batch_row_count);
+}
+
+Status VIcebergSortWriter::_do_spill() {
+ COUNTER_UPDATE(_do_spill_count_counter, 1);
+
+ // Explicitly sort the data before preparing for spill read.
+ // Although FullSorter::prepare_for_read(is_spill=true) internally calls
do_sort()
+ // when there is unsorted data (see sorter.cpp), we call do_sort()
explicitly here
+ // for clarity and to guarantee that the data written to the spill stream
is sorted.
+ // This ensures correctness of the subsequent multi-way merge phase.
+ RETURN_IF_ERROR(_sorter->do_sort());
+
+ // prepare_for_read(is_spill=true) adjusts limit/offset for spill mode
+ // and builds the merge tree for reading sorted data
+ RETURN_IF_ERROR(_sorter->prepare_for_read(true));
+ int32_t batch_size = _get_spill_batch_size();
+
+ // Register a new spill stream to store the sorted data on disk
+ SpillStreamSPtr spilling_stream;
+
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+ _runtime_state, spilling_stream,
print_id(_runtime_state->query_id()), "iceberg-sort",
+ 1 /* node_id */, batch_size,
_runtime_state->spill_sort_batch_bytes(), _profile));
+ _sorted_streams.emplace_back(spilling_stream);
+
+ // Read sorted data from the sorter in batches and write to the spill
stream
+ bool eos = false;
+ Block block;
+ while (!eos && !_runtime_state->is_cancelled()) {
+ // Use _get_spill_batch_size() for safe narrowing conversion from
size_t to int32_t
+ // instead of C-style cast, which includes bounds checking
+ RETURN_IF_ERROR(_sorter->merge_sort_read_for_spill(_runtime_state,
&block,
+
_get_spill_batch_size(), &eos));
+ RETURN_IF_ERROR(spilling_stream->spill_block(_runtime_state, block,
eos));
+ block.clear_column_data();
+ }
+ // Reset the sorter to free memory and accept new data
+ _sorter->reset();
+ return Status::OK();
+}
+
+Status VIcebergSortWriter::_combine_files_output() {
+ // If there are too many spill streams to merge at once (limited by
memory),
+ // perform intermediate merges to reduce the number of streams
+ while (_sorted_streams.size() >
static_cast<size_t>(_calc_max_merge_streams())) {
+ RETURN_IF_ERROR(_do_intermediate_merge());
+ }
+
+ // Create the final merger that combines all remaining spill streams
+ RETURN_IF_ERROR(_create_final_merger());
+
+ // Read merged sorted data and write to Parquet/ORC files,
+ // splitting into new files when the target file size is exceeded
+ bool eos = false;
+ Block output_block;
+ size_t current_file_bytes = _iceberg_partition_writer->written_len();
+ while (!eos && !_runtime_state->is_cancelled()) {
+ RETURN_IF_ERROR(_merger->get_next(&output_block, &eos));
+ if (output_block.rows() > 0) {
+ size_t block_bytes = output_block.bytes();
+ RETURN_IF_ERROR(_iceberg_partition_writer->write(output_block));
+ current_file_bytes += block_bytes;
+ // If the current file exceeds the target size, close it and open
a new one
+ if (current_file_bytes > _target_file_size_bytes) {
+ RETURN_IF_ERROR(_close_current_writer_and_open_next());
+ current_file_bytes = 0;
+ }
+ }
+ output_block.clear_column_data();
+ }
+ return Status::OK();
+}
+
+Status VIcebergSortWriter::_do_intermediate_merge() {
+ int max_stream_count = _calc_max_merge_streams();
+ // Merge a subset of streams (non-final merge) to reduce total stream count
+ RETURN_IF_ERROR(_create_merger(false, _spill_block_batch_row_count,
max_stream_count));
+
+ // Register a new spill stream for the merged output
+ int32_t batch_size = _get_spill_batch_size();
+ SpillStreamSPtr tmp_stream;
+
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+ _runtime_state, tmp_stream, print_id(_runtime_state->query_id()),
"iceberg-sort-merge",
+ 1 /* node_id */, batch_size,
_runtime_state->spill_sort_batch_bytes(), _profile));
+
+ _sorted_streams.emplace_back(tmp_stream);
+
+ // Merge the selected streams and write the result to the new spill stream
+ bool eos = false;
+ Block merge_sorted_block;
+ while (!eos && !_runtime_state->is_cancelled()) {
+ merge_sorted_block.clear_column_data();
+ RETURN_IF_ERROR(_merger->get_next(&merge_sorted_block, &eos));
+ RETURN_IF_ERROR(tmp_stream->spill_block(_runtime_state,
merge_sorted_block, eos));
+ }
+
+ // Clean up the streams that were consumed during this intermediate merge
+ for (auto& stream : _current_merging_streams) {
+
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+ }
+ _current_merging_streams.clear();
+ return Status::OK();
+}
+
+int VIcebergSortWriter::_calc_max_merge_streams() const {
+ // Calculate the maximum number of streams that can be merged
simultaneously
+ // based on the available memory limit and per-stream batch size
+ auto count = _runtime_state->spill_sort_mem_limit() /
_runtime_state->spill_sort_batch_bytes();
+ if (count > std::numeric_limits<int>::max()) {
+ return std::numeric_limits<int>::max();
+ }
+ // Ensure at least 2 streams can be merged (minimum for a merge operation)
+ return std::max(2, static_cast<int>(count));
+}
+
+Status VIcebergSortWriter::_create_merger(bool is_final_merge, size_t
batch_size, int num_streams) {
+ // Create a multi-way merge sorter that reads from multiple sorted spill
streams
+ std::vector<vectorized::BlockSupplier> child_block_suppliers;
+ _merger =
std::make_unique<vectorized::VSortedRunMerger>(_sorter->get_sort_description(),
+ batch_size, -1,
0, _profile);
+ _current_merging_streams.clear();
+
+ // For final merge: merge all remaining streams
+ // For intermediate merge: merge only num_streams streams
+ size_t streams_to_merge = is_final_merge ? _sorted_streams.size() :
num_streams;
+
+ for (size_t i = 0; i < streams_to_merge && !_sorted_streams.empty(); ++i) {
+ auto stream = _sorted_streams.front();
+ stream->set_read_counters(_profile);
+ _current_merging_streams.emplace_back(stream);
+ // Create a block supplier lambda that reads the next block from the
spill stream
+ child_block_suppliers.emplace_back([stream](vectorized::Block* block,
bool* eos) {
+ return stream->read_next_block_sync(block, eos);
+ });
+ _sorted_streams.pop_front();
+ }
+
+ RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
+ return Status::OK();
+}
+
+Status VIcebergSortWriter::_create_final_merger() {
+ // Final merger uses the runtime batch size and merges all remaining
streams
+ return _create_merger(true, _runtime_state->batch_size(), 1);
+}
+
+void VIcebergSortWriter::_cleanup_spill_streams() {
+ // Clean up all remaining spill streams to release disk resources
+ for (auto& stream : _sorted_streams) {
+
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+ }
+ _sorted_streams.clear();
+
+ // Also clean up any streams that are currently being merged
+ for (auto& stream : _current_merging_streams) {
+
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+ }
+ _current_merging_streams.clear();
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h
b/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h
index f9b91021f2b..3df0f5a7d28 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h
@@ -22,28 +22,59 @@
#include <utility>
#include <vector>
-#include "common/config.h"
#include "common/object_pool.h"
-#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/common/sort/sorter.h"
#include "vec/core/block.h"
-#include "vec/exprs/vslot_ref.h"
#include "vec/sink/writer/iceberg/viceberg_partition_writer.h"
#include "vec/sink/writer/iceberg/vpartition_writer_base.h"
-#include "vec/spill/spill_stream.h"
-#include "vec/spill/spill_stream_manager.h"
+
+// Forward declarations to minimize header dependencies.
+// Previously, spill_stream.h and spill_stream_manager.h were included directly
+// in this header, causing heavy transitive includes for all files that include
+// viceberg_sort_writer.h. Moving implementations to .cpp allows us to
forward-declare
+// these types and only include their headers in the .cpp file.
+namespace doris::vectorized {
+class SpillStream;
+using SpillStreamSPtr = std::shared_ptr<SpillStream>;
+} // namespace doris::vectorized
namespace doris {
class RuntimeState;
class RuntimeProfile;
namespace vectorized {
+
+/**
+ * VIcebergSortWriter is a decorator around VIcebergPartitionWriter that adds
sort-order support.
+ *
+ * Architecture:
+ * IPartitionWriterBase (abstract base class)
+ * ├── VIcebergPartitionWriter (writes data directly to Parquet/ORC
files)
+ * └── VIcebergSortWriter (sorts data before delegating to
VIcebergPartitionWriter)
+ *
+ * Key behaviors:
+ * 1. In-memory sorting: Accumulates data in a FullSorter. When accumulated
data reaches
+ * _target_file_size_bytes, sorts and flushes to a file, then opens a new
writer.
+ * 2. Spill to disk: When triggered by the memory management system via
trigger_spill(),
+ * sorts and writes data to a SpillStream on disk.
+ * 3. Multi-way merge: When closing, merges all spilled streams using a
VSortedRunMerger
+ * to produce final sorted output files.
+ */
class VIcebergSortWriter : public IPartitionWriterBase {
public:
+ // Lambda type for creating new VIcebergPartitionWriter instances.
+ // Used when a file is completed and a new file needs to be opened.
using CreateWriterLambda =
std::function<std::shared_ptr<VIcebergPartitionWriter>(
const std::string* file_name, int file_name_index)>;
+ /**
+ * Constructor.
+ * @param partition_writer The underlying writer that handles actual file
I/O
+ * @param sort_info Sort specification (columns, asc/desc, nulls
first/last)
+ * @param target_file_size_bytes Target file size before splitting to a
new file
+ * @param create_writer_lambda Lambda for creating new writers when file
splitting occurs
+ */
VIcebergSortWriter(std::shared_ptr<VIcebergPartitionWriter>
partition_writer,
TSortInfo sort_info, int64_t target_file_size_bytes,
CreateWriterLambda create_writer_lambda = nullptr)
@@ -52,77 +83,17 @@ public:
_create_writer_lambda(std::move(create_writer_lambda)),
_target_file_size_bytes(target_file_size_bytes) {}
+ // Initialize sort expressions, create FullSorter, and open the underlying
writer
Status open(RuntimeState* state, RuntimeProfile* profile,
- const RowDescriptor* row_desc) override {
- DCHECK(row_desc != nullptr);
- _runtime_state = state;
- _profile = profile;
- _row_desc = row_desc;
-
- RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool));
- RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc,
*row_desc));
- RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
-
- _sorter = vectorized::FullSorter::create_unique(
- _vsort_exec_exprs, -1, 0, &_pool, _sort_info.is_asc_order,
_sort_info.nulls_first,
- *row_desc, state, _profile);
- _sorter->init_profile(_profile);
- _sorter->set_enable_spill();
- _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount",
TUnit::UNIT);
- RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile,
row_desc));
- return Status::OK();
- }
+ const RowDescriptor* row_desc) override;
- Status write(vectorized::Block& block) override {
- RETURN_IF_ERROR(_sorter->append_block(&block));
- _update_spill_block_batch_row_count(block);
- // sort in memory and write directly to Parquet file
- if (_sorter->data_size() >= _target_file_size_bytes) {
- return _flush_to_file();
- }
- // trigger_spill() will be called by memory management system
- return Status::OK();
- }
+ // Append data block to the sorter; triggers flush when target file size
is reached
+ Status write(vectorized::Block& block) override;
- Status close(const Status& status) override {
- Defer defer {[&]() {
- Status st = _iceberg_partition_writer->close(status);
- if (!st.ok()) {
- LOG(WARNING) << fmt::format("_iceberg_partition_writer close
failed, reason: {}",
- st.to_string());
- }
- _cleanup_spill_streams();
- }};
-
- if (!status.ok() || _runtime_state->is_cancelled()) {
- return status;
- }
-
- if (_sorter == nullptr) {
- return Status::OK();
- }
-
- if (!_sorter->merge_sort_state()->unsorted_block()->empty() ||
- !_sorter->merge_sort_state()->get_sorted_block().empty()) {
- if (_sorted_streams.empty()) {
- // data remaining in memory
- RETURN_IF_ERROR(_sorter->do_sort());
- RETURN_IF_ERROR(_sorter->prepare_for_read(false));
- RETURN_IF_ERROR(_write_sorted_data());
- return Status::OK();
- }
-
- // spill remaining data
- RETURN_IF_ERROR(_do_spill());
- }
-
- // Merge all spilled streams and output final sorted data
- if (!_sorted_streams.empty()) {
- RETURN_IF_ERROR(_combine_files_output());
- }
-
- return Status::OK();
- }
+ // Sort remaining data, perform multi-way merge if spill occurred, and
close the writer.
+ // Error handling: Tracks internal errors from intermediate operations and
propagates
+ // the actual error status (not the original caller status) to the
underlying writer.
+ Status close(const Status& status) override;
inline const std::string& file_name() const override {
return _iceberg_partition_writer->file_name();
@@ -134,197 +105,57 @@ public:
inline size_t written_len() const override { return
_iceberg_partition_writer->written_len(); }
+ // Returns a raw pointer to the FullSorter, used by
SpillIcebergTableSinkLocalState
+ // to query memory usage (data_size, get_reserve_mem_size)
auto sorter() const { return _sorter.get(); }
+ // Called by the memory management system to trigger spilling data to disk
Status trigger_spill() { return _do_spill(); }
private:
- // how many rows need in spill block batch
- void _update_spill_block_batch_row_count(const vectorized::Block& block) {
- auto rows = block.rows();
- if (rows > 0 && 0 == _avg_row_bytes) {
- _avg_row_bytes = std::max(1UL, block.bytes() / rows);
- int64_t spill_batch_bytes =
_runtime_state->spill_sort_batch_bytes(); // default 8MB
- _spill_block_batch_row_count =
- (spill_batch_bytes + _avg_row_bytes - 1) / _avg_row_bytes;
- }
- }
+ // Calculate average row size from the first non-empty block to determine
+ // the optimal batch row count for spill operations
+ void _update_spill_block_batch_row_count(const vectorized::Block& block);
- // have enought data, flush in-memory sorted data to file
- Status _flush_to_file() {
- RETURN_IF_ERROR(_sorter->do_sort());
- RETURN_IF_ERROR(_sorter->prepare_for_read(false));
- RETURN_IF_ERROR(_write_sorted_data());
- RETURN_IF_ERROR(_close_current_writer_and_open_next());
- _sorter->reset();
- return Status::OK();
- }
+ // Sort in-memory data and flush to a Parquet/ORC file, then open a new
writer
+ Status _flush_to_file();
- // write data into file
- Status _write_sorted_data() {
- bool eos = false;
- Block block;
- while (!eos && !_runtime_state->is_cancelled()) {
- RETURN_IF_ERROR(_sorter->get_next(_runtime_state, &block, &eos));
- RETURN_IF_ERROR(_iceberg_partition_writer->write(block));
- block.clear_column_data();
- }
- return Status::OK();
- }
+ // Read sorted data from the sorter and write to the underlying partition
writer
+ Status _write_sorted_data();
- // close current writer and open a new one with incremented file index
- Status _close_current_writer_and_open_next() {
- std::string current_file_name = _iceberg_partition_writer->file_name();
- int current_file_index = _iceberg_partition_writer->file_name_index();
- RETURN_IF_ERROR(_iceberg_partition_writer->close(Status::OK()));
+ // Close the current partition writer and create a new one with an
incremented file index
+ Status _close_current_writer_and_open_next();
- _iceberg_partition_writer =
- _create_writer_lambda(¤t_file_name, current_file_index +
1);
- if (!_iceberg_partition_writer) {
- return Status::InternalError("Failed to create new partition
writer");
- }
+ // Get the batch size for spill operations, clamped to int32_t max
+ int32_t _get_spill_batch_size() const;
- RETURN_IF_ERROR(_iceberg_partition_writer->open(_runtime_state,
_profile, _row_desc));
- return Status::OK();
- }
+ // Sort the current in-memory data and write it to a new spill stream on
disk.
+ // Explicitly calls do_sort() before prepare_for_read() to guarantee
sorted output.
+ Status _do_spill();
- // batch size max is int32_t max
- int32_t _get_spill_batch_size() const {
- if (_spill_block_batch_row_count >
std::numeric_limits<int32_t>::max()) {
- return std::numeric_limits<int32_t>::max();
- }
- return static_cast<int32_t>(_spill_block_batch_row_count);
- }
+ // Merge all spilled streams and output final sorted data to Parquet/ORC
files.
+ // Handles file splitting when output exceeds target file size.
+ Status _combine_files_output();
- Status _do_spill() {
- COUNTER_UPDATE(_do_spill_count_counter, 1);
- RETURN_IF_ERROR(_sorter->prepare_for_read(true));
- int32_t batch_size = _get_spill_batch_size();
-
- SpillStreamSPtr spilling_stream;
-
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
- _runtime_state, spilling_stream,
print_id(_runtime_state->query_id()),
- "iceberg-sort", 1 /* node_id */, batch_size,
- _runtime_state->spill_sort_batch_bytes(), _profile));
- _sorted_streams.emplace_back(spilling_stream);
-
- // spill sorted data to stream
- bool eos = false;
- Block block;
- while (!eos && !_runtime_state->is_cancelled()) {
- RETURN_IF_ERROR(_sorter->merge_sort_read_for_spill(
- _runtime_state, &block, (int)_spill_block_batch_row_count,
&eos));
- RETURN_IF_ERROR(spilling_stream->spill_block(_runtime_state,
block, eos));
- block.clear_column_data();
- }
- _sorter->reset();
- return Status::OK();
- }
+ // Perform an intermediate merge when there are too many spill streams to
merge at once.
+ // Merges a subset of streams into a single new stream.
+ Status _do_intermediate_merge();
- // merge spilled streams and output sorted data to Parquet files
- Status _combine_files_output() {
- // merge until all streams can be merged in one pass
- while (_sorted_streams.size() >
static_cast<size_t>(_calc_max_merge_streams())) {
- RETURN_IF_ERROR(_do_intermediate_merge());
- }
- RETURN_IF_ERROR(_create_final_merger());
-
- bool eos = false;
- Block output_block;
- size_t current_file_bytes = _iceberg_partition_writer->written_len();
- while (!eos && !_runtime_state->is_cancelled()) {
- RETURN_IF_ERROR(_merger->get_next(&output_block, &eos));
- if (output_block.rows() > 0) {
- size_t block_bytes = output_block.bytes();
-
RETURN_IF_ERROR(_iceberg_partition_writer->write(output_block));
- current_file_bytes += block_bytes;
- if (current_file_bytes > _target_file_size_bytes) {
- // close current writer and commit to file
- RETURN_IF_ERROR(_close_current_writer_and_open_next());
- current_file_bytes = 0;
- }
- }
- output_block.clear_column_data();
- }
- return Status::OK();
- }
+ // Calculate the maximum number of streams that can be merged
simultaneously
+ // based on memory limits
+ int _calc_max_merge_streams() const;
- Status _do_intermediate_merge() {
- int max_stream_count = _calc_max_merge_streams();
- RETURN_IF_ERROR(_create_merger(false, _spill_block_batch_row_count,
max_stream_count));
-
- // register new spill stream for merged output
- int32_t batch_size = _get_spill_batch_size();
- SpillStreamSPtr tmp_stream;
-
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
- _runtime_state, tmp_stream,
print_id(_runtime_state->query_id()),
- "iceberg-sort-merge", 1 /* node_id */, batch_size,
- _runtime_state->spill_sort_batch_bytes(), _profile));
-
- _sorted_streams.emplace_back(tmp_stream);
-
- // merge current streams and write to new spill stream
- bool eos = false;
- Block merge_sorted_block;
- while (!eos && !_runtime_state->is_cancelled()) {
- merge_sorted_block.clear_column_data();
- RETURN_IF_ERROR(_merger->get_next(&merge_sorted_block, &eos));
- RETURN_IF_ERROR(tmp_stream->spill_block(_runtime_state,
merge_sorted_block, eos));
- }
-
- // clean up merged streams
- for (auto& stream : _current_merging_streams) {
-
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
- }
- _current_merging_streams.clear();
- return Status::OK();
- }
+ // Create a VSortedRunMerger for merging spill streams
+ // @param is_final_merge If true, merges all remaining streams
+ // @param batch_size Number of rows per batch during merge
+ // @param num_streams Maximum number of streams to merge (used for
intermediate merges)
+ Status _create_merger(bool is_final_merge, size_t batch_size, int
num_streams);
- int _calc_max_merge_streams() const {
- auto count =
- _runtime_state->spill_sort_mem_limit() /
_runtime_state->spill_sort_batch_bytes();
- if (count > std::numeric_limits<int>::max()) {
- return std::numeric_limits<int>::max();
- }
- return std::max(2, static_cast<int>(count));
- }
+ // Create the final merger that merges all remaining spill streams
+ Status _create_final_merger();
- // create merger for merging spill streams
- Status _create_merger(bool is_final_merge, size_t batch_size, int
num_streams) {
- std::vector<vectorized::BlockSupplier> child_block_suppliers;
- _merger =
std::make_unique<vectorized::VSortedRunMerger>(_sorter->get_sort_description(),
- batch_size,
-1, 0, _profile);
- _current_merging_streams.clear();
- size_t streams_to_merge = is_final_merge ? _sorted_streams.size() :
num_streams;
-
- for (size_t i = 0; i < streams_to_merge && !_sorted_streams.empty();
++i) {
- auto stream = _sorted_streams.front();
- stream->set_read_counters(_profile);
- _current_merging_streams.emplace_back(stream);
- child_block_suppliers.emplace_back([stream](vectorized::Block*
block, bool* eos) {
- return stream->read_next_block_sync(block, eos);
- });
- _sorted_streams.pop_front();
- }
-
- RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
- return Status::OK();
- }
-
- Status _create_final_merger() { return _create_merger(true,
_runtime_state->batch_size(), 1); }
-
- // clean up all spill streams to ensure proper resource cleanup
- void _cleanup_spill_streams() {
- for (auto& stream : _sorted_streams) {
-
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
- }
- _sorted_streams.clear();
-
- for (auto& stream : _current_merging_streams) {
-
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
- }
- _current_merging_streams.clear();
- }
+ // Release all spill stream resources (both pending and currently merging)
+ void _cleanup_spill_streams();
RuntimeState* _runtime_state = nullptr;
RuntimeProfile* _profile = nullptr;
@@ -332,19 +163,28 @@ private:
ObjectPool _pool;
TSortInfo _sort_info;
VSortExecExprs _vsort_exec_exprs;
+ // The underlying partition writer that handles actual Parquet/ORC file I/O
std::shared_ptr<VIcebergPartitionWriter> _iceberg_partition_writer;
- CreateWriterLambda _create_writer_lambda; // creating new writers after
commit
+ // Lambda for creating new writers when file splitting occurs
+ CreateWriterLambda _create_writer_lambda;
- // Sorter and merger
+ // Sorter and merger for handling in-memory sorting and multi-way merge
std::unique_ptr<vectorized::FullSorter> _sorter;
std::unique_ptr<vectorized::VSortedRunMerger> _merger;
+ // Queue of spill streams waiting to be merged (FIFO order)
std::deque<vectorized::SpillStreamSPtr> _sorted_streams;
+ // Streams currently being consumed by the merger
std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
- int64_t _target_file_size_bytes = 0; //config::iceberg_sink_max_file_size
default 1GB
+ // Target file size in bytes; files are split when this threshold is
exceeded
+ // Default: config::iceberg_sink_max_file_size (1GB)
+ int64_t _target_file_size_bytes = 0;
+ // Average row size in bytes, computed from the first non-empty block
size_t _avg_row_bytes = 0;
+ // Number of rows per spill batch, computed from average row size and
spill_sort_batch_bytes
size_t _spill_block_batch_row_count = 4096;
+ // Counter tracking how many times spill has been triggered
RuntimeProfile::Counter* _do_spill_count_counter = nullptr;
};
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index 842f962713f..0ef79bf9d75 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -60,9 +60,15 @@ public:
Status close(Status) override;
- std::shared_ptr<IPartitionWriterBase> _current_writer;
+ // Getter for the current partition writer.
+ // Used by SpillIcebergTableSinkLocalState to access the current writer for
+ // memory management operations (get_reserve_mem_size, revocable_mem_size,
etc.).
+ const std::shared_ptr<IPartitionWriterBase>& current_writer() const {
return _current_writer; }
private:
+ // The currently active partition writer (may be VIcebergPartitionWriter
or VIcebergSortWriter).
+ // Updated during write() to track which writer received the most recent
data.
+ std::shared_ptr<IPartitionWriterBase> _current_writer;
class IcebergPartitionColumn {
public:
IcebergPartitionColumn(const iceberg::PartitionField& field,
diff --git a/be/src/vec/spill/spill_reader.h b/be/src/vec/spill/spill_reader.h
index 8f379cf78af..1999b8a9d88 100644
--- a/be/src/vec/spill/spill_reader.h
+++ b/be/src/vec/spill/spill_reader.h
@@ -61,7 +61,7 @@ public:
RuntimeProfile* custom_profile =
operator_profile->get_child("CustomCounters");
DCHECK(custom_profile != nullptr);
_read_file_timer = custom_profile->get_counter("SpillReadFileTime");
- _deserialize_timer =
custom_profile->get_counter("SpillReadDerializeBlockTime");
+ _deserialize_timer =
custom_profile->get_counter("SpillReadDeserializeBlockTime");
_read_block_count = custom_profile->get_counter("SpillReadBlockCount");
_read_block_data_size =
custom_profile->get_counter("SpillReadBlockBytes");
_read_file_size = custom_profile->get_counter("SpillReadFileBytes");
diff --git a/be/test/pipeline/exec/multi_cast_data_streamer_test.cpp
b/be/test/pipeline/exec/multi_cast_data_streamer_test.cpp
index e3bd5353d5b..72170bc7fe0 100644
--- a/be/test/pipeline/exec/multi_cast_data_streamer_test.cpp
+++ b/be/test/pipeline/exec/multi_cast_data_streamer_test.cpp
@@ -56,8 +56,8 @@ public:
ADD_COUNTER_WITH_LEVEL(custom_profile.get(),
"SpillWriteFileBytes", TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteRows",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileTime",
TUnit::UNIT, 1);
- ADD_COUNTER_WITH_LEVEL(custom_profile.get(),
"SpillReadDerializeBlockTime", TUnit::UNIT,
- 1);
+ ADD_COUNTER_WITH_LEVEL(custom_profile.get(),
"SpillReadDeserializeBlockTime",
+ TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(),
"SpillReadBlockCount", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(),
"SpillReadBlockBytes", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileBytes",
TUnit::UNIT, 1);
@@ -100,7 +100,8 @@ public:
ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(),
"SpillReadTaskWaitInQueueTime",
1);
ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(),
"SpillReadFileTime", 1);
- ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(),
"SpillReadDerializeBlockTime", 1);
+ ADD_TIMER_WITH_LEVEL(source_custom_profiles[i].get(),
"SpillReadDeserializeBlockTime",
+ 1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(),
"SpillReadBlockCount",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(),
"SpillReadBlockBytes",
diff --git a/be/test/pipeline/operator/spillable_operator_test_helper.cpp
b/be/test/pipeline/operator/spillable_operator_test_helper.cpp
index 3e580365196..1399e73ec01 100644
--- a/be/test/pipeline/operator/spillable_operator_test_helper.cpp
+++ b/be/test/pipeline/operator/spillable_operator_test_helper.cpp
@@ -53,7 +53,7 @@ void SpillableOperatorTestHelper::SetUp() {
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileBytes",
TUnit::BYTES, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteRows",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileTime",
TUnit::UNIT, 1);
- ADD_COUNTER_WITH_LEVEL(custom_profile.get(),
"SpillReadDerializeBlockTime", TUnit::UNIT, 1);
+ ADD_COUNTER_WITH_LEVEL(custom_profile.get(),
"SpillReadDeserializeBlockTime", TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadBlockCount",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadBlockBytes",
TUnit::UNIT, 1);
ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileBytes",
TUnit::UNIT, 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]