This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 662ea3083f [bugfix](NodeChannel) fix OOM caused by pending queue in sink send
662ea3083f is described below

commit 662ea3083f8b88b97b6ea84f52f1833180777350
Author: yiguolei <yiguo...@gmail.com>
AuthorDate: Thu Sep 8 08:58:28 2022 +0800

    [bugfix](NodeChannel) fix OOM caused by pending queue in sink send
---
 be/src/vec/sink/vtablet_sink.cpp | 359 ---------------------------------------
 1 file changed, 359 deletions(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 846237229a..f0239b23c6 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -27,365 +27,6 @@ namespace doris {
 namespace stream_load {
 
-<<<<<<< HEAD
-=======
-VNodeChannel::VNodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id)
-        : NodeChannel(parent, index_channel, node_id) {
-    _is_vectorized = true;
-}
-
-VNodeChannel::~VNodeChannel() {
-    if (_add_block_closure != nullptr) {
-        delete _add_block_closure;
-        _add_block_closure = nullptr;
-    }
-    _cur_add_block_request.release_id();
-}
-
-void VNodeChannel::clear_all_blocks() {
-    std::lock_guard<std::mutex> lg(_pending_batches_lock);
-    std::queue<AddBlockReq> empty;
-    std::swap(_pending_blocks, empty);
-    _cur_mutable_block.reset();
-}
-
-// if "_cancelled" is set to true,
-// no need to set _cancel_msg because the error will be
-// returned directly via "TabletSink::prepare()" method.
-Status VNodeChannel::init(RuntimeState* state) {
-    RETURN_IF_ERROR(NodeChannel::init(state));
-
-    _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}));
-
-    // Initialize _cur_add_block_request
-    _cur_add_block_request.set_allocated_id(&_parent->_load_id);
-    _cur_add_block_request.set_index_id(_index_channel->_index_id);
-    _cur_add_block_request.set_sender_id(_parent->_sender_id);
-    _cur_add_block_request.set_backend_id(_node_id);
-    _cur_add_block_request.set_eos(false);
-
-    _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id);
-
-    return Status::OK();
-}
-
-Status VNodeChannel::open_wait() {
-    Status status = NodeChannel::open_wait();
-    if (!status.ok()) {
-        return status;
-    }
-
-    // add block closure
-    _add_block_closure = ReusableClosure<PTabletWriterAddBlockResult>::create();
-    _add_block_closure->addFailedHandler([this](bool is_last_rpc) {
-        std::lock_guard<std::mutex> l(this->_closed_lock);
-        if (this->_is_closed) {
-            // if the node channel is closed, no need to call `mark_as_failed`,
-            // and notice that _index_channel may already be destroyed.
-            return;
-        }
-        // If rpc failed, mark all tablets on this node channel as failed
-        _index_channel->mark_as_failed(this->node_id(), this->host(),
-                                       _add_block_closure->cntl.ErrorText(), -1);
-        Status st = _index_channel->check_intolerable_failure();
-        if (!st.ok()) {
-            _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
-        } else if (is_last_rpc) {
-            // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait
-            // will be blocked.
-            _add_batches_finished = true;
-        }
-    });
-
-    _add_block_closure->addSuccessHandler([this](const PTabletWriterAddBlockResult& result,
-                                                 bool is_last_rpc) {
-        std::lock_guard<std::mutex> l(this->_closed_lock);
-        if (this->_is_closed) {
-            // if the node channel is closed, no need to call the following logic,
-            // and notice that _index_channel may already be destroyed.
-            return;
-        }
-        Status status(result.status());
-        if (status.ok()) {
-            // if has error tablet, handle them first
-            for (auto& error : result.tablet_errors()) {
-                _index_channel->mark_as_failed(this->node_id(), this->host(), error.msg(),
-                                               error.tablet_id());
-            }
-
-            Status st = _index_channel->check_intolerable_failure();
-            if (!st.ok()) {
-                _cancel_with_msg(st.get_error_msg());
-            } else if (is_last_rpc) {
-                for (auto& tablet : result.tablet_vec()) {
-                    TTabletCommitInfo commit_info;
-                    commit_info.tabletId = tablet.tablet_id();
-                    commit_info.backendId = _node_id;
-                    _tablet_commit_infos.emplace_back(std::move(commit_info));
-                    VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id()
-                                  << ", backendId=" << _node_id
-                                  << ", master node id: " << this->node_id()
-                                  << ", host: " << this->host() << ", txn_id=" << _parent->_txn_id;
-                }
-                if (_parent->_write_single_replica) {
-                    for (auto& tablet_slave_node_ids : result.success_slave_tablet_node_ids()) {
-                        for (auto slave_node_id : tablet_slave_node_ids.second.slave_node_ids()) {
-                            TTabletCommitInfo commit_info;
-                            commit_info.tabletId = tablet_slave_node_ids.first;
-                            commit_info.backendId = slave_node_id;
-                            _tablet_commit_infos.emplace_back(std::move(commit_info));
-                            VLOG_CRITICAL << "slave replica commit info: tabletId="
-                                          << tablet_slave_node_ids.first
-                                          << ", backendId=" << slave_node_id
-                                          << ", master node id: " << this->node_id()
-                                          << ", host: " << this->host()
-                                          << ", txn_id=" << _parent->_txn_id;
-                        }
-                    }
-                }
-                _add_batches_finished = true;
-            }
-        } else {
-            _cancel_with_msg(fmt::format("{}, add batch req success but status isn't ok, err: {}",
-                                         channel_info(), status.get_error_msg()));
-        }
-
-        if (result.has_execution_time_us()) {
-            _add_batch_counter.add_batch_execution_time_us += result.execution_time_us();
-            _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us();
-            _add_batch_counter.add_batch_num++;
-        }
-    });
-    return status;
-}
-
-Status VNodeChannel::add_block(vectorized::Block* block,
-                               const std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
-                                               std::vector<int64_t>>& payload) {
-    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
-    // If add_block() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
-    auto st = none_of({_cancelled, _eos_is_produced});
-    if (!st.ok()) {
-        if (_cancelled) {
-            std::lock_guard<SpinLock> l(_cancel_msg_lock);
-            return Status::InternalError("add row failed. {}", _cancel_msg);
-        } else {
-            return std::move(st.prepend("already stopped, can't add row. cancelled/eos: "));
-        }
-    }
-
-    // We use OlapTableSink mem_tracker which has the same ancestor of _plan node,
-    // so in the ideal case, mem limit is a matter for _plan node.
-    // But there is still some unfinished things, we do mem limit here temporarily.
-    // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
-    // It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close().
-    while (!_cancelled && _pending_batches_num > 0 &&
-           _pending_batches_bytes > _max_pending_batches_bytes) {
-        SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
-        std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    }
-
-    block->append_block_by_selector(_cur_mutable_block->mutable_columns(), *(payload.first));
-    for (auto tablet_id : payload.second) {
-        _cur_add_block_request.add_tablet_ids(tablet_id);
-    }
-
-    if (_cur_mutable_block->rows() >= _batch_size ||
-        _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
-        {
-            SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
-            std::lock_guard<std::mutex> l(_pending_batches_lock);
-            // To simplify the add_row logic, postpone adding block into req until the time of sending req
-            _pending_batches_bytes += _cur_mutable_block->allocated_bytes();
-            _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
-            _pending_batches_num++;
-            VLOG_DEBUG << "VOlapTableSink:" << _parent << " VNodeChannel:" << this
-                       << " pending_batches_bytes:" << _pending_batches_bytes
-                       << " jobid:" << std::to_string(_state->load_job_id())
-                       << " loadinfo:" << _load_info;
-        }
-
-        _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc}));
-        _cur_add_block_request.clear_tablet_ids();
-    }
-
-    return Status::OK();
-}
-
-int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
-                                            std::unique_ptr<ThreadPoolToken>& thread_pool_token) {
-    auto st = none_of({_cancelled, _send_finished});
-    if (!st.ok()) {
-        return 0;
-    }
-
-    if (!_add_block_closure->try_set_in_flight()) {
-        return _send_finished ? 0 : 1;
-    }
-
-    // We are sure that try_send_batch is not running
-    if (_pending_batches_num > 0) {
-        auto s = thread_pool_token->submit_func(
-                std::bind(&VNodeChannel::try_send_block, this, state));
-        if (!s.ok()) {
-            _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed");
-            // clear in flight
-            _add_block_closure->clear_in_flight();
-        }
-        // in_flight is cleared in closure::Run
-    } else {
-        // clear in flight
-        _add_block_closure->clear_in_flight();
-    }
-    return _send_finished ? 0 : 1;
-}
-
-void VNodeChannel::try_send_block(RuntimeState* state) {
-    SCOPED_ATTACH_TASK(state);
-    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
-    SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
-    AddBlockReq send_block;
-    {
-        debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
-        std::lock_guard<std::mutex> l(_pending_batches_lock);
-        DCHECK(!_pending_blocks.empty());
-        send_block = std::move(_pending_blocks.front());
-        _pending_blocks.pop();
-        _pending_batches_num--;
-        _pending_batches_bytes -= send_block.first->allocated_bytes();
-    }
-
-    auto mutable_block = std::move(send_block.first);
-    auto request = std::move(send_block.second); // doesn't need to be saved in heap
-
-    // tablet_ids has already set when add row
-    request.set_packet_seq(_next_packet_seq);
-    auto block = mutable_block->to_block();
-    if (block.rows() > 0) {
-        SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
-        size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        Status st = block.serialize(request.mutable_block(), &uncompressed_bytes, &compressed_bytes,
-                                    state->fragement_transmission_compression_type(),
-                                    _parent->_transfer_large_data_by_brpc);
-        if (!st.ok()) {
-            cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
-            _add_block_closure->clear_in_flight();
-            return;
-        }
-        if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
-            LOG(WARNING) << "send block too large, this rpc may failed. send size: "
-                         << compressed_bytes << ", threshold: " << config::brpc_max_body_size
-                         << ", " << channel_info();
-        }
-    }
-
-    int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS;
-    if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
-        if (remain_ms <= 0 && !request.eos()) {
-            cancel(fmt::format("{}, err: timeout", channel_info()));
-            _add_block_closure->clear_in_flight();
-            return;
-        } else {
-            remain_ms = config::min_load_rpc_timeout_ms;
-        }
-    }
-
-    _add_block_closure->reset();
-    _add_block_closure->cntl.set_timeout_ms(remain_ms);
-    if (config::tablet_writer_ignore_eovercrowded) {
-        _add_block_closure->cntl.ignore_eovercrowded();
-    }
-
-    if (request.eos()) {
-        for (auto pid : _parent->_partition_ids) {
-            request.add_partition_ids(pid);
-        }
-
-        request.set_write_single_replica(false);
-        if (_parent->_write_single_replica) {
-            request.set_write_single_replica(true);
-            for (std::unordered_map<int64_t, std::vector<int64_t>>::iterator iter =
-                         _slave_tablet_nodes.begin();
-                 iter != _slave_tablet_nodes.end(); iter++) {
-                PSlaveTabletNodes slave_tablet_nodes;
-                for (auto node_id : iter->second) {
-                    auto node = _parent->_nodes_info->find_node(node_id);
-                    if (node == nullptr) {
-                        return;
-                    }
-                    PNodeInfo* pnode = slave_tablet_nodes.add_slave_nodes();
-                    pnode->set_id(node->id);
-                    pnode->set_option(node->option);
-                    pnode->set_host(node->host);
-                    pnode->set_async_internal_port(node->brpc_port);
-                }
-                request.mutable_slave_tablet_nodes()->insert({iter->first, slave_tablet_nodes});
-            }
-        }
-
-        // eos request must be the last request
-        _add_block_closure->end_mark();
-        _send_finished = true;
-        CHECK(_pending_batches_num == 0) << _pending_batches_num;
-    }
-
-    if (_parent->_transfer_large_data_by_brpc && request.has_block() &&
-        request.block().has_column_values() && request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
-        Status st = request_embed_attachment_contain_block<
-                PTabletWriterAddBlockRequest, ReusableClosure<PTabletWriterAddBlockResult>>(
-                &request, _add_block_closure);
-        if (!st.ok()) {
-            cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
-            _add_block_closure->clear_in_flight();
-            return;
-        }
-        std::string brpc_url = fmt::format("http://{}:{}", _node_info.host, _node_info.brpc_port);
-        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
-                _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
-                                                                                          "http");
-        _add_block_closure->cntl.http_request().uri() =
-                brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http";
-        _add_block_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
-        _add_block_closure->cntl.http_request().set_content_type("application/json");
-        _brpc_http_stub->tablet_writer_add_block_by_http(
-                &_add_block_closure->cntl, NULL, &_add_block_closure->result, _add_block_closure);
-    } else {
-        _add_block_closure->cntl.http_request().Clear();
-        _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request,
-                                       &_add_block_closure->result, _add_block_closure);
-    }
-
-    _next_packet_seq++;
-}
-
-void VNodeChannel::_close_check() {
-    std::lock_guard<std::mutex> lg(_pending_batches_lock);
-    CHECK(_pending_blocks.empty()) << name();
-    CHECK(_cur_mutable_block == nullptr) << name();
-}
-
-void VNodeChannel::mark_close() {
-    auto st = none_of({_cancelled, _eos_is_produced});
-    if (!st.ok()) {
-        return;
-    }
-
-    _cur_add_block_request.set_eos(true);
-    {
-        debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
-        std::lock_guard<std::mutex> l(_pending_batches_lock);
-        _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
-        _pending_batches_num++;
-        DCHECK(_pending_blocks.back().second.eos());
-        _close_time_ms = UnixMillis();
-        LOG(INFO) << channel_info()
-                  << " mark closed, left pending batch size: " << _pending_blocks.size();
-    }
-
-    _eos_is_produced = true;
-}
-
->>>>>>> 569ab3055 ([bug](NodeChannel) fix OOM caused by pending queue in sink send (#12359) (#12362))
 VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                                const std::vector<TExpr>& texprs, Status* status)
         : OlapTableSink(pool, row_desc, texprs, status) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
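
For reference, the removed VNodeChannel::add_block in the diff above bounds memory by making the producer wait while the pending queue already holds too many bytes, which is the unbounded pending-queue growth the commit title refers to. Below is a minimal, self-contained sketch of that backpressure pattern; the class name BoundedPendingQueue, the 4 KB budget, and the simulated slow send are illustrative assumptions, not code taken from the Doris repository.

#include <atomic>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Illustrative stand-in for a node channel's pending-blocks queue: the
// producer blocks while the queued bytes exceed a fixed budget, so a slow
// send path cannot make the queue grow without bound.
class BoundedPendingQueue {
public:
    explicit BoundedPendingQueue(size_t max_pending_bytes)
            : _max_pending_bytes(max_pending_bytes) {}

    // Producer side (analogous to add_block): wait while over budget, then queue.
    void add_block(std::vector<char> block) {
        while (!_cancelled && _pending_bytes.load() > _max_pending_bytes) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        std::lock_guard<std::mutex> l(_lock);
        _pending_bytes += block.size();
        _pending.push(std::move(block));
    }

    // Consumer side (analogous to try_send_block): pop one block and release
    // its bytes from the budget. Returns false if nothing was queued.
    bool try_send_one() {
        std::vector<char> block;
        {
            std::lock_guard<std::mutex> l(_lock);
            if (_pending.empty()) {
                return false;
            }
            block = std::move(_pending.front());
            _pending.pop();
            _pending_bytes -= block.size();
        }
        // Simulate a slow RPC so the producer actually hits the budget.
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        return true;
    }

    void cancel() { _cancelled = true; }

private:
    const size_t _max_pending_bytes;
    std::mutex _lock;
    std::queue<std::vector<char>> _pending;
    std::atomic<size_t> _pending_bytes{0};
    std::atomic<bool> _cancelled{false};
};

int main() {
    constexpr int kBlocks = 64;
    BoundedPendingQueue channel(4 * 1024); // 4 KB budget, illustrative only

    // Sender thread drains the queue until every block has been "sent".
    std::thread sender([&channel] {
        int sent = 0;
        while (sent < kBlocks) {
            if (channel.try_send_one()) {
                ++sent;
            } else {
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }
    });

    // Producer queues 1 KB blocks; it stalls whenever more than 4 KB is pending.
    for (int i = 0; i < kBlocks; ++i) {
        channel.add_block(std::vector<char>(1024));
    }
    sender.join();
    std::cout << "all blocks sent without unbounded queue growth" << std::endl;
    return 0;
}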