github-actions[bot] commented on code in PR #44469: URL: https://github.com/apache/doris/pull/44469#discussion_r1855398455
########## be/src/runtime/workload_group/workload_group_manager.cpp: ########## @@ -297,7 +266,587 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) { SchemaScannerHelper::insert_int64_value(4, wg->get_local_scan_bytes_per_second(), block); SchemaScannerHelper::insert_int64_value(5, wg->get_remote_scan_bytes_per_second(), block); + SchemaScannerHelper::insert_int64_value(6, wg->write_buffer_size(), block); + } +} + +void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, + int64_t reserve_size) { + std::lock_guard<std::mutex> lock(_paused_queries_lock); + DCHECK(query_ctx != nullptr); + auto wg = query_ctx->workload_group(); + auto&& [it, inserted] = _paused_queries_list[wg].emplace( + query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted, + doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, reserve_size); + // Check if this is an invalid reserve, for example, if the reserve size is too large, larger than the query limit + // if hard limit is enabled, then not need enable other queries hard limit. + if (inserted) { + query_ctx->set_memory_sufficient(false); + LOG(INFO) << "Insert one new paused query: " << query_ctx->debug_string() + << ", workload group: " << wg->debug_string(); + } +} + +constexpr int64_t TIMEOUT_IN_QUEUE_LIMIT = 1000L * 60; + +/** + * Strategy 1: A revocable query should not have any running task(PipelineTask). + * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit + * strategy 3: If any query exceed process memlimit, then should clear all caches. + * strategy 4: If any query exceed query's memlimit, then do spill disk or cancel it. 
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then do following: + */ +void WorkloadGroupMgr::handle_paused_queries() { Review Comment: warning: function 'handle_paused_queries' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp void WorkloadGroupMgr::handle_paused_queries() { ^ ``` <details> <summary>Additional context</summary> **be/src/runtime/workload_group/workload_group_manager.cpp:298:** 219 lines including whitespace and comments (threshold 80) ```cpp void WorkloadGroupMgr::handle_paused_queries() { ^ ``` </details> ########## be/src/runtime/workload_group/workload_group_manager.cpp: ########## @@ -297,7 +266,587 @@ SchemaScannerHelper::insert_int64_value(4, wg->get_local_scan_bytes_per_second(), block); SchemaScannerHelper::insert_int64_value(5, wg->get_remote_scan_bytes_per_second(), block); + SchemaScannerHelper::insert_int64_value(6, wg->write_buffer_size(), block); + } +} + +void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, + int64_t reserve_size) { + std::lock_guard<std::mutex> lock(_paused_queries_lock); + DCHECK(query_ctx != nullptr); + auto wg = query_ctx->workload_group(); + auto&& [it, inserted] = _paused_queries_list[wg].emplace( + query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted, + doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, reserve_size); + // Check if this is an invalid reserve, for example, if the reserve size is too large, larger than the query limit + // if hard limit is enabled, then not need enable other queries hard limit. + if (inserted) { + query_ctx->set_memory_sufficient(false); + LOG(INFO) << "Insert one new paused query: " << query_ctx->debug_string() + << ", workload group: " << wg->debug_string(); + } +} + +constexpr int64_t TIMEOUT_IN_QUEUE_LIMIT = 1000L * 60; + +/** + * Strategy 1: A revocable query should not have any running task(PipelineTask). 
+ * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit + * strategy 3: If any query exceed process memlimit, then should clear all caches. + * strategy 4: If any query exceed query's memlimit, then do spill disk or cancel it. + * strategy 5: If any query exceed process's memlimit and cache is zero, then do following: + */ +void WorkloadGroupMgr::handle_paused_queries() { + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto& [wg_id, wg] : _workload_groups) { + std::unique_lock<std::mutex> lock(_paused_queries_lock); + if (_paused_queries_list[wg].empty()) { + // Add an empty set to wg that not contains paused queries. + } + } + } + + std::unique_lock<std::mutex> lock(_paused_queries_lock); + bool has_revoked_from_other_group = false; + for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { + auto& queries_list = it->second; + const auto& wg = it->first; + + LOG_EVERY_T(INFO, 120) << "Paused queries count: " << queries_list.size(); + + bool is_low_watermark = false; + bool is_high_watermark = false; + wg->check_mem_used(&is_low_watermark, &is_high_watermark); + + bool has_changed_hard_limit = false; + int64_t flushed_memtable_bytes = 0; + // If the query is paused because its limit exceed the query itself's memlimit, then just spill disk. + // The query's memlimit is set using slot mechanism and its value is set using the user settings, not + // by weighted value. So if reserve failed, then it is actually exceed limit. + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { + auto query_ctx = query_it->query_ctx_.lock(); + // The query is finished during in paused list. 
+ if (query_ctx == nullptr) { + query_it = queries_list.erase(query_it); + LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; + continue; + } + if (query_ctx->is_cancelled()) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " was canceled, remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + + if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { + // Streamload, kafka load, group commit will never have query memory exceeded error because + // their query limit is very large. + bool spill_res = + handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + query_it = queries_list.erase(query_it); + continue; + } + } else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { + // Only deal with non overcommit workload group. + if (wg->enable_memory_overcommit()) { + // Soft limit wg will only reserve failed when process limit exceed. But in some corner case, + // when reserve, the wg is hard limit, the query reserve failed, but when this loop run + // the wg is converted to soft limit. + // So that should resume the query. + LOG(WARNING) + << "Query: " << print_id(query_ctx->query_id()) + << " reserve memory failed because exceed workload group memlimit, it " + "should not happen, resume it again. paused reason: " + << query_ctx->paused_reason(); + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + // check if the reserve is too large, if it is too large, + // should set the query's limit only. + // Check the query's reserve with expected limit. 
+ if (query_ctx->expected_mem_limit() < + query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) { + query_ctx->set_mem_limit(query_ctx->expected_mem_limit()); + query_ctx->set_memory_sufficient(true); + LOG(INFO) << "Workload group memory reserve failed because " + << query_ctx->debug_string() << " reserve size " + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << " is too large, set hard limit to " + << PrettyPrinter::print_bytes(query_ctx->expected_mem_limit()) + << " and resume running."; + query_it = queries_list.erase(query_it); + continue; + } + if (flushed_memtable_bytes < 0) { + flushed_memtable_bytes = flush_memtable_from_current_group_( + query_ctx, wg, query_it->reserve_size_); + } + if (flushed_memtable_bytes > 0) { + // Flushed some memtable, just wait flush finished and not do anything more. + wg->enable_write_buffer_limit(true); + ++query_it; + continue; + } + if (!has_changed_hard_limit) { + update_queries_limit_(wg, true); + has_changed_hard_limit = true; + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " reserve memory(" + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << ") failed due to workload group memory exceed, " + "should set the workload group work in memory insufficent mode, " + "so that other query will reduce their memory. wg: " + << wg->debug_string(); + } + if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::DISABLED) { + // If not enable slot memory policy, then should spill directly + // Maybe there are another query that use too much memory, but we + // not encourage not enable slot memory. + // TODO should kill the query that exceed limit. 
+ bool spill_res = handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), + query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + query_it = queries_list.erase(query_it); + continue; + } + } else { + // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient, + // and then set wg's flag, other query may not free memory very quickly. + if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE_LIMIT) { + // set wg's memory to insufficent, then add it back to task scheduler to run. + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " will be resume."; + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } else { + ++query_it; + continue; + } + } + } else { + // If wg's memlimit not exceed, but process memory exceed, it means cache or other metadata + // used too much memory. Should clean all cache here. + // 1. Check cache used, if cache is larger than > 0, then just return and wait for it to 0 to release some memory. + if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted > + 0.001 && + doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted > + 0.001) { + doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = + 0; + doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity(); + LOG(INFO) << "There are some queries need process memory, so that set cache " + "capacity " + "to 0 now"; + } + if (query_it->cache_ratio_ < 0.001) { + // 1. Check if could revoke some memory from memtable + if (flushed_memtable_bytes < 0) { + flushed_memtable_bytes = flush_memtable_from_current_group_( + query_ctx, wg, query_it->reserve_size_); + } + if (flushed_memtable_bytes > 0) { + // Flushed some memtable, just wait flush finished and not do anything more. 
+ ++query_it; + continue; + } + // TODO should wait here to check if the process has release revoked_size memory and then continue. + if (!has_revoked_from_other_group) { + int64_t revoked_size = revoke_memory_from_other_group_( + query_ctx, wg->enable_memory_overcommit(), query_it->reserve_size_); + if (revoked_size > 0) { + has_revoked_from_other_group = true; + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + // Do not care if the revoked_size > reserve size, and try to run again. + continue; + } else { + bool spill_res = handle_single_query_( + query_ctx, query_it->reserve_size_, query_it->elapsed_time(), + query_ctx->paused_reason()); + if (spill_res) { + query_it = queries_list.erase(query_it); + continue; + } else { + ++query_it; + continue; + } + } + } else { + // If any query is cancelled during process limit stage, should resume other query and + // do not do any check now. + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + } + if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted < + 0.001 && + query_it->cache_ratio_ > 0.001) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " will be resume after cache adjust."; + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + ++query_it; + } + } + // Not need waiting flush memtable and below low watermark disable load buffer limit + if (flushed_memtable_bytes <= 0 && !is_low_watermark) { + wg->enable_write_buffer_limit(false); + } + + if (queries_list.empty()) { + it = _paused_queries_list.erase(it); + continue; + } else { + // Finished deal with one workload group, and should deal with next one. 
+ ++it; + } + } +} + +// Return the expected free bytes if memtable could flush +int64_t WorkloadGroupMgr::flush_memtable_from_current_group_( + std::shared_ptr<QueryContext> requestor, WorkloadGroupPtr wg, int64_t need_free_mem) { + // If there are a lot of memtable memory, then wait them flush finished. + MemTableMemoryLimiter* memtable_limiter = + doris::ExecEnv::GetInstance()->memtable_memory_limiter(); + int64_t memtable_active_bytes = 0; + int64_t memtable_queue_bytes = 0; + int64_t memtable_flush_bytes = 0; + memtable_limiter->get_workload_group_memtable_usage( + wg->id(), &memtable_active_bytes, &memtable_queue_bytes, &memtable_flush_bytes); + // TODO: should add a signal in memtable limiter to prevent new batch + // For example, streamload, it will not reserve many memory, but it will occupy many memtable memory. + // TODO: 0.2 should be a workload group properties. For example, the group is optimized for load,then the value + // should be larged, if the group is optimized for query, then the value should be smaller. + int64_t max_wg_memtable_bytes = wg->write_buffer_limit(); + if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes > + max_wg_memtable_bytes) { + // There are many table in flush queue, just waiting them flush finished. + if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) { + LOG_EVERY_T(INFO, 60) << wg->name() + << " load memtable size is: " << memtable_active_bytes << ", " + << memtable_queue_bytes << ", " << memtable_flush_bytes + << ", load buffer limit is: " << max_wg_memtable_bytes + << " wait for flush finished to release more memory"; + return memtable_queue_bytes + memtable_flush_bytes; + } else { + // Flush some memtables(currently written) to flush queue. 
+ memtable_limiter->flush_workload_group_memtables( + wg->id(), memtable_active_bytes - (int64_t)(max_wg_memtable_bytes * 0.6)); + LOG_EVERY_T(INFO, 60) << wg->name() + << " load memtable size is: " << memtable_active_bytes << ", " + << memtable_queue_bytes << ", " << memtable_flush_bytes + << ", flush some active memtable to revoke memory"; + return memtable_queue_bytes + memtable_flush_bytes + memtable_active_bytes - + (int64_t)(max_wg_memtable_bytes * 0.6); + } + } + return 0; +} + +int64_t WorkloadGroupMgr::revoke_memory_from_other_group_(std::shared_ptr<QueryContext> requestor, + bool hard_limit, int64_t need_free_mem) { + int64_t total_freed_mem = 0; + std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("RevokeMemory"); + // 1. memtable like memory + // 2. query exceed workload group limit + int64_t freed_mem = revoke_overcommited_memory_(requestor, need_free_mem, profile.get()); + total_freed_mem += freed_mem; + // The revoke process may kill current requestor, so should return now. + if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { + return total_freed_mem; + } + if (hard_limit) { + freed_mem = cancel_top_query_in_overcommit_group_(need_free_mem - total_freed_mem, + doris::QUERY_MIN_MEMORY, profile.get()); + } else { + freed_mem = cancel_top_query_in_overcommit_group_( + need_free_mem - total_freed_mem, requestor->get_mem_tracker()->consumption(), + profile.get()); + } + total_freed_mem += freed_mem; + // The revoke process may kill current requestor, so should return now. + if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { + return total_freed_mem; + } + return total_freed_mem; +} + +// Revoke memory from workload group that exceed it's limit. For example, if the wg's limit is 10g, but used 12g +// then should revoke 2g from the group. 
+int64_t WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryContext> requestor, + int64_t need_free_mem, + RuntimeProfile* profile) { + int64_t total_freed_mem = 0; + // 1. check memtable usage, and try to free them. + int64_t freed_mem = revoke_memtable_from_overcommited_groups_(need_free_mem, profile); + total_freed_mem += freed_mem; + // The revoke process may kill current requestor, so should return now. + if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { + return total_freed_mem; + } + // 2. Cancel top usage query, one by one + using WorkloadGroupMem = std::pair<WorkloadGroupPtr, int64_t>; + auto cmp = [](WorkloadGroupMem left, WorkloadGroupMem right) { + return left.second < right.second; + }; + std::priority_queue<WorkloadGroupMem, std::vector<WorkloadGroupMem>, decltype(cmp)> heap(cmp); + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { + if (requestor->workload_group() != nullptr && + iter->second->id() == requestor->workload_group()->id()) { + continue; + } + heap.emplace(iter->second, iter->second->memory_used()); + } + } + while (!heap.empty() && need_free_mem - total_freed_mem > 0 && !requestor->is_cancelled()) { + auto [wg, sort_mem] = heap.top(); + heap.pop(); + freed_mem = wg->free_overcommited_memory(need_free_mem - total_freed_mem, profile); + total_freed_mem += freed_mem; + } + return total_freed_mem; +} + +// If the memtable is too large, then flush them and wait for finished. +int64_t WorkloadGroupMgr::revoke_memtable_from_overcommited_groups_(int64_t need_free_mem, + RuntimeProfile* profile) { + return 0; +} + +// 1. Sort all memory limiter in all overcommit wg, and cancel the top usage task that with most memory. +// 2. Maybe not valid because it's memory not exceed limit. 
+int64_t WorkloadGroupMgr::cancel_top_query_in_overcommit_group_(int64_t need_free_mem, + int64_t lower_bound, + RuntimeProfile* profile) { + return 0; +} + +// streamload, kafka routine load, group commit +// insert into select +// select + +// If the query could release some memory, for example, spill disk, then the return value is true. +// If the query could not release memory, then cancel the query, the return value is true. +// If the query is not ready to do these tasks, it means just wait, then return value is false. +bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx, + size_t size_to_reserve, int64_t time_in_queue, + Status paused_reason) { + size_t revocable_size = 0; + size_t memory_usage = 0; + bool has_running_task = false; + const auto query_id = print_id(query_ctx->query_id()); + query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task); + if (has_running_task) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " is paused, but still has running task, skip it."; + return false; + } + + auto revocable_tasks = query_ctx->get_revocable_tasks(); + if (revocable_tasks.empty()) { + if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { + const auto limit = query_ctx->get_mem_limit(); + const auto reserved_size = query_ctx->query_mem_tracker->reserved_consumption(); + // During waiting time, another operator in the query may finished and release + // many memory and we could run. 
+ if ((memory_usage + size_to_reserve) < limit) { + LOG(INFO) << "Query: " << query_id << ", usage(" + << PrettyPrinter::print_bytes(memory_usage) << " + " << size_to_reserve + << ") less than limit(" << PrettyPrinter::print_bytes(limit) + << "), resume it."; + query_ctx->set_memory_sufficient(true); + return true; + } else if (time_in_queue >= TIMEOUT_IN_QUEUE_LIMIT) { + // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic + auto msg1 = fmt::format( + "Query {} reserve memory failed, but could not find memory that could " + "release or spill to disk. Query memory usage: {}, reserved size: {}, " + "limit: {} ,process memory info: {}, wg info: {}.", + query_id, PrettyPrinter::print_bytes(memory_usage), + PrettyPrinter::print_bytes(reserved_size), + PrettyPrinter::print_bytes(query_ctx->get_mem_limit()), + GlobalMemoryArbitrator::process_memory_used_details_str(), + query_ctx->workload_group()->memory_debug_string()); + auto msg2 = msg1 + fmt::format( + " Query Memory Tracker Summary: {}." 
+ " Load Memory Tracker Summary: {}", + MemTrackerLimiter::make_type_trackers_profile_str( + MemTrackerLimiter::Type::QUERY), + MemTrackerLimiter::make_type_trackers_profile_str( + MemTrackerLimiter::Type::LOAD)); + LOG(INFO) << msg2; + query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1)); + } else { + return false; + } + } else if (paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { + if (!query_ctx->workload_group()->exceed_limit()) { + LOG(INFO) << "Query: " << query_id + << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it."; + query_ctx->set_memory_sufficient(true); + return true; + } else if (time_in_queue > TIMEOUT_IN_QUEUE_LIMIT) { + LOG(INFO) << "Query: " << query_id << ", workload group exceeded, info: " + << GlobalMemoryArbitrator::process_memory_used_details_str() + << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); + query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( + "The query({}) reserved memory failed because workload group limit " + "exceeded, and there is no cache now. And could not find task to spill. " + "Maybe you should set the workload group's limit to a lower value.", + query_id)); + } else { + return false; + } + } else { + // Should not consider about process memory. For example, the query's limit is 100g, workload + // group's memlimit is 10g, process memory is 20g. The query reserve will always failed in wg + // limit, and process is always have memory, so that it will resume and failed reserve again. 
+ if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) { + LOG(INFO) << "Query: " << query_id + << ", process limit not exceeded now, resume this query" + << ", process memory info: " + << GlobalMemoryArbitrator::process_memory_used_details_str() + << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); + query_ctx->set_memory_sufficient(true); + return true; + } else if (time_in_queue > TIMEOUT_IN_QUEUE_LIMIT) { + LOG(INFO) << "Query: " << query_id << ", process limit exceeded, info: " + << GlobalMemoryArbitrator::process_memory_used_details_str() + << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); + query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( + "The query({}) reserved memory failed because process limit exceeded, " + "and " + "there is no cache now. And could not find task to spill. Maybe you " + "should " + "set " + "the workload group's limit to a lower value.", + query_id)); + } else { + return false; + } + } + } else { + SCOPED_ATTACH_TASK(query_ctx.get()); + RETURN_IF_ERROR(query_ctx->revoke_memory()); + } + return true; +} + +void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_hard_limit) { + auto wg_mem_limit = wg->memory_limit(); + auto all_query_ctxs = wg->queries(); + bool is_low_watermark = false; + bool is_high_watermark = false; + wg->check_mem_used(&is_low_watermark, &is_high_watermark); + int64_t wg_high_water_mark_limit = Review Comment: warning: use auto when initializing with a cast to avoid duplicating the type name [modernize-use-auto] ```suggestion auto wg_high_water_mark_limit = ``` ########## be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp: ########## @@ -479,11 +582,11 @@ Status PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState* Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, Review Comment: warning: function 'sink' has cognitive complexity of 76 (threshold 50) 
[readability-function-cognitive-complexity] ```cpp Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, ^ ``` <details> <summary>Additional context</summary> **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:587:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!local_state._shared_state->_spill_status.ok()) { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:596:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (rows == 0) { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:597:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (eos) { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:602:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (need_to_spill) { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:604:** +1, nesting level increased to 3 ```cpp } else { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:605:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:606:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_ERROR(_setup_internal_operator(state)); ^ ``` **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:606:** +6, including nesting penalty of 5, nesting level increased to 6 ```cpp RETURN_IF_ERROR(_setup_internal_operator(state)); ^ ``` **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:608:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp 
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink_eos", { ^ ``` **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF' ```cpp if (UNLIKELY(config::enable_debug_points)) { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:608:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink_eos", { ^ ``` **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF' ```cpp if (dp) { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:614:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(_inner_sink_operator->sink( ^ ``` **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:614:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_ERROR(_inner_sink_operator->sink( ^ ``` **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:625:** nesting level increased to 3 ```cpp [&](auto& block) { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:626:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (block) { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:636:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (need_to_spill) { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:637:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows)); ^ ``` **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:637:** +3, including nesting penalty of 2, 
nesting level increased to 3 ```cpp RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows)); ^ ``` **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:638:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (eos) { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:640:** +1, nesting level increased to 2 ```cpp } else if (revocable_mem_size(state) > vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:643:** +1, nesting level increased to 1 ```cpp } else { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:644:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:645:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_setup_internal_operator(state)); ^ ``` **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:645:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(_setup_internal_operator(state)); ^ ``` **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:647:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink", { ^ ``` **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF' ```cpp if (UNLIKELY(config::enable_debug_points)) { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:647:** +3, including nesting penalty of 2, 
nesting level increased to 3 ```cpp DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink", { ^ ``` **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF' ```cpp if (dp) { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:653:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_inner_sink_operator->sink( ^ ``` **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:653:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_inner_sink_operator->sink( ^ ``` **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:656:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (eos) { ^ ``` </details> ########## be/src/runtime/workload_group/workload_group.cpp: ########## @@ -215,6 +251,91 @@ void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> m _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr); } +int64_t WorkloadGroup::free_overcommited_memory(int64_t need_free_mem, RuntimeProfile* profile) { Review Comment: warning: function 'free_overcommited_memory' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp int64_t WorkloadGroup::free_overcommited_memory(int64_t need_free_mem, RuntimeProfile* profile) { ^ ``` <details> <summary>Additional context</summary> **be/src/runtime/workload_group/workload_group.cpp:253:** 83 lines including whitespace and comments (threshold 80) ```cpp int64_t WorkloadGroup::free_overcommited_memory(int64_t need_free_mem, RuntimeProfile* profile) { ^ ``` </details> ########## be/src/vec/exprs/vexpr_context.h: ########## @@ -19,6 +19,8 @@ #include <glog/logging.h> Review Comment: warning: 'glog/logging.h' file not 
found [clang-diagnostic-error] ```cpp #include <glog/logging.h> ^ ``` ########## be/src/runtime/workload_group/workload_group_manager.cpp: ########## @@ -297,7 +266,587 @@ SchemaScannerHelper::insert_int64_value(4, wg->get_local_scan_bytes_per_second(), block); SchemaScannerHelper::insert_int64_value(5, wg->get_remote_scan_bytes_per_second(), block); + SchemaScannerHelper::insert_int64_value(6, wg->write_buffer_size(), block); + } +} + +void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, + int64_t reserve_size) { + std::lock_guard<std::mutex> lock(_paused_queries_lock); + DCHECK(query_ctx != nullptr); + auto wg = query_ctx->workload_group(); + auto&& [it, inserted] = _paused_queries_list[wg].emplace( + query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted, + doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, reserve_size); + // Check if this is an invalid reserve, for example, if the reserve size is too large, larger than the query limit + // if hard limit is enabled, then not need enable other queries hard limit. + if (inserted) { + query_ctx->set_memory_sufficient(false); + LOG(INFO) << "Insert one new paused query: " << query_ctx->debug_string() + << ", workload group: " << wg->debug_string(); + } +} + +constexpr int64_t TIMEOUT_IN_QUEUE_LIMIT = 1000L * 60; + +/** + * Strategy 1: A revocable query should not have any running task(PipelineTask). + * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit + * strategy 3: If any query exceed process memlimit, then should clear all caches. + * strategy 4: If any query exceed query's memlimit, then do spill disk or cancel it. 
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then do following: + */ +void WorkloadGroupMgr::handle_paused_queries() { + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto& [wg_id, wg] : _workload_groups) { + std::unique_lock<std::mutex> lock(_paused_queries_lock); + if (_paused_queries_list[wg].empty()) { + // Add an empty set to wg that not contains paused queries. + } + } + } + + std::unique_lock<std::mutex> lock(_paused_queries_lock); + bool has_revoked_from_other_group = false; + for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { + auto& queries_list = it->second; + const auto& wg = it->first; + + LOG_EVERY_T(INFO, 120) << "Paused queries count: " << queries_list.size(); + + bool is_low_watermark = false; + bool is_high_watermark = false; + wg->check_mem_used(&is_low_watermark, &is_high_watermark); + + bool has_changed_hard_limit = false; + int64_t flushed_memtable_bytes = 0; + // If the query is paused because its limit exceed the query itself's memlimit, then just spill disk. + // The query's memlimit is set using slot mechanism and its value is set using the user settings, not + // by weighted value. So if reserve failed, then it is actually exceed limit. + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { + auto query_ctx = query_it->query_ctx_.lock(); + // The query is finished during in paused list. 
+ if (query_ctx == nullptr) { + query_it = queries_list.erase(query_it); + LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; + continue; + } + if (query_ctx->is_cancelled()) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " was canceled, remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + + if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { + // Streamload, kafka load, group commit will never have query memory exceeded error because + // their query limit is very large. + bool spill_res = + handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + query_it = queries_list.erase(query_it); + continue; + } + } else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { + // Only deal with non overcommit workload group. + if (wg->enable_memory_overcommit()) { + // Soft limit wg will only reserve failed when process limit exceed. But in some corner case, + // when reserve, the wg is hard limit, the query reserve failed, but when this loop run + // the wg is converted to soft limit. + // So that should resume the query. + LOG(WARNING) + << "Query: " << print_id(query_ctx->query_id()) + << " reserve memory failed because exceed workload group memlimit, it " + "should not happen, resume it again. paused reason: " + << query_ctx->paused_reason(); + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + // check if the reserve is too large, if it is too large, + // should set the query's limit only. + // Check the query's reserve with expected limit. 
+ if (query_ctx->expected_mem_limit() < + query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) { + query_ctx->set_mem_limit(query_ctx->expected_mem_limit()); + query_ctx->set_memory_sufficient(true); + LOG(INFO) << "Workload group memory reserve failed because " + << query_ctx->debug_string() << " reserve size " + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << " is too large, set hard limit to " + << PrettyPrinter::print_bytes(query_ctx->expected_mem_limit()) + << " and resume running."; + query_it = queries_list.erase(query_it); + continue; + } + if (flushed_memtable_bytes < 0) { + flushed_memtable_bytes = flush_memtable_from_current_group_( + query_ctx, wg, query_it->reserve_size_); + } + if (flushed_memtable_bytes > 0) { + // Flushed some memtable, just wait flush finished and not do anything more. + wg->enable_write_buffer_limit(true); + ++query_it; + continue; + } + if (!has_changed_hard_limit) { + update_queries_limit_(wg, true); + has_changed_hard_limit = true; + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " reserve memory(" + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << ") failed due to workload group memory exceed, " + "should set the workload group work in memory insufficent mode, " + "so that other query will reduce their memory. wg: " + << wg->debug_string(); + } + if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::DISABLED) { + // If not enable slot memory policy, then should spill directly + // Maybe there are another query that use too much memory, but we + // not encourage not enable slot memory. + // TODO should kill the query that exceed limit. 
+ bool spill_res = handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), + query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + query_it = queries_list.erase(query_it); + continue; + } + } else { + // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient, + // and then set wg's flag, other query may not free memory very quickly. + if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE_LIMIT) { + // set wg's memory to insufficent, then add it back to task scheduler to run. + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " will be resume."; + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } else { + ++query_it; + continue; + } + } + } else { + // If wg's memlimit not exceed, but process memory exceed, it means cache or other metadata + // used too much memory. Should clean all cache here. + // 1. Check cache used, if cache is larger than > 0, then just return and wait for it to 0 to release some memory. + if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted > + 0.001 && + doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted > + 0.001) { + doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = + 0; + doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity(); + LOG(INFO) << "There are some queries need process memory, so that set cache " + "capacity " + "to 0 now"; + } + if (query_it->cache_ratio_ < 0.001) { + // 1. Check if could revoke some memory from memtable + if (flushed_memtable_bytes < 0) { + flushed_memtable_bytes = flush_memtable_from_current_group_( + query_ctx, wg, query_it->reserve_size_); + } + if (flushed_memtable_bytes > 0) { + // Flushed some memtable, just wait flush finished and not do anything more. 
+ ++query_it; + continue; + } + // TODO should wait here to check if the process has release revoked_size memory and then continue. + if (!has_revoked_from_other_group) { + int64_t revoked_size = revoke_memory_from_other_group_( + query_ctx, wg->enable_memory_overcommit(), query_it->reserve_size_); + if (revoked_size > 0) { + has_revoked_from_other_group = true; + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + // Do not care if the revoked_size > reserve size, and try to run again. + continue; + } else { + bool spill_res = handle_single_query_( + query_ctx, query_it->reserve_size_, query_it->elapsed_time(), + query_ctx->paused_reason()); + if (spill_res) { + query_it = queries_list.erase(query_it); + continue; + } else { + ++query_it; + continue; + } + } + } else { + // If any query is cancelled during process limit stage, should resume other query and + // do not do any check now. + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + } + if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted < + 0.001 && + query_it->cache_ratio_ > 0.001) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " will be resume after cache adjust."; + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + ++query_it; + } + } + // Not need waiting flush memtable and below low watermark disable load buffer limit + if (flushed_memtable_bytes <= 0 && !is_low_watermark) { + wg->enable_write_buffer_limit(false); + } + + if (queries_list.empty()) { + it = _paused_queries_list.erase(it); + continue; + } else { + // Finished deal with one workload group, and should deal with next one. 
+ ++it; + } + } +} + +// Return the expected free bytes if memtable could flush +int64_t WorkloadGroupMgr::flush_memtable_from_current_group_( + std::shared_ptr<QueryContext> requestor, WorkloadGroupPtr wg, int64_t need_free_mem) { + // If there are a lot of memtable memory, then wait them flush finished. + MemTableMemoryLimiter* memtable_limiter = + doris::ExecEnv::GetInstance()->memtable_memory_limiter(); + int64_t memtable_active_bytes = 0; + int64_t memtable_queue_bytes = 0; + int64_t memtable_flush_bytes = 0; + memtable_limiter->get_workload_group_memtable_usage( + wg->id(), &memtable_active_bytes, &memtable_queue_bytes, &memtable_flush_bytes); + // TODO: should add a signal in memtable limiter to prevent new batch + // For example, streamload, it will not reserve many memory, but it will occupy many memtable memory. + // TODO: 0.2 should be a workload group properties. For example, the group is optimized for load,then the value + // should be larged, if the group is optimized for query, then the value should be smaller. + int64_t max_wg_memtable_bytes = wg->write_buffer_limit(); + if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes > + max_wg_memtable_bytes) { + // There are many table in flush queue, just waiting them flush finished. + if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) { + LOG_EVERY_T(INFO, 60) << wg->name() + << " load memtable size is: " << memtable_active_bytes << ", " + << memtable_queue_bytes << ", " << memtable_flush_bytes + << ", load buffer limit is: " << max_wg_memtable_bytes + << " wait for flush finished to release more memory"; + return memtable_queue_bytes + memtable_flush_bytes; + } else { + // Flush some memtables(currently written) to flush queue. 
+ memtable_limiter->flush_workload_group_memtables( + wg->id(), memtable_active_bytes - (int64_t)(max_wg_memtable_bytes * 0.6)); + LOG_EVERY_T(INFO, 60) << wg->name() + << " load memtable size is: " << memtable_active_bytes << ", " + << memtable_queue_bytes << ", " << memtable_flush_bytes + << ", flush some active memtable to revoke memory"; + return memtable_queue_bytes + memtable_flush_bytes + memtable_active_bytes - + (int64_t)(max_wg_memtable_bytes * 0.6); + } + } + return 0; +} + +int64_t WorkloadGroupMgr::revoke_memory_from_other_group_(std::shared_ptr<QueryContext> requestor, + bool hard_limit, int64_t need_free_mem) { + int64_t total_freed_mem = 0; + std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("RevokeMemory"); + // 1. memtable like memory + // 2. query exceed workload group limit + int64_t freed_mem = revoke_overcommited_memory_(requestor, need_free_mem, profile.get()); + total_freed_mem += freed_mem; + // The revoke process may kill current requestor, so should return now. + if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { + return total_freed_mem; + } + if (hard_limit) { + freed_mem = cancel_top_query_in_overcommit_group_(need_free_mem - total_freed_mem, + doris::QUERY_MIN_MEMORY, profile.get()); + } else { + freed_mem = cancel_top_query_in_overcommit_group_( + need_free_mem - total_freed_mem, requestor->get_mem_tracker()->consumption(), + profile.get()); + } + total_freed_mem += freed_mem; + // The revoke process may kill current requestor, so should return now. + if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { + return total_freed_mem; + } + return total_freed_mem; +} + +// Revoke memory from workload group that exceed it's limit. For example, if the wg's limit is 10g, but used 12g +// then should revoke 2g from the group. 
+int64_t WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryContext> requestor, + int64_t need_free_mem, + RuntimeProfile* profile) { + int64_t total_freed_mem = 0; + // 1. check memtable usage, and try to free them. + int64_t freed_mem = revoke_memtable_from_overcommited_groups_(need_free_mem, profile); + total_freed_mem += freed_mem; + // The revoke process may kill current requestor, so should return now. + if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { + return total_freed_mem; + } + // 2. Cancel top usage query, one by one + using WorkloadGroupMem = std::pair<WorkloadGroupPtr, int64_t>; + auto cmp = [](WorkloadGroupMem left, WorkloadGroupMem right) { + return left.second < right.second; + }; + std::priority_queue<WorkloadGroupMem, std::vector<WorkloadGroupMem>, decltype(cmp)> heap(cmp); + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { + if (requestor->workload_group() != nullptr && + iter->second->id() == requestor->workload_group()->id()) { + continue; + } + heap.emplace(iter->second, iter->second->memory_used()); + } + } + while (!heap.empty() && need_free_mem - total_freed_mem > 0 && !requestor->is_cancelled()) { + auto [wg, sort_mem] = heap.top(); + heap.pop(); + freed_mem = wg->free_overcommited_memory(need_free_mem - total_freed_mem, profile); + total_freed_mem += freed_mem; + } + return total_freed_mem; +} + +// If the memtable is too large, then flush them and wait for finished. +int64_t WorkloadGroupMgr::revoke_memtable_from_overcommited_groups_(int64_t need_free_mem, + RuntimeProfile* profile) { + return 0; +} + +// 1. Sort all memory limiter in all overcommit wg, and cancel the top usage task that with most memory. +// 2. Maybe not valid because it's memory not exceed limit. 
+int64_t WorkloadGroupMgr::cancel_top_query_in_overcommit_group_(int64_t need_free_mem, + int64_t lower_bound, + RuntimeProfile* profile) { + return 0; +} + +// streamload, kafka routine load, group commit +// insert into select +// select + +// If the query could release some memory, for example, spill disk, then the return value is true. +// If the query could not release memory, then cancel the query, the return value is true. +// If the query is not ready to do these tasks, it means just wait, then return value is false. +bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx, Review Comment: warning: function 'handle_single_query_' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx, ^ ``` <details> <summary>Additional context</summary> **be/src/runtime/workload_group/workload_group_manager.cpp:647:** 100 lines including whitespace and comments (threshold 80) ```cpp bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx, ^ ``` </details> ########## be/src/runtime/workload_group/workload_group_manager.cpp: ########## @@ -297,7 +266,587 @@ SchemaScannerHelper::insert_int64_value(4, wg->get_local_scan_bytes_per_second(), block); SchemaScannerHelper::insert_int64_value(5, wg->get_remote_scan_bytes_per_second(), block); + SchemaScannerHelper::insert_int64_value(6, wg->write_buffer_size(), block); + } +} + +void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, + int64_t reserve_size) { + std::lock_guard<std::mutex> lock(_paused_queries_lock); + DCHECK(query_ctx != nullptr); + auto wg = query_ctx->workload_group(); + auto&& [it, inserted] = _paused_queries_list[wg].emplace( + query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted, + doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, reserve_size); + 
// Check if this is an invalid reserve, for example, if the reserve size is too large, larger than the query limit + // if hard limit is enabled, then not need enable other queries hard limit. + if (inserted) { + query_ctx->set_memory_sufficient(false); + LOG(INFO) << "Insert one new paused query: " << query_ctx->debug_string() + << ", workload group: " << wg->debug_string(); + } +} + +constexpr int64_t TIMEOUT_IN_QUEUE_LIMIT = 1000L * 60; + +/** + * Strategy 1: A revocable query should not have any running task(PipelineTask). + * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit + * strategy 3: If any query exceed process memlimit, then should clear all caches. + * strategy 4: If any query exceed query's memlimit, then do spill disk or cancel it. + * strategy 5: If any query exceed process's memlimit and cache is zero, then do following: + */ +void WorkloadGroupMgr::handle_paused_queries() { + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto& [wg_id, wg] : _workload_groups) { + std::unique_lock<std::mutex> lock(_paused_queries_lock); + if (_paused_queries_list[wg].empty()) { + // Add an empty set to wg that not contains paused queries. + } + } + } + + std::unique_lock<std::mutex> lock(_paused_queries_lock); + bool has_revoked_from_other_group = false; + for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { + auto& queries_list = it->second; + const auto& wg = it->first; + + LOG_EVERY_T(INFO, 120) << "Paused queries count: " << queries_list.size(); + + bool is_low_watermark = false; + bool is_high_watermark = false; + wg->check_mem_used(&is_low_watermark, &is_high_watermark); + + bool has_changed_hard_limit = false; + int64_t flushed_memtable_bytes = 0; + // If the query is paused because its limit exceed the query itself's memlimit, then just spill disk. 
+ // The query's memlimit is set using slot mechanism and its value is set using the user settings, not + // by weighted value. So if reserve failed, then it is actually exceed limit. + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { + auto query_ctx = query_it->query_ctx_.lock(); + // The query is finished during in paused list. + if (query_ctx == nullptr) { + query_it = queries_list.erase(query_it); + LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; + continue; + } + if (query_ctx->is_cancelled()) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " was canceled, remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + + if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { + // Streamload, kafka load, group commit will never have query memory exceeded error because + // their query limit is very large. + bool spill_res = + handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + query_it = queries_list.erase(query_it); + continue; + } + } else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { + // Only deal with non overcommit workload group. + if (wg->enable_memory_overcommit()) { + // Soft limit wg will only reserve failed when process limit exceed. But in some corner case, + // when reserve, the wg is hard limit, the query reserve failed, but when this loop run + // the wg is converted to soft limit. + // So that should resume the query. + LOG(WARNING) + << "Query: " << print_id(query_ctx->query_id()) + << " reserve memory failed because exceed workload group memlimit, it " + "should not happen, resume it again. 
paused reason: " + << query_ctx->paused_reason(); + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + // check if the reserve is too large, if it is too large, + // should set the query's limit only. + // Check the query's reserve with expected limit. + if (query_ctx->expected_mem_limit() < + query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) { + query_ctx->set_mem_limit(query_ctx->expected_mem_limit()); + query_ctx->set_memory_sufficient(true); + LOG(INFO) << "Workload group memory reserve failed because " + << query_ctx->debug_string() << " reserve size " + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << " is too large, set hard limit to " + << PrettyPrinter::print_bytes(query_ctx->expected_mem_limit()) + << " and resume running."; + query_it = queries_list.erase(query_it); + continue; + } + if (flushed_memtable_bytes < 0) { + flushed_memtable_bytes = flush_memtable_from_current_group_( + query_ctx, wg, query_it->reserve_size_); + } + if (flushed_memtable_bytes > 0) { + // Flushed some memtable, just wait flush finished and not do anything more. + wg->enable_write_buffer_limit(true); + ++query_it; + continue; + } + if (!has_changed_hard_limit) { + update_queries_limit_(wg, true); + has_changed_hard_limit = true; + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " reserve memory(" + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << ") failed due to workload group memory exceed, " + "should set the workload group work in memory insufficent mode, " + "so that other query will reduce their memory. wg: " + << wg->debug_string(); + } + if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::DISABLED) { + // If not enable slot memory policy, then should spill directly + // Maybe there are another query that use too much memory, but we + // not encourage not enable slot memory. + // TODO should kill the query that exceed limit. 
+ bool spill_res = handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), + query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + query_it = queries_list.erase(query_it); + continue; + } + } else { + // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient, + // and then set wg's flag, other query may not free memory very quickly. + if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE_LIMIT) { + // set wg's memory to insufficent, then add it back to task scheduler to run. + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " will be resume."; + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } else { + ++query_it; + continue; + } + } + } else { + // If wg's memlimit not exceed, but process memory exceed, it means cache or other metadata + // used too much memory. Should clean all cache here. + // 1. Check cache used, if cache is larger than > 0, then just return and wait for it to 0 to release some memory. + if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted > + 0.001 && + doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted > + 0.001) { + doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = + 0; + doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity(); + LOG(INFO) << "There are some queries need process memory, so that set cache " + "capacity " + "to 0 now"; + } + if (query_it->cache_ratio_ < 0.001) { + // 1. Check if could revoke some memory from memtable + if (flushed_memtable_bytes < 0) { + flushed_memtable_bytes = flush_memtable_from_current_group_( + query_ctx, wg, query_it->reserve_size_); + } + if (flushed_memtable_bytes > 0) { + // Flushed some memtable, just wait flush finished and not do anything more. 
+ ++query_it; + continue; + } + // TODO should wait here to check if the process has release revoked_size memory and then continue. + if (!has_revoked_from_other_group) { + int64_t revoked_size = revoke_memory_from_other_group_( + query_ctx, wg->enable_memory_overcommit(), query_it->reserve_size_); + if (revoked_size > 0) { + has_revoked_from_other_group = true; + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + // Do not care if the revoked_size > reserve size, and try to run again. + continue; + } else { + bool spill_res = handle_single_query_( + query_ctx, query_it->reserve_size_, query_it->elapsed_time(), + query_ctx->paused_reason()); + if (spill_res) { + query_it = queries_list.erase(query_it); + continue; + } else { + ++query_it; + continue; + } + } + } else { + // If any query is cancelled during process limit stage, should resume other query and + // do not do any check now. + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + } + if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted < + 0.001 && + query_it->cache_ratio_ > 0.001) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " will be resume after cache adjust."; + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + ++query_it; + } + } + // Not need waiting flush memtable and below low watermark disable load buffer limit + if (flushed_memtable_bytes <= 0 && !is_low_watermark) { + wg->enable_write_buffer_limit(false); + } + + if (queries_list.empty()) { + it = _paused_queries_list.erase(it); + continue; + } else { + // Finished deal with one workload group, and should deal with next one. 
+ ++it; + } + } +} + +// Return the expected free bytes if memtable could flush +int64_t WorkloadGroupMgr::flush_memtable_from_current_group_( + std::shared_ptr<QueryContext> requestor, WorkloadGroupPtr wg, int64_t need_free_mem) { + // If there are a lot of memtable memory, then wait them flush finished. + MemTableMemoryLimiter* memtable_limiter = + doris::ExecEnv::GetInstance()->memtable_memory_limiter(); + int64_t memtable_active_bytes = 0; + int64_t memtable_queue_bytes = 0; + int64_t memtable_flush_bytes = 0; + memtable_limiter->get_workload_group_memtable_usage( + wg->id(), &memtable_active_bytes, &memtable_queue_bytes, &memtable_flush_bytes); + // TODO: should add a signal in memtable limiter to prevent new batch + // For example, streamload, it will not reserve many memory, but it will occupy many memtable memory. + // TODO: 0.2 should be a workload group properties. For example, the group is optimized for load,then the value + // should be larged, if the group is optimized for query, then the value should be smaller. + int64_t max_wg_memtable_bytes = wg->write_buffer_limit(); + if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes > + max_wg_memtable_bytes) { + // There are many table in flush queue, just waiting them flush finished. + if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) { + LOG_EVERY_T(INFO, 60) << wg->name() + << " load memtable size is: " << memtable_active_bytes << ", " + << memtable_queue_bytes << ", " << memtable_flush_bytes + << ", load buffer limit is: " << max_wg_memtable_bytes + << " wait for flush finished to release more memory"; + return memtable_queue_bytes + memtable_flush_bytes; + } else { + // Flush some memtables(currently written) to flush queue. 
+ memtable_limiter->flush_workload_group_memtables( + wg->id(), memtable_active_bytes - (int64_t)(max_wg_memtable_bytes * 0.6)); + LOG_EVERY_T(INFO, 60) << wg->name() + << " load memtable size is: " << memtable_active_bytes << ", " + << memtable_queue_bytes << ", " << memtable_flush_bytes + << ", flush some active memtable to revoke memory"; + return memtable_queue_bytes + memtable_flush_bytes + memtable_active_bytes - + (int64_t)(max_wg_memtable_bytes * 0.6); + } + } + return 0; +} + +int64_t WorkloadGroupMgr::revoke_memory_from_other_group_(std::shared_ptr<QueryContext> requestor, + bool hard_limit, int64_t need_free_mem) { + int64_t total_freed_mem = 0; + std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("RevokeMemory"); + // 1. memtable like memory + // 2. query exceed workload group limit + int64_t freed_mem = revoke_overcommited_memory_(requestor, need_free_mem, profile.get()); + total_freed_mem += freed_mem; + // The revoke process may kill current requestor, so should return now. + if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { + return total_freed_mem; + } + if (hard_limit) { + freed_mem = cancel_top_query_in_overcommit_group_(need_free_mem - total_freed_mem, + doris::QUERY_MIN_MEMORY, profile.get()); + } else { + freed_mem = cancel_top_query_in_overcommit_group_( + need_free_mem - total_freed_mem, requestor->get_mem_tracker()->consumption(), + profile.get()); + } + total_freed_mem += freed_mem; + // The revoke process may kill current requestor, so should return now. + if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { + return total_freed_mem; + } + return total_freed_mem; +} + +// Revoke memory from workload group that exceed it's limit. For example, if the wg's limit is 10g, but used 12g +// then should revoke 2g from the group. 
+int64_t WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryContext> requestor, + int64_t need_free_mem, + RuntimeProfile* profile) { + int64_t total_freed_mem = 0; + // 1. check memtable usage, and try to free them. + int64_t freed_mem = revoke_memtable_from_overcommited_groups_(need_free_mem, profile); + total_freed_mem += freed_mem; + // The revoke process may kill current requestor, so should return now. + if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) { + return total_freed_mem; + } + // 2. Cancel top usage query, one by one + using WorkloadGroupMem = std::pair<WorkloadGroupPtr, int64_t>; + auto cmp = [](WorkloadGroupMem left, WorkloadGroupMem right) { + return left.second < right.second; + }; + std::priority_queue<WorkloadGroupMem, std::vector<WorkloadGroupMem>, decltype(cmp)> heap(cmp); + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { + if (requestor->workload_group() != nullptr && + iter->second->id() == requestor->workload_group()->id()) { + continue; + } + heap.emplace(iter->second, iter->second->memory_used()); + } + } + while (!heap.empty() && need_free_mem - total_freed_mem > 0 && !requestor->is_cancelled()) { + auto [wg, sort_mem] = heap.top(); + heap.pop(); + freed_mem = wg->free_overcommited_memory(need_free_mem - total_freed_mem, profile); + total_freed_mem += freed_mem; + } + return total_freed_mem; +} + +// If the memtable is too large, then flush them and wait for finished. +int64_t WorkloadGroupMgr::revoke_memtable_from_overcommited_groups_(int64_t need_free_mem, + RuntimeProfile* profile) { + return 0; +} + +// 1. Sort all memory limiter in all overcommit wg, and cancel the top usage task that with most memory. +// 2. Maybe not valid because it's memory not exceed limit. 
// NOTE: placeholder in this revision. Every argument is ignored and 0 is returned, i.e. no
// query is cancelled and no memory is reported as freed.
int64_t WorkloadGroupMgr::cancel_top_query_in_overcommit_group_(int64_t need_free_mem,
                                                                int64_t lower_bound,
                                                                RuntimeProfile* profile) {
    return 0;
}

// streamload, kafka routine load, group commit
// insert into select
// select

/**
 * Decide what to do with one paused query.
 *
 * - The query still has a running task: do nothing, return false.
 * - The query has revocable tasks: call revoke_memory() on it. If that fails the query is
 *   cancelled with the returned status. Returns true either way.
 * - Otherwise the outcome depends on `paused_reason`:
 *     QUERY_MEMORY_EXCEEDED           resume when usage + size_to_reserve is below the query limit;
 *     WORKLOAD_GROUP_MEMORY_EXCEEDED  resume when the workload group no longer exceeds its limit;
 *     anything else                   resume when the process hard limit is no longer exceeded.
 *   If the condition still holds, the query keeps waiting (return false) until `time_in_queue`
 *   passes TIMEOUT_IN_QUEUE_LIMIT, after which it is cancelled with MEM_LIMIT_EXCEEDED.
 *
 * @param query_ctx        the paused query; must not be null (it is dereferenced immediately).
 * @param size_to_reserve  bytes the query failed to reserve when it was paused.
 * @param time_in_queue    how long the query has been paused, in the unit of
 *                         TIMEOUT_IN_QUEUE_LIMIT (presumably milliseconds -- confirm with caller).
 * @param paused_reason    status whose error code tells why the query was paused.
 * @return true if the query was resumed, cancelled, or asked to revoke memory;
 *         false if the caller should leave it paused and look at it again later.
 */
bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx,
                                            size_t size_to_reserve, int64_t time_in_queue,
                                            Status paused_reason) {
    size_t revocable_size = 0;
    size_t memory_usage = 0;
    bool has_running_task = false;
    const auto query_id = print_id(query_ctx->query_id());
    query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task);
    if (has_running_task) {
        LOG(INFO) << "Query: " << query_id << " is paused, but still has running task, skip it.";
        return false;
    }

    auto revocable_tasks = query_ctx->get_revocable_tasks();
    if (!revocable_tasks.empty()) {
        SCOPED_ATTACH_TASK(query_ctx.get());
        // Handle the failure explicitly. RETURN_IF_ERROR is meant for functions that return a
        // Status; in this bool-returning function it would drop the error without logging it
        // or cancelling the query.
        auto revoke_status = query_ctx->revoke_memory();
        if (!revoke_status.ok()) {
            LOG(WARNING) << "Query: " << query_id << " failed to revoke memory, cancel it.";
            query_ctx->cancel(revoke_status);
        }
        return true;
    }

    // Nothing can be revoked from here on: resume, keep waiting, or cancel.
    if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
        const auto limit = query_ctx->get_mem_limit();
        const auto reserved_size = query_ctx->query_mem_tracker->reserved_consumption();
        // While the query was waiting, another operator of the same query may have finished
        // and released enough memory for it to run.
        if ((memory_usage + size_to_reserve) < limit) {
            LOG(INFO) << "Query: " << query_id << ", usage("
                      << PrettyPrinter::print_bytes(memory_usage) << " + " << size_to_reserve
                      << ") less than limit(" << PrettyPrinter::print_bytes(limit)
                      << "), resume it.";
            query_ctx->set_memory_sufficient(true);
            return true;
        }
        if (time_in_queue < TIMEOUT_IN_QUEUE_LIMIT) {
            return false;
        }
        // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic
        auto msg1 = fmt::format(
                "Query {} reserve memory failed, but could not find memory that could "
                "release or spill to disk. Query memory usage: {}, reserved size: {}, "
                "limit: {} ,process memory info: {}, wg info: {}.",
                query_id, PrettyPrinter::print_bytes(memory_usage),
                PrettyPrinter::print_bytes(reserved_size), PrettyPrinter::print_bytes(limit),
                GlobalMemoryArbitrator::process_memory_used_details_str(),
                query_ctx->workload_group()->memory_debug_string());
        // The tracker summaries go to the log only; the status sent back carries msg1.
        auto msg2 = msg1 + fmt::format(" Query Memory Tracker Summary: {}."
                                       " Load Memory Tracker Summary: {}",
                                       MemTrackerLimiter::make_type_trackers_profile_str(
                                               MemTrackerLimiter::Type::QUERY),
                                       MemTrackerLimiter::make_type_trackers_profile_str(
                                               MemTrackerLimiter::Type::LOAD));
        LOG(INFO) << msg2;
        query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
        return true;
    }

    if (paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
        if (!query_ctx->workload_group()->exceed_limit()) {
            LOG(INFO) << "Query: " << query_id
                      << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it.";
            query_ctx->set_memory_sufficient(true);
            return true;
        }
        if (time_in_queue <= TIMEOUT_IN_QUEUE_LIMIT) {
            return false;
        }
        LOG(INFO) << "Query: " << query_id << ", workload group exceeded, info: "
                  << GlobalMemoryArbitrator::process_memory_used_details_str()
                  << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
        query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
                "The query({}) reserved memory failed because workload group limit "
                "exceeded, and there is no cache now. And could not find task to spill. "
                "Maybe you should set the workload group's limit to a lower value.",
                query_id));
        return true;
    }

    // Any other reason is checked against the process hard limit.
    // Original author's note: should not consider about process memory. For example, the
    // query's limit is 100g, workload group's memlimit is 10g, process memory is 20g. The query
    // reserve will always failed in wg limit, and process is always have memory, so that it
    // will resume and failed reserve again.
    if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) {
        LOG(INFO) << "Query: " << query_id
                  << ", process limit not exceeded now, resume this query"
                  << ", process memory info: "
                  << GlobalMemoryArbitrator::process_memory_used_details_str()
                  << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
        query_ctx->set_memory_sufficient(true);
        return true;
    }
    if (time_in_queue <= TIMEOUT_IN_QUEUE_LIMIT) {
        return false;
    }
    LOG(INFO) << "Query: " << query_id << ", process limit exceeded, info: "
              << GlobalMemoryArbitrator::process_memory_used_details_str()
              << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
    query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
            "The query({}) reserved memory failed because process limit exceeded, "
            "and there is no cache now. And could not find task to spill. "
            "Maybe you should set the workload group's limit to a lower value.",
            query_id));
    return true;
}

+void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_hard_limit) { Review Comment: warning: function 'update_queries_limit_' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_hard_limit) { ^ ``` <details> <summary>Additional context</summary> **be/src/runtime/workload_group/workload_group_manager.cpp:751:** 98 lines including whitespace and comments (threshold 80) ```cpp void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_hard_limit) { ^ ``` </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org