This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new bcfa30d9c54 branch-4.1: [bug](iceberg) fix iceberg sink writer with spill report error (#62899) (#63993)
bcfa30d9c54 is described below

commit bcfa30d9c54f6290dbb449c75c187c19d91dc87d
Author: zhangstar333 <[email protected]>
AuthorDate: Tue Jun 2 21:12:44 2026 +0800

    branch-4.1: [bug](iceberg) fix iceberg sink writer with spill report error (#62899) (#63993)
    
    cherry-pick from master (#62899)
---
 .../operator/spill_iceberg_table_sink_operator.cpp | 33 +++++++++++-----------
 .../sink/writer/iceberg/viceberg_sort_writer.cpp   | 29 +++++++++++++++++++
 .../sink/writer/iceberg/viceberg_sort_writer.h     | 25 ++++++++++++----
 .../sink/writer/iceberg/viceberg_table_writer.cpp  |  6 ++--
 .../sink/writer/iceberg/viceberg_table_writer.h    | 10 +++++--
 5 files changed, 75 insertions(+), 28 deletions(-)

diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
index c557ac58e60..ab02a0a1f36 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
@@ -54,43 +54,42 @@ bool SpillIcebergTableSinkLocalState::is_blockable() const {
 }
 
 size_t SpillIcebergTableSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) {
-    if (!_writer || !_writer->current_writer()) {
+    if (!_writer) {
         return 0;
     }
-
-    auto* sort_writer = dynamic_cast<VIcebergSortWriter*>(_writer->current_writer().get());
-    if (!sort_writer || !sort_writer->sorter()) {
+    auto current_writer = _writer->current_writer();
+    auto* sort_writer = dynamic_cast<VIcebergSortWriter*>(current_writer.get());
+    if (!sort_writer) {
         return 0;
     }
 
-    return sort_writer->sorter()->get_reserve_mem_size(state, eos);
+    return sort_writer->get_reserve_mem_size(state, eos);
 }
 
 size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* state) const {
-    if (!_writer || !_writer->current_writer()) {
+    if (!_writer) {
         return 0;
     }
-
-    auto* sort_writer = dynamic_cast<VIcebergSortWriter*>(_writer->current_writer().get());
-    if (!sort_writer || !sort_writer->sorter()) {
+    auto current_writer = _writer->current_writer();
+    auto* sort_writer = dynamic_cast<VIcebergSortWriter*>(current_writer.get());
+    if (!sort_writer) {
         return 0;
     }
 
-    return sort_writer->sorter()->data_size();
+    return sort_writer->data_size();
 }
 
 Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) {
-    if (!_writer || !_writer->current_writer()) {
+    if (!_writer) {
         return Status::OK();
     }
-
-    auto* sort_writer = dynamic_cast<VIcebergSortWriter*>(_writer->current_writer().get());
-
-    if (!sort_writer || !sort_writer->sorter()) {
+    auto current_writer = _writer->current_writer();
+    auto* sort_writer = dynamic_cast<VIcebergSortWriter*>(current_writer.get());
+    if (!sort_writer) {
         return Status::OK();
     }
 
-    auto exception_catch_func = [sort_writer]() {
+    auto exception_catch_func = [current_writer, sort_writer]() {
         auto status = [&]() {
             RETURN_IF_CATCH_EXCEPTION({ return sort_writer->trigger_spill(); });
         }();
@@ -175,4 +174,4 @@ void SpillIcebergTableSinkLocalState::_init_spill_counters() {
 }
 
 #include "common/compile_check_end.h"
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
index 15f72b0f7cc..db453c511fa 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
@@ -56,6 +56,8 @@ Status VIcebergSortWriter::open(RuntimeState* state, RuntimeProfile* profile,
 }
 
 Status VIcebergSortWriter::write(Block& block) {
+    std::lock_guard<std::mutex> lock(_sorter_mutex);
+
     // Append incoming block data to the sorter's internal buffer
     RETURN_IF_ERROR(_sorter->append_block(&block));
     _update_spill_block_batch_row_count(block);
@@ -73,7 +75,34 @@ Status VIcebergSortWriter::write(Block& block) {
     return Status::OK();
 }
 
+size_t VIcebergSortWriter::data_size() const {
+    std::lock_guard<std::mutex> lock(_sorter_mutex);
+    return _sorter == nullptr ? 0 : _sorter->data_size();
+}
+
+size_t VIcebergSortWriter::get_reserve_mem_size(RuntimeState* state, bool eos) const {
+    std::lock_guard<std::mutex> lock(_sorter_mutex);
+    return _sorter == nullptr ? 0 : _sorter->get_reserve_mem_size(state, eos);
+}
+
+Status VIcebergSortWriter::trigger_spill() {
+    std::lock_guard<std::mutex> lock(_sorter_mutex);
+    if (_closed || _sorter == nullptr) {
+        return Status::OK();
+    }
+    return _do_spill();
+}
+
 Status VIcebergSortWriter::close(const Status& status) {
+    std::lock_guard<std::mutex> lock(_sorter_mutex);
+    if (_closed) {
+        return Status::OK();
+    }
+    Defer mark_closed {[&]() { _closed = true; }};
+    return _close_locked(status);
+}
+
+Status VIcebergSortWriter::_close_locked(const Status& status) {
     // Track the actual internal status of operations performed during close.
     // This is important because if intermediate operations (like do_sort()) fail,
     // we need to propagate the actual error status to the underlying partition writer's
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
index 45858f473c9..e1e512f0a0c 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
+++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
@@ -18,7 +18,7 @@
 #pragma once
 
 #include <cstdint>
-#include <limits>
+#include <mutex>
 #include <utility>
 #include <vector>
 
@@ -101,12 +101,12 @@ public:
 
     inline size_t written_len() const override { return _iceberg_partition_writer->written_len(); }
 
-    // Returns a raw pointer to the FullSorter, used by SpillIcebergTableSinkLocalState
-    // to query memory usage (data_size, get_reserve_mem_size)
-    auto sorter() const { return _sorter.get(); }
+    size_t data_size() const;
+
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) const;
 
     // Called by the memory management system to trigger spilling data to disk
-    Status trigger_spill() { return _do_spill(); }
+    Status trigger_spill();
 
 private:
     // Calculate average row size from the first non-empty block to determine
@@ -129,6 +129,8 @@ private:
     // Explicitly calls do_sort() before prepare_for_read() to guarantee sorted output.
     Status _do_spill();
 
+    Status _close_locked(const Status& status);
+
     // Merge all spilled streams and output final sorted data to Parquet/ORC files.
     // Handles file splitting when output exceeds target file size.
     Status _combine_files_output();
@@ -168,6 +170,17 @@ private:
     std::unique_ptr<FullSorter> _sorter;
     std::unique_ptr<VSortedRunMerger> _merger;
 
+    // Serialize all accesses to _sorter because async writes and revoke spills run on
+    // different thread pools but touch the same FullSorter instance.
+    mutable std::mutex _sorter_mutex;
+
+    // Set to true once close() has finished tearing down the sorter / underlying writer.
+    // Late-arriving revoke spills (which run on a different thread than the async writer)
+    // must become no-ops after close, otherwise they would write to a fresh spill stream
+    // whose data never gets merged out (close has already produced the final output and
+    // cleaned up spill files).
+    bool _closed = false;
+
     // Queue of spill files waiting to be merged (FIFO order)
     std::deque<SpillFileSPtr> _sorted_spill_files;
     // Files currently being consumed by the merger
@@ -185,4 +198,4 @@ private:
     RuntimeProfile::Counter* _do_spill_count_counter = nullptr;
 };
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
index 91ce0911200..c79aa3262bf 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -284,7 +284,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block& output_block) {
         SCOPED_RAW_TIMER(&_partition_writers_write_ns);
         output_block.erase(_non_write_columns_indices);
         RETURN_IF_ERROR(writer->write(output_block));
-        _current_writer = writer;
+        _current_writer.store(writer);
         return Status::OK();
     }
 
@@ -327,7 +327,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block& output_block) {
         SCOPED_RAW_TIMER(&_partition_writers_write_ns);
         output_block.erase(_non_write_columns_indices);
         RETURN_IF_ERROR(writer->write(output_block));
-        _current_writer = writer;
+        _current_writer.store(writer);
         return Status::OK();
     }
 
@@ -430,7 +430,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block& output_block) {
         Block filtered_block;
         RETURN_IF_ERROR(_filter_block(output_block, &it->second, &filtered_block));
         RETURN_IF_ERROR(it->first->write(filtered_block));
-        _current_writer = it->first;
+        _current_writer.store(it->first);
     }
     return Status::OK();
 }
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
index f94ce4feb6b..cc7cec1fdad 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h
@@ -19,6 +19,7 @@
 
 #include <gen_cpp/DataSinks_types.h>
 
+#include "common/atomic_shared_ptr.h"
 #include "core/block/block.h"
 #include "core/column/column.h"
 #include "exec/sink/writer/async_result_writer.h"
@@ -66,12 +67,17 @@ public:
     // Getter for the current partition writer.
     // Used by SpillIcebergTableSinkLocalState to access the current writer for
     // memory management operations (get_reserve_mem_size, revocable_mem_size, etc.).
-    const std::shared_ptr<IPartitionWriterBase>& current_writer() const { return _current_writer; }
+    // Returns a snapshot by value: the async writer thread updates _current_writer
+    // concurrently with the spill/revoke path, so callers must hold their own copy
+    // while operating on it instead of dereferencing the underlying member directly.
+    std::shared_ptr<IPartitionWriterBase> current_writer() const { return _current_writer.load(); }
 
 private:
     // The currently active partition writer (may be VIcebergPartitionWriter or VIcebergSortWriter).
     // Updated during write() to track which writer received the most recent data.
-    std::shared_ptr<IPartitionWriterBase> _current_writer;
+    // Wrapped in atomic_shared_ptr because revoke_memory / get_revocable_mem_size run on
+    // a different thread than the async writer that assigns to it.
+    doris::atomic_shared_ptr<IPartitionWriterBase> _current_writer;
     class IcebergPartitionColumn {
     public:
         IcebergPartitionColumn(const iceberg::PartitionField& field,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to