This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 662ea3083f [bugfix](NodeChannel) fix OOM caused by pending queue in sink send
662ea3083f is described below

commit 662ea3083f8b88b97b6ea84f52f1833180777350
Author: yiguolei <yiguo...@gmail.com>
AuthorDate: Thu Sep 8 08:58:28 2022 +0800

    [bugfix](NodeChannel) fix OOM caused by pending queue in sink send
---
 be/src/vec/sink/vtablet_sink.cpp | 359 ---------------------------------------
 1 file changed, 359 deletions(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 846237229a..f0239b23c6 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -27,365 +27,6 @@
 namespace doris {
 namespace stream_load {
 
-<<<<<<< HEAD
-=======
-VNodeChannel::VNodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id)
-        : NodeChannel(parent, index_channel, node_id) {
-    _is_vectorized = true;
-}
-
-VNodeChannel::~VNodeChannel() {
-    if (_add_block_closure != nullptr) {
-        delete _add_block_closure;
-        _add_block_closure = nullptr;
-    }
-    _cur_add_block_request.release_id();
-}
-
-void VNodeChannel::clear_all_blocks() {
-    std::lock_guard<std::mutex> lg(_pending_batches_lock);
-    std::queue<AddBlockReq> empty;
-    std::swap(_pending_blocks, empty);
-    _cur_mutable_block.reset();
-}
-
-// if "_cancelled" is set to true,
-// no need to set _cancel_msg because the error will be
-// returned directly via "TabletSink::prepare()" method.
-Status VNodeChannel::init(RuntimeState* state) {
-    RETURN_IF_ERROR(NodeChannel::init(state));
-
-    _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}));
-
-    // Initialize _cur_add_block_request
-    _cur_add_block_request.set_allocated_id(&_parent->_load_id);
-    _cur_add_block_request.set_index_id(_index_channel->_index_id);
-    _cur_add_block_request.set_sender_id(_parent->_sender_id);
-    _cur_add_block_request.set_backend_id(_node_id);
-    _cur_add_block_request.set_eos(false);
-
-    _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id);
-
-    return Status::OK();
-}
-
-Status VNodeChannel::open_wait() {
-    Status status = NodeChannel::open_wait();
-    if (!status.ok()) {
-        return status;
-    }
-
-    // add block closure
-    _add_block_closure = ReusableClosure<PTabletWriterAddBlockResult>::create();
-    _add_block_closure->addFailedHandler([this](bool is_last_rpc) {
-        std::lock_guard<std::mutex> l(this->_closed_lock);
-        if (this->_is_closed) {
-            // if the node channel is closed, no need to call `mark_as_failed`,
-            // and notice that _index_channel may already be destroyed.
-            return;
-        }
-        // If rpc failed, mark all tablets on this node channel as failed
-        _index_channel->mark_as_failed(this->node_id(), this->host(),
-                                       _add_block_closure->cntl.ErrorText(), -1);
-        Status st = _index_channel->check_intolerable_failure();
-        if (!st.ok()) {
-            _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
-        } else if (is_last_rpc) {
-            // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait
-            // will be blocked.
-            _add_batches_finished = true;
-        }
-    });
-
-    _add_block_closure->addSuccessHandler([this](const PTabletWriterAddBlockResult& result,
-                                                 bool is_last_rpc) {
-        std::lock_guard<std::mutex> l(this->_closed_lock);
-        if (this->_is_closed) {
-            // if the node channel is closed, no need to call the following logic,
-            // and notice that _index_channel may already be destroyed.
-            return;
-        }
-        Status status(result.status());
-        if (status.ok()) {
-            // if has error tablet, handle them first
-            for (auto& error : result.tablet_errors()) {
-                _index_channel->mark_as_failed(this->node_id(), this->host(), error.msg(),
-                                               error.tablet_id());
-            }
-
-            Status st = _index_channel->check_intolerable_failure();
-            if (!st.ok()) {
-                _cancel_with_msg(st.get_error_msg());
-            } else if (is_last_rpc) {
-                for (auto& tablet : result.tablet_vec()) {
-                    TTabletCommitInfo commit_info;
-                    commit_info.tabletId = tablet.tablet_id();
-                    commit_info.backendId = _node_id;
-                    _tablet_commit_infos.emplace_back(std::move(commit_info));
-                    VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id()
-                                  << ", backendId=" << _node_id
-                                  << ", master node id: " << this->node_id()
-                                  << ", host: " << this->host() << ", txn_id=" << _parent->_txn_id;
-                }
-                if (_parent->_write_single_replica) {
-                    for (auto& tablet_slave_node_ids : result.success_slave_tablet_node_ids()) {
-                        for (auto slave_node_id : tablet_slave_node_ids.second.slave_node_ids()) {
-                            TTabletCommitInfo commit_info;
-                            commit_info.tabletId = tablet_slave_node_ids.first;
-                            commit_info.backendId = slave_node_id;
-                            _tablet_commit_infos.emplace_back(std::move(commit_info));
-                            VLOG_CRITICAL << "slave replica commit info: tabletId="
-                                          << tablet_slave_node_ids.first
-                                          << ", backendId=" << slave_node_id
-                                          << ", master node id: " << this->node_id()
-                                          << ", host: " << this->host()
-                                          << ", txn_id=" << _parent->_txn_id;
-                        }
-                    }
-                }
-                _add_batches_finished = true;
-            }
-        } else {
-            _cancel_with_msg(fmt::format("{}, add batch req success but status isn't ok, err: {}",
-                                         channel_info(), status.get_error_msg()));
-        }
-
-        if (result.has_execution_time_us()) {
-            _add_batch_counter.add_batch_execution_time_us += result.execution_time_us();
-            _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us();
-            _add_batch_counter.add_batch_num++;
-        }
-    });
-    return status;
-}
-
-Status VNodeChannel::add_block(vectorized::Block* block,
-                               const std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
-                                               std::vector<int64_t>>& payload) {
-    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
-    // If add_block() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
-    auto st = none_of({_cancelled, _eos_is_produced});
-    if (!st.ok()) {
-        if (_cancelled) {
-            std::lock_guard<SpinLock> l(_cancel_msg_lock);
-            return Status::InternalError("add row failed. {}", _cancel_msg);
-        } else {
-            return std::move(st.prepend("already stopped, can't add row. cancelled/eos: "));
-        }
-    }
-
-    // We use OlapTableSink mem_tracker which has the same ancestor of _plan node,
-    // so in the ideal case, mem limit is a matter for _plan node.
-    // But there is still some unfinished things, we do mem limit here temporarily.
-    // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
-    // It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close().
-    while (!_cancelled && _pending_batches_num > 0 &&
-           _pending_batches_bytes > _max_pending_batches_bytes) {
-        SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
-        std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    }
-
-    block->append_block_by_selector(_cur_mutable_block->mutable_columns(), *(payload.first));
-    for (auto tablet_id : payload.second) {
-        _cur_add_block_request.add_tablet_ids(tablet_id);
-    }
-
-    if (_cur_mutable_block->rows() >= _batch_size ||
-        _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
-        {
-            SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
-            std::lock_guard<std::mutex> l(_pending_batches_lock);
-            // To simplify the add_row logic, postpone adding block into req until the time of sending req
-            _pending_batches_bytes += _cur_mutable_block->allocated_bytes();
-            _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
-            _pending_batches_num++;
-            VLOG_DEBUG << "VOlapTableSink:" << _parent << " VNodeChannel:" << this
-                       << " pending_batches_bytes:" << _pending_batches_bytes
-                       << " jobid:" << std::to_string(_state->load_job_id())
-                       << " loadinfo:" << _load_info;
-        }
-
-        _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}));
-        _cur_add_block_request.clear_tablet_ids();
-    }
-
-    return Status::OK();
-}
-
-int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
-                                            std::unique_ptr<ThreadPoolToken>& thread_pool_token) {
-    auto st = none_of({_cancelled, _send_finished});
-    if (!st.ok()) {
-        return 0;
-    }
-
-    if (!_add_block_closure->try_set_in_flight()) {
-        return _send_finished ? 0 : 1;
-    }
-
-    // We are sure that try_send_batch is not running
-    if (_pending_batches_num > 0) {
-        auto s = thread_pool_token->submit_func(
-                std::bind(&VNodeChannel::try_send_block, this, state));
-        if (!s.ok()) {
-            _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed");
-            // clear in flight
-            _add_block_closure->clear_in_flight();
-        }
-        // in_flight is cleared in closure::Run
-    } else {
-        // clear in flight
-        _add_block_closure->clear_in_flight();
-    }
-    return _send_finished ? 0 : 1;
-}
-
-void VNodeChannel::try_send_block(RuntimeState* state) {
-    SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
-    SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
-    AddBlockReq send_block;
-    {
-        debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
-        std::lock_guard<std::mutex> l(_pending_batches_lock);
-        DCHECK(!_pending_blocks.empty());
-        send_block = std::move(_pending_blocks.front());
-        _pending_blocks.pop();
-        _pending_batches_num--;
-        _pending_batches_bytes -= send_block.first->allocated_bytes();
-    }
-
-    auto mutable_block = std::move(send_block.first);
-    auto request = std::move(send_block.second); // doesn't need to be saved in heap
-
-    // tablet_ids has already set when add row
-    request.set_packet_seq(_next_packet_seq);
-    auto block = mutable_block->to_block();
-    if (block.rows() > 0) {
-        SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
-        size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        Status st = block.serialize(request.mutable_block(), &uncompressed_bytes, &compressed_bytes,
-                                    state->fragement_transmission_compression_type(),
-                                    _parent->_transfer_large_data_by_brpc);
-        if (!st.ok()) {
-            cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
-            _add_block_closure->clear_in_flight();
-            return;
-        }
-        if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
-            LOG(WARNING) << "send block too large, this rpc may failed. send size: "
-                         << compressed_bytes << ", threshold: " << config::brpc_max_body_size
-                         << ", " << channel_info();
-        }
-    }
-
-    int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
-    if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
-        if (remain_ms <= 0 && !request.eos()) {
-            cancel(fmt::format("{}, err: timeout", channel_info()));
-            _add_block_closure->clear_in_flight();
-            return;
-        } else {
-            remain_ms = config::min_load_rpc_timeout_ms;
-        }
-    }
-
-    _add_block_closure->reset();
-    _add_block_closure->cntl.set_timeout_ms(remain_ms);
-    if (config::tablet_writer_ignore_eovercrowded) {
-        _add_block_closure->cntl.ignore_eovercrowded();
-    }
-
-    if (request.eos()) {
-        for (auto pid : _parent->_partition_ids) {
-            request.add_partition_ids(pid);
-        }
-
-        request.set_write_single_replica(false);
-        if (_parent->_write_single_replica) {
-            request.set_write_single_replica(true);
-            for (std::unordered_map<int64_t, std::vector<int64_t>>::iterator iter =
-                         _slave_tablet_nodes.begin();
-                 iter != _slave_tablet_nodes.end(); iter++) {
-                PSlaveTabletNodes slave_tablet_nodes;
-                for (auto node_id : iter->second) {
-                    auto node = _parent->_nodes_info->find_node(node_id);
-                    if (node == nullptr) {
-                        return;
-                    }
-                    PNodeInfo* pnode = slave_tablet_nodes.add_slave_nodes();
-                    pnode->set_id(node->id);
-                    pnode->set_option(node->option);
-                    pnode->set_host(node->host);
-                    pnode->set_async_internal_port(node->brpc_port);
-                }
-                request.mutable_slave_tablet_nodes()->insert({iter->first, slave_tablet_nodes});
-            }
-        }
-
-        // eos request must be the last request
-        _add_block_closure->end_mark();
-        _send_finished = true;
-        CHECK(_pending_batches_num == 0) << _pending_batches_num;
-    }
-
-    if (_parent->_transfer_large_data_by_brpc && request.has_block() &&
-        request.block().has_column_values() && request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
-        Status st = request_embed_attachment_contain_block<
-                PTabletWriterAddBlockRequest, ReusableClosure<PTabletWriterAddBlockResult>>(
-                &request, _add_block_closure);
-        if (!st.ok()) {
-            cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
-            _add_block_closure->clear_in_flight();
-            return;
-        }
-        std::string brpc_url = fmt::format("http://{}:{}", _node_info.host, _node_info.brpc_port);
-        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
-                _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
-                                                                                          "http");
-        _add_block_closure->cntl.http_request().uri() =
-                brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http";
-        _add_block_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
-        _add_block_closure->cntl.http_request().set_content_type("application/json");
-        _brpc_http_stub->tablet_writer_add_block_by_http(
-                &_add_block_closure->cntl, NULL, &_add_block_closure->result, _add_block_closure);
-    } else {
-        _add_block_closure->cntl.http_request().Clear();
-        _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request,
-                                       &_add_block_closure->result, _add_block_closure);
-    }
-
-    _next_packet_seq++;
-}
-
-void VNodeChannel::_close_check() {
-    std::lock_guard<std::mutex> lg(_pending_batches_lock);
-    CHECK(_pending_blocks.empty()) << name();
-    CHECK(_cur_mutable_block == nullptr) << name();
-}
-
-void VNodeChannel::mark_close() {
-    auto st = none_of({_cancelled, _eos_is_produced});
-    if (!st.ok()) {
-        return;
-    }
-
-    _cur_add_block_request.set_eos(true);
-    {
-        debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
-        std::lock_guard<std::mutex> l(_pending_batches_lock);
-        _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
-        _pending_batches_num++;
-        DCHECK(_pending_blocks.back().second.eos());
-        _close_time_ms = UnixMillis();
-        LOG(INFO) << channel_info()
-                  << " mark closed, left pending batch size: " << _pending_blocks.size();
-    }
-
-    _eos_is_produced = true;
-}
-
->>>>>>> 569ab3055 ([bug](NodeChannel) fix OOM caused by pending queue in sink send (#12359) (#12362))
 VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                                const std::vector<TExpr>& texprs, Status* status)
         : OlapTableSink(pool, row_desc, texprs, status) {

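Note for readers skimming the removed hunk: the deleted VNodeChannel::add_block() throttled producers with a byte-budgeted pending queue (the while loop comparing _pending_batches_bytes against _max_pending_batches_bytes before enqueueing). The standalone sketch below illustrates that backpressure pattern only; all names (BoundedBlockQueue, push_block, pop_block, the byte budget) are illustrative, not Doris's actual API, and it waits on a condition variable where the removed code polled with a 10 ms sleep.

    // Sketch: a byte-bounded pending queue that blocks the producer when queued
    // bytes exceed a budget, so a slow sender cannot accumulate unbounded memory.
    // Hypothetical names; not the Doris implementation.
    #include <condition_variable>
    #include <cstddef>
    #include <mutex>
    #include <queue>
    #include <string>
    #include <utility>

    class BoundedBlockQueue {
    public:
        explicit BoundedBlockQueue(size_t max_pending_bytes)
                : _max_pending_bytes(max_pending_bytes) {}

        // Producer side (analogous to add_block): wait until the queue is under
        // its byte budget before enqueueing another serialized block.
        void push_block(std::string block) {
            std::unique_lock<std::mutex> l(_lock);
            _under_budget.wait(l, [&] { return _pending_bytes <= _max_pending_bytes; });
            _pending_bytes += block.size();
            _queue.push(std::move(block));
        }

        // Consumer side (analogous to try_send_block): dequeue one block and
        // release its bytes so a blocked producer can continue.
        bool pop_block(std::string* out) {
            std::lock_guard<std::mutex> l(_lock);
            if (_queue.empty()) return false;
            *out = std::move(_queue.front());
            _queue.pop();
            _pending_bytes -= out->size();
            _under_budget.notify_all();
            return true;
        }

    private:
        std::mutex _lock;
        std::condition_variable _under_budget;
        std::queue<std::string> _queue;
        size_t _pending_bytes = 0;
        const size_t _max_pending_bytes;
    };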
