This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 46457bcecd4 Revert "[Improvement](sink) optimization for parallel result sink (#36305)" (#36628)
46457bcecd4 is described below

commit 46457bcecd4dcd1080310546ddf618ff1b4fb0f5
Author: Pxl <pxl...@qq.com>
AuthorDate: Fri Jun 21 09:51:42 2024 +0800

    Revert "[Improvement](sink) optimization for parallel result sink (#36305)" (#36628)
    
    This reverts commit fdb5891c3eccefad7a354436dfb0eae82da5bd6e.
---
 be/src/pipeline/exec/result_file_sink_operator.cpp |   5 +-
 be/src/pipeline/exec/result_file_sink_operator.h   |   2 +-
 be/src/pipeline/exec/result_sink_operator.cpp      |  13 +-
 be/src/pipeline/exec/result_sink_operator.h        |   2 +-
 be/src/pipeline/local_exchange/local_exchanger.cpp |  22 +-
 be/src/runtime/buffer_control_block.cpp            | 258 +++++++++++----------
 be/src/runtime/buffer_control_block.h              |  33 ++-
 be/src/runtime/result_buffer_mgr.cpp               |   6 +-
 be/src/runtime/result_buffer_mgr.h                 |   3 +-
 be/src/runtime/result_writer.h                     |   2 +-
 be/src/service/point_query_executor.cpp            |  14 +-
 be/src/service/point_query_executor.h              |   2 +-
 be/src/vec/sink/varrow_flight_result_writer.cpp    |   4 +-
 be/src/vec/sink/varrow_flight_result_writer.h      |   2 +-
 be/src/vec/sink/vmysql_result_writer.cpp           |   4 +-
 be/src/vec/sink/vmysql_result_writer.h             |   2 +-
 be/src/vec/sink/writer/async_result_writer.cpp     |   2 +-
 .../sink/writer/iceberg/viceberg_table_writer.cpp  |   2 +-
 .../sink/writer/iceberg/viceberg_table_writer.h    |   2 +-
 be/src/vec/sink/writer/vfile_result_writer.cpp     |   5 +-
 be/src/vec/sink/writer/vfile_result_writer.h       |   2 +-
 be/src/vec/sink/writer/vhive_table_writer.cpp      |   2 +-
 be/src/vec/sink/writer/vhive_table_writer.h        |   4 +-
 be/src/vec/sink/writer/vjdbc_table_writer.cpp      |   2 +-
 be/src/vec/sink/writer/vjdbc_table_writer.h        |   2 +-
 be/src/vec/sink/writer/vmysql_table_writer.cpp     |   2 +-
 be/src/vec/sink/writer/vmysql_table_writer.h       |   2 +-
 be/src/vec/sink/writer/vodbc_table_writer.cpp      |   2 +-
 be/src/vec/sink/writer/vodbc_table_writer.h        |   2 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          |   4 +-
 be/src/vec/sink/writer/vtablet_writer.h            |   2 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |   4 +-
 be/src/vec/sink/writer/vtablet_writer_v2.h         |   2 +-
 .../serde/data_type_serde_mysql_test.cpp           |   2 +-
 34 files changed, 213 insertions(+), 206 deletions(-)

diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 029bea7494e..0cd14899f52 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -99,8 +99,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& i
     if (p._is_top_sink) {
         // create sender
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->fragment_instance_id(), p._buf_size, &_sender, 
state->execution_timeout(),
-                state->batch_size()));
+                state->fragment_instance_id(), p._buf_size, &_sender, 
state->execution_timeout()));
         // create writer
         _writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
                 p._file_opts.get(), p._storage_type, 
state->fragment_instance_id(),
@@ -176,7 +175,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, 
Status exec_status)
         // close sender, this is normal path end
         if (_sender) {
             _sender->update_return_rows(_writer == nullptr ? 0 : 
_writer->get_written_rows());
-            RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), 
final_status));
+            RETURN_IF_ERROR(_sender->close(final_status));
         }
         state->exec_env()->result_mgr()->cancel_at_time(
                 time(nullptr) + config::result_buffer_cancelled_interval_time,
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h 
b/be/src/pipeline/exec/result_file_sink_operator.h
index 7623dae7fea..4fa31f615ce 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -107,7 +107,7 @@ private:
 
     // Owned by the RuntimeState.
     RowDescriptor _output_row_descriptor;
-    int _buf_size = 4096; // Allocated from _pool
+    int _buf_size = 1024; // Allocated from _pool
     bool _is_top_sink = true;
     std::string _header;
     std::string _header_type;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index 378fea18eea..24c5162c4f4 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -49,10 +49,10 @@ Status ResultSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
         _sender = _parent->cast<ResultSinkOperatorX>()._sender;
     } else {
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                fragment_instance_id, RESULT_SINK_BUFFER_SIZE, &_sender, 
state->execution_timeout(),
-                state->batch_size()));
+                state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE, 
&_sender,
+                state->execution_timeout()));
     }
