xinyiZzz commented on code in PR #47462: URL: https://github.com/apache/doris/pull/47462#discussion_r1964990742
##########
be/src/common/daemon.cpp:
##########
@@ -547,7 +550,9 @@ void Daemon::cache_adjust_capacity_thread() {
             doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for(
                     l, std::chrono::milliseconds(100));
         }
-        double adjust_weighted = GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted;
+        double adjust_weighted = std::min<double>(
+                GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted,
+                GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted);

Review Comment:
Why introduce last_wg_trigger_cache_capacity_adjust_weighted and last_affected_cache_capacity_adjust_weighted?
Judging from the description in workload_group_manager.cpp, this logic is meant to trigger a cache capacity reduction when no workload group has exceeded its limit but process memory is insufficient. However, the existing logic already starts shrinking the cache gradually once process memory reaches 56%. So is the reason that the existing logic does not shrink the cache fast enough?
If it is not fast enough, why does workload_group_manager.cpp only shrink the cache to 0.4 of its capacity?
The current approach is convoluted; it would be better to modify the implementation of the refresh_cache_capacity method instead.

##########
be/src/olap/memtable_memory_limiter.cpp:
##########
@@ -110,7 +111,23 @@ int64_t MemTableMemoryLimiter::_need_flush() {
     return need_flush - _queue_mem_usage;
 }
-void MemTableMemoryLimiter::handle_memtable_flush() {
+void MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPtr wg) {
+    // It means some query is pending on here to flush memtable and to continue running.
+    // So that should wait here.
+    // Wait at most 1s, because this code is not aware cancel flag. If the load task is cancelled
+    // Should releae memory quickly.
+    using namespace std::chrono_literals;
+    int32_t sleep_times = 10;

Review Comment:
Does this wait 10s? The code comment says it waits at most 1s.

##########
be/src/olap/memtable_memory_limiter.cpp:
##########
@@ -110,7 +111,23 @@ int64_t MemTableMemoryLimiter::_need_flush() {
     return need_flush - _queue_mem_usage;
 }
-void MemTableMemoryLimiter::handle_memtable_flush() {
+void MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPtr wg) {
+    // It means some query is pending on here to flush memtable and to continue running.
+    // So that should wait here.
+    // Wait at most 1s, because this code is not aware cancel flag. If the load task is cancelled
+    // Should releae memory quickly.
+    using namespace std::chrono_literals;
+    int32_t sleep_times = 10;
+    while (wg != nullptr && wg->enable_write_buffer_limit() && wg->exceed_write_buffer_limit() &&

Review Comment:
If process memory has not exceeded its limit and the current workload group's memory has not either, but exceed_write_buffer_limit is true for this workload group, do we still need to block here and wait for the flush to release memory?
This effectively sets a hard limit on memtable size within the workload group, which will probably degrade load performance in some cases. Would a soft limit be enough?

##########
be/src/pipeline/pipeline_task.cpp:
##########
@@ -340,36 +370,127 @@ Status PipelineTask::execute(bool* eos) {
         _block->clear_column_data(_root->row_desc().num_materialized_slots());
         auto* block = _block.get();
-        auto sink_revocable_mem_size = _sink->revocable_mem_size(_state);
-        if (should_revoke_memory(_state, sink_revocable_mem_size)) {
-            RETURN_IF_ERROR(_sink->revoke_memory(_state));
-            continue;
-        }
         DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
             Status status =
                     Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed");
             return status;
         });
+        // `_sink->is_finished(_state)` means sink operator should be finished
         if (_sink->is_finished(_state)) {
             set_wake_up_and_dep_ready();
         }
         // `_dry_run` means sink operator need no more data
         *eos = wake_up_early() || _dry_run;
-        if (!*eos) {
+        auto workload_group = _state->get_query_ctx()->workload_group();
+        if (*eos) {
+            _pending_block.reset();
+        } else if (_pending_block) [[unlikely]] {
+            LOG(INFO) << "Query: " << print_id(query_id)
+                      << " has pending block, size: " << _pending_block->allocated_bytes();
+            _block = std::move(_pending_block);
+            block = _block.get();
+            *eos = _pending_eos;
+        } else {
             SCOPED_TIMER(_get_block_timer);
+            if (_state->low_memory_mode()) {
+                _sink->set_low_memory_mode(_state);
+                _root->set_low_memory_mode(_state);
+            }
+            DEFER_RELEASE_RESERVED();
             _get_block_counter->update(1);
-            RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
-        }
+            const auto reserve_size =
_root->get_reserve_mem_size(_state);

Review Comment:
A question: what operations does this _root perform in PipelineTask? Could you give an example to explain why it also needs get_reserve_mem_size?

##########
be/src/olap/memtable_memory_limiter.cpp:
##########
@@ -243,8 +306,12 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
         if (active_usage > 0) {
             _active_writers.push_back(writer);
         }
-        _flush_mem_usage += writer->mem_consumption(MemType::FLUSH);
-        _queue_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED);
+
+        auto flush_usage = writer->mem_consumption(MemType::FLUSH);

Review Comment:
Is there a particular reason for adding a temporary variable here?

##########
be/src/olap/memtable_flush_executor.cpp:
##########
@@ -137,6 +139,39 @@ Status FlushToken::wait() {
     return Status::OK();
 }
+Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context,
+                                       int64_t size) {
+    auto* thread_context = doris::thread_context();
+    auto* memtable_flush_executor =
+            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
+    Status st;
+    do {
+        // only try to reserve process memory
+        st = thread_context->try_reserve_process_memory(size);
+        if (st.ok()) {
+            memtable_flush_executor->inc_flushing_task();
+            break;
+        }
+        if (_is_shutdown() ||
+            resource_context->memory_context()->mem_tracker()->is_query_cancelled()) {
+            st = Status::Cancelled("flush memtable already cancelled");
+            break;
+        }
+        // Make sure at least one memtable is flushing even reserve memory failed.
+        if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
+            // If there are already any flushing task, Wait for some time and retry.
+            LOG_EVERY_T(INFO, 60) << fmt::format(
+                    "Failed to reserve memory {} for flush memtable, retry after 100ms",
+                    PrettyPrinter::print_bytes(size));
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        } else {
+            st = Status::OK();

Review Comment:
If try_reserve_process_memory fails and no memtable is currently flushing, this returns OK directly.
So the intent is that at least one memtable is always allowed to flush, regardless of whether enough memory is available, right?

##########
be/src/olap/memtable_memory_limiter.cpp:
##########
@@ -133,30 +150,69 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
         if (need_flush > 0) {
             auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
             LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? "hard" : "soft")
-                      << ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
+                      << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", "

Review Comment:
This log is expected to be printed very often; consider whether to use LOG_EVERY_N.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.