This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1-v20220707 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 46efc851fd1113207a8fb0985888cd96972dbac4 Author: minghong <minghong.z...@163.com> AuthorDate: Wed Jul 6 17:52:10 2022 +0800 [hotfix](dev-1.0.1) if pending bytes are exceeded, vtableSink waits until the pending bytes are consumed or the task is cancelled (#10644) To avoid some OOM problems in load operations. --- be/src/common/config.h | 4 ---- be/src/exec/tablet_sink.cpp | 6 ++---- be/src/vec/sink/vtablet_sink.cpp | 24 +++++++----------------- 3 files changed, 9 insertions(+), 25 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 3174b445c2..2f956e3b8d 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -720,10 +720,6 @@ CONF_Int32(quick_compaction_max_rows, "1000"); CONF_Int32(quick_compaction_batch_size, "10"); // do compaction min rowsets CONF_Int32(quick_compaction_min_rowsets, "10"); - -//memory limitation for batches in pending queue, default 500M -CONF_Int64(table_sink_pending_bytes_limitation, "524288000"); - } // namespace config } // namespace doris diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index adb44a5843..2d9733e9bf 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -260,8 +260,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { // But there is still some unfinished things, we do mem limit here temporarily. // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). 
- while (!_cancelled && (_pending_batches_bytes > _max_pending_batches_bytes || _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD)) && - _pending_batches_num > 0) { + while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) { SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); SleepFor(MonoDelta::FromMilliseconds(10)); } @@ -310,8 +309,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) { // But there is still some unfinished things, we do mem limit here temporarily. // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). - while (!_cancelled && (_pending_batches_bytes > _max_pending_batches_bytes || _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD)) && - _pending_batches_num > 0) { + while (!_cancelled && _pending_batches_bytes > _max_pending_batches_bytes) { SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); SleepFor(MonoDelta::FromMilliseconds(10)); } diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index c90e3b65b7..f0239b23c6 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -58,9 +58,8 @@ Status VOlapTableSink::open(RuntimeState* state) { size_t VOlapTableSink::get_pending_bytes() const { size_t mem_consumption = 0; - for (auto& indexChannel : _channels){ + for (auto& indexChannel : _channels) { mem_consumption += indexChannel->get_pending_bytes(); - } return mem_consumption; } @@ -116,20 +115,10 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) if (findTabletMode == FindTabletMode::FIND_TABLET_EVERY_BATCH) { _partition_to_tablet_map.clear(); } - - //if pending bytes is more than table_sink_pending_bytes_limitation, wait at most 1 min - size_t MAX_PENDING_BYTES = config::table_sink_pending_bytes_limitation; - constexpr int max_retry = 120; - 
int retry = 0; - while (get_pending_bytes() > MAX_PENDING_BYTES && retry++ < max_retry) { - std::this_thread::sleep_for(std::chrono::microseconds(500)); - } - if (get_pending_bytes() > MAX_PENDING_BYTES) { - std::stringstream str; - str << "Load task " << _load_id - << ": pending bytes exceed limit (config::table_sink_pending_bytes_limitation):" - << MAX_PENDING_BYTES; - return Status::MemoryLimitExceeded(str.str()); + + size_t MAX_PENDING_BYTES = _load_mem_limit / 3; + while (get_pending_bytes() > MAX_PENDING_BYTES && !state->is_cancelled()) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); } for (int i = 0; i < num_rows; ++i) { @@ -140,7 +129,8 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) uint32_t tablet_index = 0; block_row = {&block, i}; if (!_vpartition->find_partition(&block_row, &partition)) { - RETURN_IF_ERROR(state->append_error_msg_to_file([]() -> std::string { return ""; }, + RETURN_IF_ERROR(state->append_error_msg_to_file( + []() -> std::string { return ""; }, [&]() -> std::string { fmt::memory_buffer buf; fmt::format_to(buf, "no partition for this tuple. tuple=[]"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org