This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository

The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 3269496be1 [bug](NodeChannel) fix OOM caused by pending queue in sink 
send (#12359) (#12362)
3269496be1 is described below

commit 3269496be1cfafccb520ae3c3c17c18ff70dbe56
Author: zhengyu <>
AuthorDate: Wed Sep 7 20:49:08 2022 +0800

    [bug](NodeChannel) fix OOM caused by pending queue in sink send (#12359) 
    Each NodeChannel has its own queue, with size up to 1/20 exec_mem_limit.
    User will crash into OOM if set exec_mem_limit high. This commit uses
    fixed number to control the total max memory used by NodeChannels.
    Signed-off-by: freemandealer <>
 be/src/common/config.h           |   3 +
 be/src/exec/tablet_sink.cpp      |   5 +-
 be/src/exec/tablet_sink.h        |  26 ++-
 be/src/vec/sink/vtablet_sink.cpp | 359 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 378 insertions(+), 15 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 945e9631cd..902459e6d3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -749,6 +749,9 @@ CONF_Int32(quick_compaction_min_rowsets, "10");
 // Max waiting time to wait the "plan fragment start" rpc.
 // If timeout, the fragment will be cancelled.
 CONF_mInt32(max_fragment_start_wait_time_seconds, "30");
+// limit the queue of pending batches which will be sent by a single 
+CONF_mInt64(nodechannel_pending_queue_max_bytes, "67108864");
 } // namespace config
 } // namespace doris
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index a600f83e29..021540cecb 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -103,7 +103,6 @@ Status NodeChannel::init(RuntimeState* state) {
     _rpc_timeout_ms = state->query_options().query_timeout * 1000;
-    _max_pending_batches_bytes = _parent->_load_mem_limit / 20; //TODO: 
session variable percent
     _load_info = "load_id=" + print_id(_parent->_load_id) +
                  ", txn_id=" + std::to_string(_parent->_txn_id);
@@ -271,6 +270,10 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t 
tablet_id) {
             //To simplify the add_row logic, postpone adding batch into req 
until the time of sending req
+            VLOG_DEBUG << "OlapTableSink:" << _parent << " NodeChannel:" << 
+                       << " pending_batches_bytes:" << _pending_batches_bytes
+                       << " jobid:" << std::to_string(_state->load_job_id())
+                       << " tabletid:" << tablet_id << " loadinfo:" << 
         _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, 
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 2a258d62da..ac1dfe2c80 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -131,9 +131,7 @@ public:
         return _packet_in_flight.compare_exchange_strong(value, true);
-    void clear_in_flight() {
-        _packet_in_flight = false;
-    }
+    void clear_in_flight() { _packet_in_flight = false; }
     bool is_packet_in_flight() { return _packet_in_flight; }
@@ -198,7 +196,8 @@ public:
     // 1: running, haven't reach eos.
     // only allow 1 rpc in flight
     // plz make sure, this func should be called after open_wait().
-    int try_send_and_fetch_status(RuntimeState* state, 
std::unique_ptr<ThreadPoolToken>& thread_pool_token);
+    int try_send_and_fetch_status(RuntimeState* state,
+                                  std::unique_ptr<ThreadPoolToken>& 
     void try_send_batch(RuntimeState* state);
@@ -232,9 +231,7 @@ public:
-    size_t get_pending_bytes() {
-        return _pending_batches_bytes;
-    }
+    size_t get_pending_bytes() { return _pending_batches_bytes; }
     void _cancel_with_msg(const std::string& msg);
@@ -279,7 +276,7 @@ private:
     std::atomic<int> _pending_batches_num {0};
     // limit _pending_batches size
     std::atomic<size_t> _pending_batches_bytes {0};
-    size_t _max_pending_batches_bytes {10 * 1024 * 1024};
+    size_t _max_pending_batches_bytes 
     std::shared_ptr<PBackendService_Stub> _stub = nullptr;
     RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr;
@@ -321,13 +318,15 @@ public:
     void add_row(BlockRow& block_row, int64_t tablet_id);
-    void for_each_node_channel(const std::function<void(const 
std::shared_ptr<NodeChannel>&)>& func) {
+    void for_each_node_channel(
+            const std::function<void(const std::shared_ptr<NodeChannel>&)>& 
func) {
         for (auto& it : _node_channels) {
-    void mark_as_failed(int64_t node_id, const std::string& host, const 
std::string& err, int64_t tablet_id = -1);
+    void mark_as_failed(int64_t node_id, const std::string& host, const 
std::string& err,
+                        int64_t tablet_id = -1);
     Status check_intolerable_failure();
     // set error tablet info in runtime state, so that it can be returned to 
@@ -337,11 +336,12 @@ public:
     size_t get_pending_bytes() const {
         size_t mem_consumption = 0;
-        for (auto& kv:  _node_channels){
+        for (auto& kv : _node_channels) {
             mem_consumption += kv.second->get_pending_bytes();
         return mem_consumption;
     friend class NodeChannel;
     friend class VOlapTableSink;
@@ -510,9 +510,7 @@ protected:
     // compute tablet index for every row batch
     // FIND_TABLET_EVERY_SINK is only used for random distribution info, which 
indicates that we should
     // only compute tablet index in the corresponding partition once for the 
whole time in olap table sink
-    enum FindTabletMode {
-    };
     FindTabletMode findTabletMode = FindTabletMode::FIND_TABLET_EVERY_ROW;
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index f0239b23c6..846237229a 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -27,6 +27,365 @@
 namespace doris {
 namespace stream_load {
+<<<<<<< HEAD
+VNodeChannel::VNodeChannel(OlapTableSink* parent, IndexChannel* index_channel, 
int64_t node_id)
+        : NodeChannel(parent, index_channel, node_id) {
+    _is_vectorized = true;
+VNodeChannel::~VNodeChannel() {
+    if (_add_block_closure != nullptr) {
+        delete _add_block_closure;
+        _add_block_closure = nullptr;
+    }
+    _cur_add_block_request.release_id();
+void VNodeChannel::clear_all_blocks() {
+    std::lock_guard<std::mutex> lg(_pending_batches_lock);
+    std::queue<AddBlockReq> empty;
+    std::swap(_pending_blocks, empty);
+    _cur_mutable_block.reset();
+// if "_cancelled" is set to true,
+// no need to set _cancel_msg because the error will be
+// returned directly via "TabletSink::prepare()" method.
+Status VNodeChannel::init(RuntimeState* state) {
+    RETURN_IF_ERROR(NodeChannel::init(state));
+    _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}));
+    // Initialize _cur_add_block_request
+    _cur_add_block_request.set_allocated_id(&_parent->_load_id);
+    _cur_add_block_request.set_index_id(_index_channel->_index_id);
+    _cur_add_block_request.set_sender_id(_parent->_sender_id);
+    _cur_add_block_request.set_backend_id(_node_id);
+    _cur_add_block_request.set_eos(false);
+    _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, 
+    return Status::OK();
+Status VNodeChannel::open_wait() {
+    Status status = NodeChannel::open_wait();
+    if (!status.ok()) {
+        return status;
+    }
+    // add block closure
+    _add_block_closure = 
+    _add_block_closure->addFailedHandler([this](bool is_last_rpc) {
+        std::lock_guard<std::mutex> l(this->_closed_lock);
+        if (this->_is_closed) {
+            // if the node channel is closed, no need to call `mark_as_failed`,
+            // and notice that _index_channel may already be destroyed.
+            return;
+        }
+        // If rpc failed, mark all tablets on this node channel as failed
+        _index_channel->mark_as_failed(this->node_id(), this->host(),
+                                       _add_block_closure->cntl.ErrorText(), 
+        Status st = _index_channel->check_intolerable_failure();
+        if (!st.ok()) {
+            _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), 
+        } else if (is_last_rpc) {
+            // if this is last rpc, will must set _add_batches_finished. 
otherwise, node channel's close_wait
+            // will be blocked.
+            _add_batches_finished = true;
+        }
+    });
+    _add_block_closure->addSuccessHandler([this](const 
PTabletWriterAddBlockResult& result,
+                                                 bool is_last_rpc) {
+        std::lock_guard<std::mutex> l(this->_closed_lock);
+        if (this->_is_closed) {
+            // if the node channel is closed, no need to call the following 
+            // and notice that _index_channel may already be destroyed.
+            return;
+        }
+        Status status(result.status());
+        if (status.ok()) {
+            // if has error tablet, handle them first
+            for (auto& error : result.tablet_errors()) {
+                _index_channel->mark_as_failed(this->node_id(), this->host(), 
+                                               error.tablet_id());
+            }
+            Status st = _index_channel->check_intolerable_failure();
+            if (!st.ok()) {
+                _cancel_with_msg(st.get_error_msg());
+            } else if (is_last_rpc) {
+                for (auto& tablet : result.tablet_vec()) {
+                    TTabletCommitInfo commit_info;
+                    commit_info.tabletId = tablet.tablet_id();
+                    commit_info.backendId = _node_id;
+                    _tablet_commit_infos.emplace_back(std::move(commit_info));
+                    VLOG_CRITICAL << "master replica commit info: tabletId=" 
<< tablet.tablet_id()
+                                  << ", backendId=" << _node_id
+                                  << ", master node id: " << this->node_id()
+                                  << ", host: " << this->host() << ", txn_id=" 
<< _parent->_txn_id;
+                }
+                if (_parent->_write_single_replica) {
+                    for (auto& tablet_slave_node_ids : 
result.success_slave_tablet_node_ids()) {
+                        for (auto slave_node_id : 
tablet_slave_node_ids.second.slave_node_ids()) {
+                            TTabletCommitInfo commit_info;
+                            commit_info.tabletId = tablet_slave_node_ids.first;
+                            commit_info.backendId = slave_node_id;
+                            VLOG_CRITICAL << "slave replica commit info: 
+                                          << tablet_slave_node_ids.first
+                                          << ", backendId=" << slave_node_id
+                                          << ", master node id: " << 
+                                          << ", host: " << this->host()
+                                          << ", txn_id=" << _parent->_txn_id;
+                        }
+                    }
+                }
+                _add_batches_finished = true;
+            }
+        } else {
+            _cancel_with_msg(fmt::format("{}, add batch req success but status 
isn't ok, err: {}",
+                                         channel_info(), 
+        }
+        if (result.has_execution_time_us()) {
+            _add_batch_counter.add_batch_execution_time_us += 
+            _add_batch_counter.add_batch_wait_execution_time_us += 
+            _add_batch_counter.add_batch_num++;
+        }
+    });
+    return status;
+Status VNodeChannel::add_block(vectorized::Block* block,
+                               const 
+                                               std::vector<int64_t>>& payload) 
+    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
+    // If add_block() when _eos_is_produced==true, there must be sth wrong, we 
can only mark this channel as failed.
+    auto st = none_of({_cancelled, _eos_is_produced});
+    if (!st.ok()) {
+        if (_cancelled) {
+            std::lock_guard<SpinLock> l(_cancel_msg_lock);
+            return Status::InternalError("add row failed. {}", _cancel_msg);
+        } else {
+            return std::move(st.prepend("already stopped, can't add row. 
cancelled/eos: "));
+        }
+    }
+    // We use OlapTableSink mem_tracker which has the same ancestor of _plan 
+    // so in the ideal case, mem limit is a matter for _plan node.
+    // But there is still some unfinished things, we do mem limit here 
+    // _cancelled may be set by rpc callback, and it's possible that 
_cancelled might be set in any of the steps below.
+    // It's fine to do a fake add_block() and return OK, because we will check 
_cancelled in next add_block() or mark_close().
+    while (!_cancelled && _pending_batches_num > 0 &&
+           _pending_batches_bytes > _max_pending_batches_bytes) {
+        SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    block->append_block_by_selector(_cur_mutable_block->mutable_columns(), 
+    for (auto tablet_id : payload.second) {
+        _cur_add_block_request.add_tablet_ids(tablet_id);
+    }
+    if (_cur_mutable_block->rows() >= _batch_size ||
+        _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
+        {
+            SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
+            std::lock_guard<std::mutex> l(_pending_batches_lock);
+            // To simplify the add_row logic, postpone adding block into req 
until the time of sending req
+            _pending_batches_bytes += _cur_mutable_block->allocated_bytes();
+            _pending_blocks.emplace(std::move(_cur_mutable_block), 
+            _pending_batches_num++;
+            VLOG_DEBUG << "VOlapTableSink:" << _parent << " VNodeChannel:" << 
+                       << " pending_batches_bytes:" << _pending_batches_bytes
+                       << " jobid:" << std::to_string(_state->load_job_id())
+                       << " loadinfo:" << _load_info;
+        }
+        _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}));
+        _cur_add_block_request.clear_tablet_ids();
+    }
+    return Status::OK();
+int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
+                                            std::unique_ptr<ThreadPoolToken>& 
thread_pool_token) {
+    auto st = none_of({_cancelled, _send_finished});
+    if (!st.ok()) {
+        return 0;
+    }
+    if (!_add_block_closure->try_set_in_flight()) {
+        return _send_finished ? 0 : 1;
+    }
+    // We are sure that try_send_batch is not running
+    if (_pending_batches_num > 0) {
+        auto s = thread_pool_token->submit_func(
+                std::bind(&VNodeChannel::try_send_block, this, state));
+        if (!s.ok()) {
+            _cancel_with_msg("submit send_batch task to send_batch_thread_pool 
+            // clear in flight
+            _add_block_closure->clear_in_flight();
+        }
+        // in_flight is cleared in closure::Run
+    } else {
+        // clear in flight
+        _add_block_closure->clear_in_flight();
+    }
+    return _send_finished ? 0 : 1;
+void VNodeChannel::try_send_block(RuntimeState* state) {
+    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
+    SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
+    AddBlockReq send_block;
+    {
+        debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+        std::lock_guard<std::mutex> l(_pending_batches_lock);
+        DCHECK(!_pending_blocks.empty());
+        send_block = std::move(_pending_blocks.front());
+        _pending_blocks.pop();
+        _pending_batches_num--;
+        _pending_batches_bytes -= send_block.first->allocated_bytes();
+    }
+    auto mutable_block = std::move(send_block.first);
+    auto request = std::move(send_block.second); // doesn't need to be saved 
in heap
+    // tablet_ids has already set when add row
+    request.set_packet_seq(_next_packet_seq);
+    auto block = mutable_block->to_block();
+    if (block.rows() > 0) {
+        SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
+        size_t uncompressed_bytes = 0, compressed_bytes = 0;
+        Status st = block.serialize(request.mutable_block(), 
&uncompressed_bytes, &compressed_bytes,
+                                    _parent->_transfer_large_data_by_brpc);
+        if (!st.ok()) {
+            cancel(fmt::format("{}, err: {}", channel_info(), 
+            _add_block_closure->clear_in_flight();
+            return;
+        }
+        if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
+            LOG(WARNING) << "send block too large, this rpc may failed. send 
size: "
+                         << compressed_bytes << ", threshold: " << 
+                         << ", " << channel_info();
+        }
+    }
+    int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / 
+    if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
+        if (remain_ms <= 0 && !request.eos()) {
+            cancel(fmt::format("{}, err: timeout", channel_info()));
+            _add_block_closure->clear_in_flight();
+            return;
+        } else {
+            remain_ms = config::min_load_rpc_timeout_ms;
+        }
+    }
+    _add_block_closure->reset();
+    _add_block_closure->cntl.set_timeout_ms(remain_ms);
+    if (config::tablet_writer_ignore_eovercrowded) {
+        _add_block_closure->cntl.ignore_eovercrowded();
+    }
+    if (request.eos()) {
+        for (auto pid : _parent->_partition_ids) {
+            request.add_partition_ids(pid);
+        }
+        request.set_write_single_replica(false);
+        if (_parent->_write_single_replica) {
+            request.set_write_single_replica(true);
+            for (std::unordered_map<int64_t, std::vector<int64_t>>::iterator 
iter =
+                         _slave_tablet_nodes.begin();
+                 iter != _slave_tablet_nodes.end(); iter++) {
+                PSlaveTabletNodes slave_tablet_nodes;
+                for (auto node_id : iter->second) {
+                    auto node = _parent->_nodes_info->find_node(node_id);
+                    if (node == nullptr) {
+                        return;
+                    }
+                    PNodeInfo* pnode = slave_tablet_nodes.add_slave_nodes();
+                    pnode->set_id(node->id);
+                    pnode->set_option(node->option);
+                    pnode->set_host(node->host);
+                    pnode->set_async_internal_port(node->brpc_port);
+                }
+                request.mutable_slave_tablet_nodes()->insert({iter->first, 
+            }
+        }
+        // eos request must be the last request
+        _add_block_closure->end_mark();
+        _send_finished = true;
+        CHECK(_pending_batches_num == 0) << _pending_batches_num;
+    }
+    if (_parent->_transfer_large_data_by_brpc && request.has_block() &&
+        request.block().has_column_values() && request.ByteSizeLong() > 
+        Status st = request_embed_attachment_contain_block<
+                PTabletWriterAddBlockRequest, 
+                &request, _add_block_closure);
+        if (!st.ok()) {
+            cancel(fmt::format("{}, err: {}", channel_info(), 
+            _add_block_closure->clear_in_flight();
+            return;
+        }
+        std::string brpc_url = fmt::format("http://{}:{}";,, 
+        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
+        _add_block_closure->cntl.http_request().uri() =
+                brpc_url + 
+        _brpc_http_stub->tablet_writer_add_block_by_http(
+                &_add_block_closure->cntl, NULL, &_add_block_closure->result, 
+    } else {
+        _add_block_closure->cntl.http_request().Clear();
+        _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request,
+                                       &_add_block_closure->result, 
+    }
+    _next_packet_seq++;
+void VNodeChannel::_close_check() {
+    std::lock_guard<std::mutex> lg(_pending_batches_lock);
+    CHECK(_pending_blocks.empty()) << name();
+    CHECK(_cur_mutable_block == nullptr) << name();
+void VNodeChannel::mark_close() {
+    auto st = none_of({_cancelled, _eos_is_produced});
+    if (!st.ok()) {
+        return;
+    }
+    _cur_add_block_request.set_eos(true);
+    {
+        debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+        std::lock_guard<std::mutex> l(_pending_batches_lock);
+        _pending_blocks.emplace(std::move(_cur_mutable_block), 
+        _pending_batches_num++;
+        DCHECK(_pending_blocks.back().second.eos());
+        _close_time_ms = UnixMillis();
+        LOG(INFO) << channel_info()
+                  << " mark closed, left pending batch size: " << 
+    }
+    _eos_is_produced = true;
+>>>>>>> 569ab3055 ([bug](NodeChannel) fix OOM caused by pending queue in sink 
send (#12359) (#12362))
 VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                                const std::vector<TExpr>& texprs, Status* 
         : OlapTableSink(pool, row_desc, texprs, status) {

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to