-    _sender->set_dependency(fragment_instance_id, 
_dependency->shared_from_this());
+    _sender->set_dependency(_dependency->shared_from_this());
     return Status::OK();
 }
 
@@ -122,8 +122,7 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {
 
     if (state->query_options().enable_parallel_result_sink) {
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, 
state->execution_timeout(),
-                state->batch_size()));
+                state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, 
state->execution_timeout()));
     }
     return Status::OK();
 }
@@ -140,7 +139,7 @@ Status ResultSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block,
     if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
         RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
     }
-    RETURN_IF_ERROR(local_state._writer->write(state, *block));
+    RETURN_IF_ERROR(local_state._writer->write(*block));
     if (_fetch_option.use_two_phase_fetch) {
         // Block structure may be changed by calling 
_second_phase_fetch_data().
         // So we should clear block in case of unmatched columns
@@ -186,7 +185,7 @@ Status ResultSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
         if (_writer) {
             _sender->update_return_rows(_writer->get_written_rows());
         }
-        RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), 
final_status));
+        RETURN_IF_ERROR(_sender->close(final_status));
     }
     state->exec_env()->result_mgr()->cancel_at_time(
             time(nullptr) + config::result_buffer_cancelled_interval_time,
diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index 1d2490f486d..0ccb7f4946b 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -104,7 +104,7 @@ struct ResultFileOptions {
     }
 };
 
-constexpr int RESULT_SINK_BUFFER_SIZE = 4096 * 8;
+constexpr int RESULT_SINK_BUFFER_SIZE = 4096;
 
 class ResultSinkLocalState final : public 
PipelineXSinkLocalState<BasicSharedState> {
     ENABLE_FACTORY_CREATOR(ResultSinkLocalState);
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index a8dc13438c1..51d2c8268e7 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -200,9 +200,8 @@ Status PassthroughExchanger::sink(RuntimeState* state, 
vectorized::Block* in_blo
     }
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
-    size_t allocated_bytes = new_block.allocated_bytes();
+    local_state._shared_state->add_mem_usage(channel_id, 
new_block.allocated_bytes());
     if (_data_queue[channel_id].enqueue(std::move(new_block))) {
-        local_state._shared_state->add_mem_usage(channel_id, allocated_bytes);
         local_state._shared_state->set_ready_to_read(channel_id);
     }
 
@@ -221,16 +220,25 @@ void 
PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                        LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    bool all_finished = _running_sink_operators == 0;
-    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    if (_running_sink_operators == 0) {
+        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+            block->swap(next_block);
+            local_state._shared_state->sub_mem_usage(local_state._channel_id,
+                                                     block->allocated_bytes());
+            if (_free_block_limit == 0 ||
+                _free_blocks.size_approx() < _free_block_limit * _num_sources) 
{
+                _free_blocks.enqueue(std::move(next_block));
+            }
+        } else {
+            *eos = true;
+        }
+    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         block->swap(next_block);
-        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
         if (_free_block_limit == 0 ||
             _free_blocks.size_approx() < _free_block_limit * _num_sources) {
             _free_blocks.enqueue(std::move(next_block));
         }
-    } else if (all_finished) {
-        *eos = true;
+        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index a1a83b22840..8ef23265e3f 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -31,7 +31,7 @@
 
 #include "arrow/record_batch.h"
 #include "arrow/type_fwd.h"
-#include "pipeline/dependency.h"
+#include "pipeline/exec/result_sink_operator.h"
 #include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "util/thrift_util.h"
@@ -85,14 +85,13 @@ void GetResultBatchCtx::on_data(const 
std::unique_ptr<TFetchDataResult>& t_resul
     delete this;
 }
 
-BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, 
int batch_size)
+BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size)
         : _fragment_id(id),
           _is_close(false),
           _is_cancelled(false),
           _buffer_rows(0),
           _buffer_limit(buffer_size),
