This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new 3269496be1 [bug](NodeChannel) fix OOM caused by pending queue in sink send (#12359) (#12362) 3269496be1 is described below commit 3269496be1cfafccb520ae3c3c17c18ff70dbe56 Author: zhengyu <freeman.zhang1...@gmail.com> AuthorDate: Wed Sep 7 20:49:08 2022 +0800 [bug](NodeChannel) fix OOM caused by pending queue in sink send (#12359) (#12362) Each NodeChannel has its own pending queue, whose size could previously grow up to 1/20 of exec_mem_limit. Users will run into OOM if exec_mem_limit is set high. This commit instead caps each NodeChannel's pending queue at a fixed size, configurable via nodechannel_pending_queue_max_bytes (default 64 MiB). Signed-off-by: freemandealer <freeman.zhang1...@gmail.com> --- be/src/common/config.h | 3 + be/src/exec/tablet_sink.cpp | 5 +- be/src/exec/tablet_sink.h | 26 ++- be/src/vec/sink/vtablet_sink.cpp | 359 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 378 insertions(+), 15 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 945e9631cd..902459e6d3 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -749,6 +749,9 @@ CONF_Int32(quick_compaction_min_rowsets, "10"); // Max waiting time to wait the "plan fragment start" rpc. // If timeout, the fragment will be cancelled.
CONF_mInt32(max_fragment_start_wait_time_seconds, "30"); + +// limit the queue of pending batches which will be sent by a single nodechannel +CONF_mInt64(nodechannel_pending_queue_max_bytes, "67108864"); } // namespace config } // namespace doris diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index a600f83e29..021540cecb 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -103,7 +103,6 @@ Status NodeChannel::init(RuntimeState* state) { _rpc_timeout_ms = state->query_options().query_timeout * 1000; _timeout_watch.start(); - _max_pending_batches_bytes = _parent->_load_mem_limit / 20; //TODO: session variable percent _load_info = "load_id=" + print_id(_parent->_load_id) + ", txn_id=" + std::to_string(_parent->_txn_id); @@ -271,6 +270,10 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { //To simplify the add_row logic, postpone adding batch into req until the time of sending req _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); _pending_batches_num++; + VLOG_DEBUG << "OlapTableSink:" << _parent << " NodeChannel:" << this + << " pending_batches_bytes:" << _pending_batches_bytes + << " jobid:" << std::to_string(_state->load_job_id()) + << " tabletid:" << tablet_id << " loadinfo:" << _load_info; } _cur_batch.reset(new RowBatch(*_row_desc, _batch_size, _parent->_mem_tracker.get())); diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 2a258d62da..ac1dfe2c80 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -131,9 +131,7 @@ public: return _packet_in_flight.compare_exchange_strong(value, true); } - void clear_in_flight() { - _packet_in_flight = false; - } + void clear_in_flight() { _packet_in_flight = false; } bool is_packet_in_flight() { return _packet_in_flight; } @@ -198,7 +196,8 @@ public: // 1: running, haven't reach eos. // only allow 1 rpc in flight // plz make sure, this func should be called after open_wait(). 
- int try_send_and_fetch_status(RuntimeState* state, std::unique_ptr<ThreadPoolToken>& thread_pool_token); + int try_send_and_fetch_status(RuntimeState* state, + std::unique_ptr<ThreadPoolToken>& thread_pool_token); void try_send_batch(RuntimeState* state); @@ -232,9 +231,7 @@ public: _node_info.brpc_port); } - size_t get_pending_bytes() { - return _pending_batches_bytes; - } + size_t get_pending_bytes() { return _pending_batches_bytes; } private: void _cancel_with_msg(const std::string& msg); @@ -279,7 +276,7 @@ private: std::atomic<int> _pending_batches_num {0}; // limit _pending_batches size std::atomic<size_t> _pending_batches_bytes {0}; - size_t _max_pending_batches_bytes {10 * 1024 * 1024}; + size_t _max_pending_batches_bytes {(size_t)config::nodechannel_pending_queue_max_bytes}; std::shared_ptr<PBackendService_Stub> _stub = nullptr; RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr; @@ -321,13 +318,15 @@ public: void add_row(BlockRow& block_row, int64_t tablet_id); - void for_each_node_channel(const std::function<void(const std::shared_ptr<NodeChannel>&)>& func) { + void for_each_node_channel( + const std::function<void(const std::shared_ptr<NodeChannel>&)>& func) { for (auto& it : _node_channels) { func(it.second); } } - void mark_as_failed(int64_t node_id, const std::string& host, const std::string& err, int64_t tablet_id = -1); + void mark_as_failed(int64_t node_id, const std::string& host, const std::string& err, + int64_t tablet_id = -1); Status check_intolerable_failure(); // set error tablet info in runtime state, so that it can be returned to FE. 
@@ -337,11 +336,12 @@ public: size_t get_pending_bytes() const { size_t mem_consumption = 0; - for (auto& kv: _node_channels){ + for (auto& kv : _node_channels) { mem_consumption += kv.second->get_pending_bytes(); } return mem_consumption; } + private: friend class NodeChannel; friend class VOlapTableSink; @@ -510,9 +510,7 @@ protected: // compute tablet index for every row batch // FIND_TABLET_EVERY_SINK is only used for random distribution info, which indicates that we should // only compute tablet index in the corresponding partition once for the whole time in olap table sink - enum FindTabletMode { - FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK - }; + enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK }; FindTabletMode findTabletMode = FindTabletMode::FIND_TABLET_EVERY_ROW; }; diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index f0239b23c6..846237229a 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -27,6 +27,365 @@ namespace doris { namespace stream_load { +<<<<<<< HEAD +======= +VNodeChannel::VNodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id) + : NodeChannel(parent, index_channel, node_id) { + _is_vectorized = true; +} + +VNodeChannel::~VNodeChannel() { + if (_add_block_closure != nullptr) { + delete _add_block_closure; + _add_block_closure = nullptr; + } + _cur_add_block_request.release_id(); +} + +void VNodeChannel::clear_all_blocks() { + std::lock_guard<std::mutex> lg(_pending_batches_lock); + std::queue<AddBlockReq> empty; + std::swap(_pending_blocks, empty); + _cur_mutable_block.reset(); +} + +// if "_cancelled" is set to true, +// no need to set _cancel_msg because the error will be +// returned directly via "TabletSink::prepare()" method. 
+Status VNodeChannel::init(RuntimeState* state) { + RETURN_IF_ERROR(NodeChannel::init(state)); + + _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc})); + + // Initialize _cur_add_block_request + _cur_add_block_request.set_allocated_id(&_parent->_load_id); + _cur_add_block_request.set_index_id(_index_channel->_index_id); + _cur_add_block_request.set_sender_id(_parent->_sender_id); + _cur_add_block_request.set_backend_id(_node_id); + _cur_add_block_request.set_eos(false); + + _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id); + + return Status::OK(); +} + +Status VNodeChannel::open_wait() { + Status status = NodeChannel::open_wait(); + if (!status.ok()) { + return status; + } + + // add block closure + _add_block_closure = ReusableClosure<PTabletWriterAddBlockResult>::create(); + _add_block_closure->addFailedHandler([this](bool is_last_rpc) { + std::lock_guard<std::mutex> l(this->_closed_lock); + if (this->_is_closed) { + // if the node channel is closed, no need to call `mark_as_failed`, + // and notice that _index_channel may already be destroyed. + return; + } + // If rpc failed, mark all tablets on this node channel as failed + _index_channel->mark_as_failed(this->node_id(), this->host(), + _add_block_closure->cntl.ErrorText(), -1); + Status st = _index_channel->check_intolerable_failure(); + if (!st.ok()) { + _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.get_error_msg())); + } else if (is_last_rpc) { + // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait + // will be blocked. + _add_batches_finished = true; + } + }); + + _add_block_closure->addSuccessHandler([this](const PTabletWriterAddBlockResult& result, + bool is_last_rpc) { + std::lock_guard<std::mutex> l(this->_closed_lock); + if (this->_is_closed) { + // if the node channel is closed, no need to call the following logic, + // and notice that _index_channel may already be destroyed. 
+ return; + } + Status status(result.status()); + if (status.ok()) { + // if has error tablet, handle them first + for (auto& error : result.tablet_errors()) { + _index_channel->mark_as_failed(this->node_id(), this->host(), error.msg(), + error.tablet_id()); + } + + Status st = _index_channel->check_intolerable_failure(); + if (!st.ok()) { + _cancel_with_msg(st.get_error_msg()); + } else if (is_last_rpc) { + for (auto& tablet : result.tablet_vec()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet.tablet_id(); + commit_info.backendId = _node_id; + _tablet_commit_infos.emplace_back(std::move(commit_info)); + VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id() + << ", backendId=" << _node_id + << ", master node id: " << this->node_id() + << ", host: " << this->host() << ", txn_id=" << _parent->_txn_id; + } + if (_parent->_write_single_replica) { + for (auto& tablet_slave_node_ids : result.success_slave_tablet_node_ids()) { + for (auto slave_node_id : tablet_slave_node_ids.second.slave_node_ids()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet_slave_node_ids.first; + commit_info.backendId = slave_node_id; + _tablet_commit_infos.emplace_back(std::move(commit_info)); + VLOG_CRITICAL << "slave replica commit info: tabletId=" + << tablet_slave_node_ids.first + << ", backendId=" << slave_node_id + << ", master node id: " << this->node_id() + << ", host: " << this->host() + << ", txn_id=" << _parent->_txn_id; + } + } + } + _add_batches_finished = true; + } + } else { + _cancel_with_msg(fmt::format("{}, add batch req success but status isn't ok, err: {}", + channel_info(), status.get_error_msg())); + } + + if (result.has_execution_time_us()) { + _add_batch_counter.add_batch_execution_time_us += result.execution_time_us(); + _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us(); + _add_batch_counter.add_batch_num++; + } + }); + return status; +} + +Status 
VNodeChannel::add_block(vectorized::Block* block, + const std::pair<std::unique_ptr<vectorized::IColumn::Selector>, + std::vector<int64_t>>& payload) { + SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); + // If add_block() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed. + auto st = none_of({_cancelled, _eos_is_produced}); + if (!st.ok()) { + if (_cancelled) { + std::lock_guard<SpinLock> l(_cancel_msg_lock); + return Status::InternalError("add row failed. {}", _cancel_msg); + } else { + return std::move(st.prepend("already stopped, can't add row. cancelled/eos: ")); + } + } + + // We use OlapTableSink mem_tracker which has the same ancestor of _plan node, + // so in the ideal case, mem limit is a matter for _plan node. + // But there is still some unfinished things, we do mem limit here temporarily. + // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. + // It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close(). 
+ while (!_cancelled && _pending_batches_num > 0 && + _pending_batches_bytes > _max_pending_batches_bytes) { + SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + block->append_block_by_selector(_cur_mutable_block->mutable_columns(), *(payload.first)); + for (auto tablet_id : payload.second) { + _cur_add_block_request.add_tablet_ids(tablet_id); + } + + if (_cur_mutable_block->rows() >= _batch_size || + _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) { + { + SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); + std::lock_guard<std::mutex> l(_pending_batches_lock); + // To simplify the add_row logic, postpone adding block into req until the time of sending req + _pending_batches_bytes += _cur_mutable_block->allocated_bytes(); + _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request); + _pending_batches_num++; + VLOG_DEBUG << "VOlapTableSink:" << _parent << " VNodeChannel:" << this + << " pending_batches_bytes:" << _pending_batches_bytes + << " jobid:" << std::to_string(_state->load_job_id()) + << " loadinfo:" << _load_info; + } + + _cur_mutable_block.reset(new vectorized::MutableBlock({_tuple_desc})); + _cur_add_block_request.clear_tablet_ids(); + } + + return Status::OK(); +} + +int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, + std::unique_ptr<ThreadPoolToken>& thread_pool_token) { + auto st = none_of({_cancelled, _send_finished}); + if (!st.ok()) { + return 0; + } + + if (!_add_block_closure->try_set_in_flight()) { + return _send_finished ? 
0 : 1; + } + + // We are sure that try_send_batch is not running + if (_pending_batches_num > 0) { + auto s = thread_pool_token->submit_func( + std::bind(&VNodeChannel::try_send_block, this, state)); + if (!s.ok()) { + _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed"); + // clear in flight + _add_block_closure->clear_in_flight(); + } + // in_flight is cleared in closure::Run + } else { + // clear in flight + _add_block_closure->clear_in_flight(); + } + return _send_finished ? 0 : 1; +} + +void VNodeChannel::try_send_block(RuntimeState* state) { + SCOPED_ATTACH_TASK(state); + SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); + SCOPED_ATOMIC_TIMER(&_actual_consume_ns); + AddBlockReq send_block; + { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; + std::lock_guard<std::mutex> l(_pending_batches_lock); + DCHECK(!_pending_blocks.empty()); + send_block = std::move(_pending_blocks.front()); + _pending_blocks.pop(); + _pending_batches_num--; + _pending_batches_bytes -= send_block.first->allocated_bytes(); + } + + auto mutable_block = std::move(send_block.first); + auto request = std::move(send_block.second); // doesn't need to be saved in heap + + // tablet_ids has already set when add row + request.set_packet_seq(_next_packet_seq); + auto block = mutable_block->to_block(); + if (block.rows() > 0) { + SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); + size_t uncompressed_bytes = 0, compressed_bytes = 0; + Status st = block.serialize(request.mutable_block(), &uncompressed_bytes, &compressed_bytes, + state->fragement_transmission_compression_type(), + _parent->_transfer_large_data_by_brpc); + if (!st.ok()) { + cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg())); + _add_block_closure->clear_in_flight(); + return; + } + if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) { + LOG(WARNING) << "send block too large, this rpc may failed. 
send size: " + << compressed_bytes << ", threshold: " << config::brpc_max_body_size + << ", " << channel_info(); + } + } + + int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / NANOS_PER_MILLIS; + if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) { + if (remain_ms <= 0 && !request.eos()) { + cancel(fmt::format("{}, err: timeout", channel_info())); + _add_block_closure->clear_in_flight(); + return; + } else { + remain_ms = config::min_load_rpc_timeout_ms; + } + } + + _add_block_closure->reset(); + _add_block_closure->cntl.set_timeout_ms(remain_ms); + if (config::tablet_writer_ignore_eovercrowded) { + _add_block_closure->cntl.ignore_eovercrowded(); + } + + if (request.eos()) { + for (auto pid : _parent->_partition_ids) { + request.add_partition_ids(pid); + } + + request.set_write_single_replica(false); + if (_parent->_write_single_replica) { + request.set_write_single_replica(true); + for (std::unordered_map<int64_t, std::vector<int64_t>>::iterator iter = + _slave_tablet_nodes.begin(); + iter != _slave_tablet_nodes.end(); iter++) { + PSlaveTabletNodes slave_tablet_nodes; + for (auto node_id : iter->second) { + auto node = _parent->_nodes_info->find_node(node_id); + if (node == nullptr) { + return; + } + PNodeInfo* pnode = slave_tablet_nodes.add_slave_nodes(); + pnode->set_id(node->id); + pnode->set_option(node->option); + pnode->set_host(node->host); + pnode->set_async_internal_port(node->brpc_port); + } + request.mutable_slave_tablet_nodes()->insert({iter->first, slave_tablet_nodes}); + } + } + + // eos request must be the last request + _add_block_closure->end_mark(); + _send_finished = true; + CHECK(_pending_batches_num == 0) << _pending_batches_num; + } + + if (_parent->_transfer_large_data_by_brpc && request.has_block() && + request.block().has_column_values() && request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) { + Status st = request_embed_attachment_contain_block< + PTabletWriterAddBlockRequest, ReusableClosure<PTabletWriterAddBlockResult>>( 
+ &request, _add_block_closure); + if (!st.ok()) { + cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg())); + _add_block_closure->clear_in_flight(); + return; + } + std::string brpc_url = fmt::format("http://{}:{}", _node_info.host, _node_info.brpc_port); + std::shared_ptr<PBackendService_Stub> _brpc_http_stub = + _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, + "http"); + _add_block_closure->cntl.http_request().uri() = + brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http"; + _add_block_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST); + _add_block_closure->cntl.http_request().set_content_type("application/json"); + _brpc_http_stub->tablet_writer_add_block_by_http( + &_add_block_closure->cntl, NULL, &_add_block_closure->result, _add_block_closure); + } else { + _add_block_closure->cntl.http_request().Clear(); + _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request, + &_add_block_closure->result, _add_block_closure); + } + + _next_packet_seq++; +} + +void VNodeChannel::_close_check() { + std::lock_guard<std::mutex> lg(_pending_batches_lock); + CHECK(_pending_blocks.empty()) << name(); + CHECK(_cur_mutable_block == nullptr) << name(); +} + +void VNodeChannel::mark_close() { + auto st = none_of({_cancelled, _eos_is_produced}); + if (!st.ok()) { + return; + } + + _cur_add_block_request.set_eos(true); + { + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; + std::lock_guard<std::mutex> l(_pending_batches_lock); + _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request); + _pending_batches_num++; + DCHECK(_pending_blocks.back().second.eos()); + _close_time_ms = UnixMillis(); + LOG(INFO) << channel_info() + << " mark closed, left pending batch size: " << _pending_blocks.size(); + } + + _eos_is_produced = true; +} + +>>>>>>> 569ab3055 ([bug](NodeChannel) fix OOM caused by pending queue in sink send (#12359) (#12362)) 
VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector<TExpr>& texprs, Status* status) : OlapTableSink(pool, row_desc, texprs, status) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org