-          _packet_num(0),
-          _batch_size(batch_size) {
+          _packet_num(0) {
     _query_statistics = std::make_unique<QueryStatistics>();
 }
 
@@ -104,153 +103,165 @@ Status BufferControlBlock::init() {
     return Status::OK();
 }
 
-Status BufferControlBlock::add_batch(RuntimeState* state,
-                                     std::unique_ptr<TFetchDataResult>& 
result) {
-    std::unique_lock<std::mutex> l(_lock);
+Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& 
result) {
+    {
+        std::unique_lock<std::mutex> l(_lock);
 
-    if (_is_cancelled) {
-        return Status::Cancelled("Cancelled");
-    }
+        if (_is_cancelled) {
+            return Status::Cancelled("Cancelled");
+        }
+
+        int num_rows = result->result_batch.rows.size();
+
+        while ((!_fe_result_batch_queue.empty() && _buffer_rows > 
_buffer_limit) &&
+               !_is_cancelled) {
+            _data_removal.wait_for(l, std::chrono::seconds(1));
+        }
 
-    int num_rows = result->result_batch.rows.size();
-    if (_waiting_rpc.empty()) {
-        // Merge result into batch to reduce rpc times
-        if (!_fe_result_batch_queue.empty() &&
-            ((_fe_result_batch_queue.back()->result_batch.rows.size() + 
num_rows) <
-             _buffer_limit) &&
-            !result->eos) {
-            std::vector<std::string>& back_rows = 
_fe_result_batch_queue.back()->result_batch.rows;
-            std::vector<std::string>& result_rows = result->result_batch.rows;
-            back_rows.insert(back_rows.end(), 
std::make_move_iterator(result_rows.begin()),
-                             std::make_move_iterator(result_rows.end()));
+        if (_is_cancelled) {
+            return Status::Cancelled("Cancelled");
+        }
+
+        if (_waiting_rpc.empty()) {
+            // Merge result into batch to reduce rpc times
+            if (!_fe_result_batch_queue.empty() &&
+                ((_fe_result_batch_queue.back()->result_batch.rows.size() + 
num_rows) <
+                 _buffer_limit) &&
+                !result->eos) {
+                std::vector<std::string>& back_rows =
+                        _fe_result_batch_queue.back()->result_batch.rows;
+                std::vector<std::string>& result_rows = 
result->result_batch.rows;
+                back_rows.insert(back_rows.end(), 
std::make_move_iterator(result_rows.begin()),
+                                 std::make_move_iterator(result_rows.end()));
+            } else {
+                _fe_result_batch_queue.push_back(std::move(result));
+            }
+            _buffer_rows += num_rows;
         } else {
-            _instance_rows_in_queue.emplace_back();
-            _fe_result_batch_queue.push_back(std::move(result));
+            auto* ctx = _waiting_rpc.front();
+            _waiting_rpc.pop_front();
+            ctx->on_data(result, _packet_num);
+            _packet_num++;
         }
-        _buffer_rows += num_rows;
-        _instance_rows[state->fragment_instance_id()] += num_rows;
-        _instance_rows_in_queue.back()[state->fragment_instance_id()] += 
num_rows;
-    } else {
-        auto* ctx = _waiting_rpc.front();
-        _waiting_rpc.pop_front();
-        ctx->on_data(result, _packet_num);
-        _packet_num++;
     }
-
     _update_dependency();
     return Status::OK();
 }
 
-Status BufferControlBlock::add_arrow_batch(RuntimeState* state,
-                                           
std::shared_ptr<arrow::RecordBatch>& result) {
-    std::unique_lock<std::mutex> l(_lock);
+Status 
BufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& 
result) {
+    {
+        std::unique_lock<std::mutex> l(_lock);
 
-    if (_is_cancelled) {
-        return Status::Cancelled("Cancelled");
-    }
+        if (_is_cancelled) {
+            return Status::Cancelled("Cancelled");
+        }
 
-    int num_rows = result->num_rows();
+        int num_rows = result->num_rows();
 
-    if (_is_cancelled) {
-        return Status::Cancelled("Cancelled");
-    }
+        while ((!_arrow_flight_batch_queue.empty() && _buffer_rows > 
_buffer_limit) &&
+               !_is_cancelled) {
+            _data_removal.wait_for(l, std::chrono::seconds(1));
+        }
 
-    // TODO: merge RocordBatch, ToStructArray -> Make again
+        if (_is_cancelled) {
+            return Status::Cancelled("Cancelled");
+        }
 
-    _arrow_flight_batch_queue.push_back(std::move(result));
-    _buffer_rows += num_rows;
-    _instance_rows_in_queue.emplace_back();
-    _instance_rows[state->fragment_instance_id()] += num_rows;
-    _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
+        // TODO: merge RocordBatch, ToStructArray -> Make again
+
+        _arrow_flight_batch_queue.push_back(std::move(result));
+        _buffer_rows += num_rows;
+        _data_arrival.notify_one();
+    }
     _update_dependency();
     return Status::OK();
 }
 
 void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
-    std::lock_guard<std::mutex> l(_lock);
-    if (!_status.ok()) {
-        ctx->on_failure(_status);
-        _update_dependency();
-        return;
-    }
-    if (_is_cancelled) {
-        ctx->on_failure(Status::Cancelled("Cancelled"));
-        _update_dependency();
-        return;
-    }
-    if (!_fe_result_batch_queue.empty()) {
-        // get result
-        std::unique_ptr<TFetchDataResult> result = 
std::move(_fe_result_batch_queue.front());
-        _fe_result_batch_queue.pop_front();
-        _buffer_rows -= result->result_batch.rows.size();
-        for (auto it : _instance_rows_in_queue.front()) {
-            _instance_rows[it.first] -= it.second;
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        if (!_status.ok()) {
+            ctx->on_failure(_status);
+            _update_dependency();
+            return;
         }
-        _instance_rows_in_queue.pop_front();
-
-        ctx->on_data(result, _packet_num);
-        _packet_num++;
-        _update_dependency();
-        return;
-    }
-    if (_is_close) {
-        ctx->on_close(_packet_num, _query_statistics.get());
-        _update_dependency();
-        return;
+        if (_is_cancelled) {
+            ctx->on_failure(Status::Cancelled("Cancelled"));
+            _update_dependency();
+            return;
+        }
+        if (!_fe_result_batch_queue.empty()) {
+            // get result
+            std::unique_ptr<TFetchDataResult> result = 
std::move(_fe_result_batch_queue.front());
+            _fe_result_batch_queue.pop_front();
+            _buffer_rows -= result->result_batch.rows.size();
+            _data_removal.notify_one();
+
+            ctx->on_data(result, _packet_num);
+            _packet_num++;
+            _update_dependency();
+            return;
+        }
+        if (_is_close) {
+            ctx->on_close(_packet_num, _query_statistics.get());
+            _update_dependency();
+            return;
+        }
+        // no ready data, push ctx to waiting list
+        _waiting_rpc.push_back(ctx);
     }
-    // no ready data, push ctx to waiting list
-    _waiting_rpc.push_back(ctx);
     _update_dependency();
 }
 
 Status 
BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* 
result) {
-    std::unique_lock<std::mutex> l(_lock);
-    if (!_status.ok()) {
-        return _status;
-    }
-    if (_is_cancelled) {
-        return Status::Cancelled("Cancelled");
-    }
+    {
+        std::unique_lock<std::mutex> l(_lock);
+        if (!_status.ok()) {
+            return _status;
+        }
+        if (_is_cancelled) {
+            return Status::Cancelled("Cancelled");
+        }
 
-    if (_is_cancelled) {
-        return Status::Cancelled("Cancelled");
-    }
+        while (_arrow_flight_batch_queue.empty() && !_is_cancelled && 
!_is_close) {
+            _data_arrival.wait_for(l, std::chrono::seconds(1));
+        }
 
-    if (!_arrow_flight_batch_queue.empty()) {
-        *result = std::move(_arrow_flight_batch_queue.front());
-        _arrow_flight_batch_queue.pop_front();
-        _buffer_rows -= (*result)->num_rows();
-        for (auto it : _instance_rows_in_queue.front()) {
-            _instance_rows[it.first] -= it.second;
+        if (_is_cancelled) {
+            return Status::Cancelled("Cancelled");
         }
-        _instance_rows_in_queue.pop_front();
-        _packet_num++;
-        _update_dependency();
-        return Status::OK();
-    }
 
-    // normal path end
-    if (_is_close) {
-        _update_dependency();
-        return Status::OK();
+        if (!_arrow_flight_batch_queue.empty()) {
+            *result = std::move(_arrow_flight_batch_queue.front());
+            _arrow_flight_batch_queue.pop_front();
+            _buffer_rows -= (*result)->num_rows();
+            _data_removal.notify_one();
+            _packet_num++;
+            _update_dependency();
+            return Status::OK();
+        }
+
+        // normal path end
+        if (_is_close) {
+            _update_dependency();
+            return Status::OK();
+        }
     }
     return Status::InternalError("Abnormal Ending");
 }
 
-Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
+Status BufferControlBlock::close(Status exec_status) {
     std::unique_lock<std::mutex> l(_lock);
-    auto it = _result_sink_dependencys.find(id);
-    if (it != _result_sink_dependencys.end()) {
-        it->second->set_always_ready();
-        _result_sink_dependencys.erase(it);
-    }
-    if (!_result_sink_dependencys.empty()) {
+    close_cnt++;
+    if (close_cnt < _result_sink_dependencys.size()) {
         return Status::OK();
     }
 
     _is_close = true;
     _status = exec_status;
 
+    // notify blocked get thread
+    _data_arrival.notify_all();
     if (!_waiting_rpc.empty()) {
         if (_status.ok()) {
             for (auto& ctx : _waiting_rpc) {
@@ -269,6 +280,8 @@ Status BufferControlBlock::close(const TUniqueId& id, 
Status exec_status) {
 void BufferControlBlock::cancel() {
     std::unique_lock<std::mutex> l(_lock);
     _is_cancelled = true;
+    _data_removal.notify_all();
+    _data_arrival.notify_all();
     for (auto& ctx : _waiting_rpc) {
         ctx->on_failure(Status::Cancelled("Cancelled"));
     }
@@ -277,25 +290,18 @@ void BufferControlBlock::cancel() {
 }
 
 void BufferControlBlock::set_dependency(
-        const TUniqueId& id, std::shared_ptr<pipeline::Dependency> 
result_sink_dependency) {
-    std::unique_lock<std::mutex> l(_lock);
-    _result_sink_dependencys[id] = result_sink_dependency;
-    _update_dependency();
+        std::shared_ptr<pipeline::Dependency> result_sink_dependency) {
+    _result_sink_dependencys.push_back(result_sink_dependency);
 }
 
 void BufferControlBlock::_update_dependency() {
-    if (_is_cancelled) {
-        for (auto it : _result_sink_dependencys) {
-            it.second->set_ready();
+    if (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled) {
+        for (auto dependency : _result_sink_dependencys) {
+            dependency->set_ready();
         }
-        return;
-    }
-
-    for (auto it : _result_sink_dependencys) {
-        if (_instance_rows[it.first] > _batch_size) {
-            it.second->block();
-        } else {
-            it.second->set_ready();
+    } else if (!_batch_queue_empty && _buffer_rows < _buffer_limit && 
!_is_cancelled) {
+        for (auto dependency : _result_sink_dependencys) {
+            dependency->block();
         }
     }
 }
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index 1296f2c606b..c8c240f928a 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -27,16 +27,15 @@
 #include <list>
 #include <memory>
 #include <mutex>
-#include <unordered_map>
 
 #include "common/status.h"
 #include "runtime/query_statistics.h"
-#include "runtime/runtime_state.h"
-#include "util/hash_util.hpp"
 
-namespace google::protobuf {
+namespace google {
+namespace protobuf {
 class Closure;
-} // namespace google::protobuf
+}
+} // namespace google
 
 namespace arrow {
 class RecordBatch;
@@ -72,19 +71,19 @@ struct GetResultBatchCtx {
 // buffer used for result customer and producer
 class BufferControlBlock {
 public:
-    BufferControlBlock(const TUniqueId& id, int buffer_size, int batch_size);
+    BufferControlBlock(const TUniqueId& id, int buffer_size);
     ~BufferControlBlock();
 
     Status init();
-    Status add_batch(RuntimeState* state, std::unique_ptr<TFetchDataResult>& 
result);
-    Status add_arrow_batch(RuntimeState* state, 
std::shared_ptr<arrow::RecordBatch>& result);
+    Status add_batch(std::unique_ptr<TFetchDataResult>& result);
+    Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result);
 
     void get_batch(GetResultBatchCtx* ctx);
     Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result);
 
     // close buffer block, set _status to exec_status and set _is_close to 
true;
     // called because data has been read or error happened.
-    Status close(const TUniqueId& id, Status exec_status);
+    Status close(Status exec_status);
     // this is called by RPC, called from coordinator
     void cancel();
 
@@ -99,8 +98,7 @@ public:
         }
     }
 
-    void set_dependency(const TUniqueId& id,
-                        std::shared_ptr<pipeline::Dependency> 
result_sink_dependency);
+    void set_dependency(std::shared_ptr<pipeline::Dependency> 
result_sink_dependency);
 
 protected:
     void _update_dependency();
@@ -123,17 +121,18 @@ protected:
 
     // protects all subsequent data in this block
     std::mutex _lock;
+    // signal arrival of new batch or the eos/cancelled condition
+    std::condition_variable _data_arrival;
+    // signal removal of data by stream consumer
+    std::condition_variable _data_removal;
 
     std::deque<GetResultBatchCtx*> _waiting_rpc;
 
     // only used for FE using return rows to check limit
     std::unique_ptr<QueryStatistics> _query_statistics;
-    // instance id to dependency
-    std::unordered_map<TUniqueId, std::shared_ptr<pipeline::Dependency>> 
_result_sink_dependencys;
-    std::unordered_map<TUniqueId, size_t> _instance_rows;
-    std::list<std::unordered_map<TUniqueId, size_t>> _instance_rows_in_queue;
-
-    int _batch_size;
+    std::atomic_bool _batch_queue_empty = false;
+    std::vector<std::shared_ptr<pipeline::Dependency>> 
_result_sink_dependencys;
+    size_t close_cnt = 0;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/result_buffer_mgr.cpp 
b/be/src/runtime/result_buffer_mgr.cpp
index ccbf0c3ff67..23f440d1909 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -67,8 +67,8 @@ Status ResultBufferMgr::init() {
 }
 
 Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int 
buffer_size,
-                                      std::shared_ptr<BufferControlBlock>* 
sender, int exec_timout,
-                                      int batch_size) {
+                                      std::shared_ptr<BufferControlBlock>* 
sender,
+                                      int exec_timout) {
     *sender = find_control_block(query_id);
     if (*sender != nullptr) {
         LOG(WARNING) << "already have buffer control block for this instance " 
<< query_id;
@@ -77,7 +77,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& 
query_id, int buffer_size
 
     std::shared_ptr<BufferControlBlock> control_block = nullptr;
 
-    control_block = std::make_shared<BufferControlBlock>(query_id, 
buffer_size, batch_size);
+    control_block = std::make_shared<BufferControlBlock>(query_id, 
buffer_size);
 
     {
         std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
diff --git a/be/src/runtime/result_buffer_mgr.h 
b/be/src/runtime/result_buffer_mgr.h
index 8bac69c23ac..30b1b61eb7d 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -58,8 +58,7 @@ public:
     // the returned sender do not need release
     // sender is not used when call cancel or unregister
     Status create_sender(const TUniqueId& query_id, int buffer_size,
-                         std::shared_ptr<BufferControlBlock>* sender, int 
exec_timeout,
-                         int batch_size);
+                         std::shared_ptr<BufferControlBlock>* sender, int 
exec_timeout);
 
     // fetch data result to FE
     void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx);
diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h
index df1b7a808d9..78082956d0e 100644
--- a/be/src/runtime/result_writer.h
+++ b/be/src/runtime/result_writer.h
@@ -47,7 +47,7 @@ public:
     [[nodiscard]] bool output_object_data() const { return 
_output_object_data; }
 
     // Write is sync, it will do real IO work.
-    virtual Status write(RuntimeState* state, vectorized::Block& block) = 0;
+    virtual Status write(vectorized::Block& block) = 0;
 
     void set_output_object_data(bool output_object_data) {
         _output_object_data = output_object_data;
diff --git a/be/src/service/point_query_executor.cpp 
b/be/src/service/point_query_executor.cpp
index 8078467d5ca..d4d20ea5a48 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -432,7 +432,7 @@ Status PointQueryExecutor::_lookup_row_data() {
                     _reusable->get_col_default_values(), 
_reusable->include_col_uids());
         }
         if (!_reusable->missing_col_uids().empty()) {
-            if 
(!_reusable->runtime_state()->enable_short_circuit_query_access_column_store()) 
{
+            if 
(!_reusable->runtime_state().enable_short_circuit_query_access_column_store()) {
                 std::string missing_columns;
                 for (int cid : _reusable->missing_col_uids()) {
                     missing_columns += 
_tablet->tablet_schema()->column_by_uid(cid).name() + ",";
@@ -487,10 +487,10 @@ Status PointQueryExecutor::_lookup_row_data() {
 }
 
 template <typename MysqlWriter>
-Status serialize_block(RuntimeState* state, MysqlWriter& mysql_writer, 
vectorized::Block& block,
-                       PTabletKeyLookupResponse* response) {
+Status _serialize_block(MysqlWriter& mysql_writer, vectorized::Block& block,
+                        PTabletKeyLookupResponse* response) {
     block.clear_names();
-    RETURN_IF_ERROR(mysql_writer.write(state, block));
+    RETURN_IF_ERROR(mysql_writer.write(block));
     assert(mysql_writer.results().size() == 1);
     uint8_t* buf = nullptr;
     uint32_t len = 0;
@@ -508,13 +508,11 @@ Status PointQueryExecutor::_output_data() {
         if (_binary_row_format) {
             vectorized::VMysqlResultWriter<true> mysql_writer(nullptr, 
_reusable->output_exprs(),
                                                               nullptr);
-            RETURN_IF_ERROR(serialize_block(_reusable->runtime_state(), 
mysql_writer,
-                                            *_result_block, _response));
+            RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, 
_response));
         } else {
             vectorized::VMysqlResultWriter<false> mysql_writer(nullptr, 
_reusable->output_exprs(),
                                                                nullptr);
-            RETURN_IF_ERROR(serialize_block(_reusable->runtime_state(), 
mysql_writer,
-                                            *_result_block, _response));
+            RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, 
_response));
         }
         VLOG_DEBUG << "dump block " << _result_block->dump_data();
     } else {
diff --git a/be/src/service/point_query_executor.h 
b/be/src/service/point_query_executor.h
index f374e094806..1bed53891c3 100644
--- a/be/src/service/point_query_executor.h
+++ b/be/src/service/point_query_executor.h
@@ -98,7 +98,7 @@ public:
 
     const std::unordered_set<int32_t> include_col_uids() const { return 
_include_col_uids; }
 
-    RuntimeState* runtime_state() { return _runtime_state.get(); }
+    const RuntimeState& runtime_state() const { return *_runtime_state; }
 
 private:
     // caching TupleDescriptor, output_expr, etc...
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp 
b/be/src/vec/sink/varrow_flight_result_writer.cpp
index b23d1668465..d646cf66f34 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -53,7 +53,7 @@ void VArrowFlightResultWriter::_init_profile() {
     _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES);
 }
 
-Status VArrowFlightResultWriter::write(RuntimeState* state, Block& input_block) {
+Status VArrowFlightResultWriter::write(Block& input_block) {
     SCOPED_TIMER(_append_row_batch_timer);
     Status status = Status::OK();
     if (UNLIKELY(input_block.rows() == 0)) {
@@ -80,7 +80,7 @@ Status VArrowFlightResultWriter::write(RuntimeState* state, Block& input_block)
         SCOPED_TIMER(_result_send_timer);
         // If this is a dry run task, no need to send data block
         if (!_is_dry_run) {
-            status = _sinker->add_arrow_batch(state, result);
+            status = _sinker->add_arrow_batch(result);
         }
         if (status.ok()) {
             _written_rows += num_rows;
diff --git a/be/src/vec/sink/varrow_flight_result_writer.h b/be/src/vec/sink/varrow_flight_result_writer.h
index ab2578421c8..862b074cb35 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.h
+++ b/be/src/vec/sink/varrow_flight_result_writer.h
@@ -44,7 +44,7 @@ public:
 
     Status init(RuntimeState* state) override;
 
-    Status write(RuntimeState* state, Block& block) override;
+    Status write(Block& block) override;
 
     Status close(Status) override;
 
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp
index 45941173d4d..804f50f0fc8 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -102,7 +102,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() {
 }
 
 template <bool is_binary_format>
-Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
+Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) {
     SCOPED_TIMER(_append_row_batch_timer);
     Status status = Status::OK();
     if (UNLIKELY(input_block.rows() == 0)) {
@@ -194,7 +194,7 @@ Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& i
         // If this is a dry run task, no need to send data block
         if (!_is_dry_run) {
             if (_sinker) {
-                status = _sinker->add_batch(state, result);
+                status = _sinker->add_batch(result);
             } else {
                 _results.push_back(std::move(result));
             }
diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h
index 306d062a6be..da3cdcf0690 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -47,7 +47,7 @@ public:
 
     Status init(RuntimeState* state) override;
 
-    Status write(RuntimeState* state, Block& block) override;
+    Status write(Block& block) override;
 
     Status close(Status status) override;
 
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 42fd8468e86..814d1b754c4 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -140,7 +140,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
             }
 
             auto block = _get_block_from_queue();
-            auto status = write(state, *block);
+            auto status = write(*block);
             if (!status.ok()) [[unlikely]] {
                 std::unique_lock l(_m);
                 _writer_status.update(status);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index fc8aacdbfa1..2703330406c 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -105,7 +105,7 @@ VIcebergTableWriter::_to_iceberg_partition_columns() {
     return partition_columns;
 }
 
-Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
+Status VIcebergTableWriter::write(vectorized::Block& block) {
     SCOPED_RAW_TIMER(&_send_data_ns);
     if (block.rows() == 0) {
         return Status::OK();
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index e2e582e04ad..35e71d1960f 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -54,7 +54,7 @@ public:
 
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
-    Status write(RuntimeState* state, vectorized::Block& block) override;
+    Status write(vectorized::Block& block) override;
 
     Status close(Status) override;
 
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp
index c897892cbfc..96c4edc82b5 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -194,7 +194,7 @@ std::string VFileResultWriter::_file_format_to_name() {
     }
 }
 
-Status VFileResultWriter::write(RuntimeState* state, Block& block) {
+Status VFileResultWriter::write(Block& block) {
     if (block.rows() == 0) {
         return Status::OK();
     }
@@ -291,8 +291,7 @@ Status VFileResultWriter::_send_result() {
     attach_infos.insert(std::make_pair("URL", file_url));
 
     result->result_batch.__set_attached_infos(attach_infos);
-    RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(_state, result),
-                                   "failed to send outfile result");
+    RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send outfile result");
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h
index 42753a5e261..44b0695505f 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -60,7 +60,7 @@ public:
 
     VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
 
-    Status write(RuntimeState* state, Block& block) override;
+    Status write(Block& block) override;
 
     Status close(Status exec_status) override;
 
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp
index f90c7134ccd..0e64060eb0b 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -81,7 +81,7 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
     return Status::OK();
 }
 
-Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
+Status VHiveTableWriter::write(vectorized::Block& block) {
     SCOPED_RAW_TIMER(&_send_data_ns);
 
     if (block.rows() == 0) {
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h
index 6c8b972f280..4989ba443c7 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.h
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -41,13 +41,13 @@ class VHiveTableWriter final : public AsyncResultWriter {
 public:
     VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
 
-    ~VHiveTableWriter() override = default;
+    ~VHiveTableWriter() = default;
 
     Status init_properties(ObjectPool* pool);
 
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
-    Status write(RuntimeState* state, vectorized::Block& block) override;
+    Status write(vectorized::Block& block) override;
 
     Status close(Status) override;
 
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
index b7c8d1f78dd..9493bfbf072 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
@@ -60,7 +60,7 @@ VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
                                    const VExprContextSPtrs& output_expr_ctxs)
         : AsyncResultWriter(output_expr_ctxs), JdbcConnector(create_connect_param(t_sink)) {}
 
-Status VJdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
+Status VJdbcTableWriter::write(vectorized::Block& block) {
     Block output_block;
     RETURN_IF_ERROR(_projection_block(block, &output_block));
     auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h b/be/src/vec/sink/writer/vjdbc_table_writer.h
index b8216c3bcd6..a683259c992 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.h
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.h
@@ -44,7 +44,7 @@ public:
         return init_to_write(profile);
     }
 
-    Status write(RuntimeState* state, vectorized::Block& block) override;
+    Status write(vectorized::Block& block) override;
 
     Status finish(RuntimeState* state) override { return JdbcConnector::finish_trans(); }
 
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp b/be/src/vec/sink/writer/vmysql_table_writer.cpp
index 45afe8ce019..d9ca6d96f99 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp
@@ -109,7 +109,7 @@ Status VMysqlTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
     return Status::OK();
 }
 
-Status VMysqlTableWriter::write(RuntimeState* state, vectorized::Block& block) {
+Status VMysqlTableWriter::write(vectorized::Block& block) {
     Block output_block;
     RETURN_IF_ERROR(_projection_block(block, &output_block));
     auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h b/be/src/vec/sink/writer/vmysql_table_writer.h
index 072885b176b..856d0a21ec5 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.h
+++ b/be/src/vec/sink/writer/vmysql_table_writer.h
@@ -51,7 +51,7 @@ public:
     // connect to mysql server
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
-    Status write(RuntimeState* state, vectorized::Block& block) override;
+    Status write(vectorized::Block& block) override;
 
     Status close(Status) override;
 
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp b/be/src/vec/sink/writer/vodbc_table_writer.cpp
index c70bdd4ca19..da068c3d677 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp
@@ -45,7 +45,7 @@ VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink& t_sink,
                                    const VExprContextSPtrs& output_expr_ctxs)
         : AsyncResultWriter(output_expr_ctxs), ODBCConnector(create_connect_param(t_sink)) {}
 
-Status VOdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
+Status VOdbcTableWriter::write(vectorized::Block& block) {
     Block output_block;
     RETURN_IF_ERROR(_projection_block(block, &output_block));
     auto num_rows = output_block.rows();
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h b/be/src/vec/sink/writer/vodbc_table_writer.h
index fa4dc47b77f..687b5106a8b 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.h
+++ b/be/src/vec/sink/writer/vodbc_table_writer.h
@@ -44,7 +44,7 @@ public:
         return init_to_write(profile);
     }
 
-    Status write(RuntimeState* state, vectorized::Block& block) override;
+    Status write(vectorized::Block& block) override;
 
     Status finish(RuntimeState* state) override { return ODBCConnector::finish_trans(); }
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index b31796fe724..2151696714b 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1368,7 +1368,7 @@ Status VTabletWriter::_send_new_partition_batch() {
         //  2. deal batched block
         //  3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
         _row_distribution.clear_batching_stats();
-        RETURN_IF_ERROR(this->write(_state, tmp_block));
+        RETURN_IF_ERROR(this->write(tmp_block));
         _row_distribution._batching_block->set_mutable_columns(
                 tmp_block.mutate_columns()); // Recovery back
         _row_distribution._batching_block->clear_column_data();
@@ -1675,7 +1675,7 @@ void VTabletWriter::_generate_index_channels_payloads(
     }
 }
 
-Status VTabletWriter::write(RuntimeState* state, doris::vectorized::Block& input_block) {
+Status VTabletWriter::write(doris::vectorized::Block& input_block) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     Status status = Status::OK();
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index b9fbc4d0873..21d7b1c9f17 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -544,7 +544,7 @@ class VTabletWriter final : public AsyncResultWriter {
 public:
     VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
 
-    Status write(RuntimeState* state, Block& block) override;
+    Status write(Block& block) override;
 
     Status close(Status) override;
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 9bd154ce212..3c8dede657f 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -373,7 +373,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id,
     return Status::OK();
 }
 
-Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
+Status VTabletWriterV2::write(Block& input_block) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     Status status = Status::OK();
 
@@ -502,7 +502,7 @@ Status VTabletWriterV2::_send_new_partition_batch() {
         //  2. deal batched block
         //  3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
         _row_distribution.clear_batching_stats();
-        RETURN_IF_ERROR(this->write(_state, tmp_block));
+        RETURN_IF_ERROR(this->write(tmp_block));
         _row_distribution._batching_block->set_mutable_columns(
                 tmp_block.mutate_columns()); // Recovery back
         _row_distribution._batching_block->clear_column_data();
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 363dea54c3b..ff31e1552dd 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -106,7 +106,7 @@ public:
 
     ~VTabletWriterV2() override;
 
-    Status write(RuntimeState* state, Block& block) override;
+    Status write(Block& block) override;
 
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
index 97e78f05c54..5ba8af8b81f 100644
--- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
+++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp
@@ -317,7 +317,7 @@ void serialize_and_deserialize_mysql_test() {
     // mysql_writer init
     vectorized::VMysqlResultWriter<false> mysql_writer(nullptr, _output_vexpr_ctxs, nullptr);
 
-    Status st = mysql_writer.write(&runtime_stat, block);
+    Status st = mysql_writer.write(block);
     EXPECT_TRUE(st.ok());
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to