xinyiZzz commented on code in PR #47462: URL: https://github.com/apache/doris/pull/47462#discussion_r1967579743
########## be/src/runtime/workload_group/workload_group_manager.cpp: ########## @@ -287,6 +257,642 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() { } } +void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, + int64_t reserve_size, const Status& status) { + DCHECK(query_ctx != nullptr); + query_ctx->update_paused_reason(status); + query_ctx->set_low_memory_mode(); + query_ctx->set_memory_sufficient(false); + std::lock_guard<std::mutex> lock(_paused_queries_lock); + auto wg = query_ctx->workload_group(); + auto&& [it, inserted] = _paused_queries_list[wg].emplace( + query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted, + doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, reserve_size); + // Check if this is an invalid reserve, for example, if the reserve size is too large, larger than the query limit + // if hard limit is enabled, then not need enable other queries hard limit. + if (inserted) { + LOG(INFO) << "Insert one new paused query: " << query_ctx->debug_string() + << ", workload group: " << wg->debug_string(); + } +} + +/** + * Strategy 1: A revocable query should not have any running task(PipelineTask). + * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit + * strategy 3: If any query exceed process memlimit, then should clear all caches. + * strategy 4: If any query exceed query's memlimit, then do spill disk or cancel it. + * strategy 5: If any query exceed process's memlimit and cache is zero, then do following: + */ +void WorkloadGroupMgr::handle_paused_queries() { + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto& [wg_id, wg] : _workload_groups) { + std::unique_lock<std::mutex> lock(_paused_queries_lock); + if (_paused_queries_list[wg].empty()) { + // Add an empty set to wg that not contains paused queries. + } + } + } + + std::unique_lock<std::mutex> lock(_paused_queries_lock); + bool has_revoked_from_other_group = false; + bool has_query_exceed_process_memlimit = false; + for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { + auto& queries_list = it->second; + auto query_count = queries_list.size(); + const auto& wg = it->first; + + if (query_count != 0) { + LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << wg->name() << ": " + << query_count; + } + + bool has_changed_hard_limit = false; + int64_t flushed_memtable_bytes = 0; + // If the query is paused because its limit exceed the query itself's memlimit, then just spill disk. + // The query's memlimit is set using slot mechanism and its value is set using the user settings, not + // by weighted value. So if reserve failed, then it is actually exceed limit. + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { + auto query_ctx = query_it->query_ctx_.lock(); + // The query is finished during in paused list. + if (query_ctx == nullptr) { + LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; + query_it = queries_list.erase(query_it); + continue; + } + if (query_ctx->is_cancelled()) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " was canceled, remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + + if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { + // Streamload, kafka load, group commit will never have query memory exceeded error because + // their query limit is very large. 
+ bool spill_res = + handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id()) + << " remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + } else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { + // Only deal with non overcommit workload group. + if (wg->enable_memory_overcommit()) { + // Soft limit wg will only reserve failed when process limit exceed. But in some corner case, + // when reserve, the wg is hard limit, the query reserve failed, but when this loop run + // the wg is converted to soft limit. + // So that should resume the query. + LOG(WARNING) + << "Query: " << print_id(query_ctx->query_id()) + << " reserve memory failed because exceed workload group memlimit, it " + "should not happen, resume it again. paused reason: " + << query_ctx->paused_reason(); + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + // check if the reserve is too large, if it is too large, + // should set the query's limit only. + // Check the query's reserve with expected limit. + if (query_ctx->adjusted_mem_limit() < + query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) { + query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit()); + query_ctx->set_memory_sufficient(true); + LOG(INFO) << "Workload group memory reserve failed because " + << query_ctx->debug_string() << " reserve size " + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << " is too large, set hard limit to " + << PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit()) + << " and resume running."; + query_it = queries_list.erase(query_it); + continue; + } + if (flushed_memtable_bytes <= 0) { + flushed_memtable_bytes = + flush_memtable_from_current_group_(wg, query_it->reserve_size_); + } + if (flushed_memtable_bytes > 0) { + // Flushed some memtable, just wait flush finished and not do anything more. + wg->enable_write_buffer_limit(true); + ++query_it; + continue; + } + if (!has_changed_hard_limit) { + update_queries_limit_(wg, true); + has_changed_hard_limit = true; + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " reserve memory(" + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << ") failed due to workload group memory exceed, " + "should set the workload group work in memory insufficent mode, " + "so that other query will reduce their memory." 
+ << " Query mem limit: " + << PrettyPrinter::print_bytes(query_ctx->get_mem_limit()) + << " mem usage: " + << PrettyPrinter::print_bytes( query_ctx->get_mem_tracker()->consumption()) + << ", wg: " << wg->debug_string(); + } + if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
Review Comment:
When multiple queries are paused with `WORKLOAD_GROUP_MEMORY_EXCEEDED` and `slot_memory_policy` is NONE, the logic looks inconsistent (a small sketch of this control flow is appended at the end of this message).
When `slot_memory_policy` is not NONE: on the first `handle_paused_queries` pass, a query whose `memory used + reserve > adjusted_mem_limit` has its `mem_limit` set to `adjusted_mem_limit` and is resumed, while the other queries keep waiting. On the second pass, that over-limit query is expected to be paused again with `QUERY_MEMORY_EXCEEDED` and then spill or be cancelled; once its memory is released, the other queries continue running.
When `slot_memory_policy` is NONE: on the first pass, the query whose `memory used + reserve > adjusted_mem_limit` likewise has its `mem_limit` set to `adjusted_mem_limit` and is resumed, but the other queries immediately go through `handle_single_query_` to spill or cancel.
The inconsistency is that, with `slot_memory_policy` equal to NONE, the query that exceeded its memory limit keeps running (even though it is expected to be paused again with `QUERY_MEMORY_EXCEEDED`), while the queries that did not exceed their limits are spilled right away.
########## be/src/runtime/workload_group/workload_group_manager.cpp: ##########
@@ -287,6 +257,642 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() { } } +void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, + int64_t reserve_size, const Status& status) { + DCHECK(query_ctx != nullptr); + query_ctx->update_paused_reason(status); + query_ctx->set_low_memory_mode(); + query_ctx->set_memory_sufficient(false); + std::lock_guard<std::mutex> lock(_paused_queries_lock); + auto wg = query_ctx->workload_group(); + auto&& [it, inserted] = _paused_queries_list[wg].emplace( + query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted, + doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, reserve_size); + // Check if this is an invalid reserve, for example, if the reserve size is too large, larger than the query limit + // if hard limit is enabled, then not need enable other queries hard limit. + if (inserted) { + LOG(INFO) << "Insert one new paused query: " << query_ctx->debug_string() + << ", workload group: " << wg->debug_string(); + } +} + +/** + * Strategy 1: A revocable query should not have any running task(PipelineTask). + * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit + * strategy 3: If any query exceed process memlimit, then should clear all caches. + * strategy 4: If any query exceed query's memlimit, then do spill disk or cancel it. + * strategy 5: If any query exceed process's memlimit and cache is zero, then do following: + */ +void WorkloadGroupMgr::handle_paused_queries() { + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto& [wg_id, wg] : _workload_groups) { + std::unique_lock<std::mutex> lock(_paused_queries_lock); + if (_paused_queries_list[wg].empty()) { + // Add an empty set to wg that not contains paused queries.
+ } + } + } + + std::unique_lock<std::mutex> lock(_paused_queries_lock); + bool has_revoked_from_other_group = false; + bool has_query_exceed_process_memlimit = false; + for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { + auto& queries_list = it->second; + auto query_count = queries_list.size(); + const auto& wg = it->first; + + if (query_count != 0) { + LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << wg->name() << ": " + << query_count; + } + + bool has_changed_hard_limit = false; + int64_t flushed_memtable_bytes = 0; + // If the query is paused because its limit exceed the query itself's memlimit, then just spill disk. + // The query's memlimit is set using slot mechanism and its value is set using the user settings, not + // by weighted value. So if reserve failed, then it is actually exceed limit. + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { + auto query_ctx = query_it->query_ctx_.lock(); + // The query is finished during in paused list. + if (query_ctx == nullptr) { + LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; + query_it = queries_list.erase(query_it); + continue; + } + if (query_ctx->is_cancelled()) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " was canceled, remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + + if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { + // Streamload, kafka load, group commit will never have query memory exceeded error because + // their query limit is very large. + bool spill_res = + handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id()) + << " remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + } else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { + // Only deal with non overcommit workload group. + if (wg->enable_memory_overcommit()) { + // Soft limit wg will only reserve failed when process limit exceed. But in some corner case, + // when reserve, the wg is hard limit, the query reserve failed, but when this loop run + // the wg is converted to soft limit. + // So that should resume the query. + LOG(WARNING) + << "Query: " << print_id(query_ctx->query_id()) + << " reserve memory failed because exceed workload group memlimit, it " + "should not happen, resume it again. paused reason: " + << query_ctx->paused_reason(); + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + // check if the reserve is too large, if it is too large, + // should set the query's limit only. + // Check the query's reserve with expected limit. 
+ if (query_ctx->adjusted_mem_limit() < + query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) { + query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit()); + query_ctx->set_memory_sufficient(true); + LOG(INFO) << "Workload group memory reserve failed because " + << query_ctx->debug_string() << " reserve size " + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << " is too large, set hard limit to " + << PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit()) + << " and resume running."; + query_it = queries_list.erase(query_it); + continue; + } + if (flushed_memtable_bytes <= 0) { + flushed_memtable_bytes = + flush_memtable_from_current_group_(wg, query_it->reserve_size_); + } + if (flushed_memtable_bytes > 0) { + // Flushed some memtable, just wait flush finished and not do anything more. + wg->enable_write_buffer_limit(true); + ++query_it; + continue; + } + if (!has_changed_hard_limit) { + update_queries_limit_(wg, true); + has_changed_hard_limit = true; + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " reserve memory(" + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << ") failed due to workload group memory exceed, " + "should set the workload group work in memory insufficent mode, " + "so that other query will reduce their memory." + << " Query mem limit: " + << PrettyPrinter::print_bytes(query_ctx->get_mem_limit()) + << " mem usage: " + << PrettyPrinter::print_bytes( + query_ctx->get_mem_tracker()->consumption()) + << ", wg: " << wg->debug_string(); + } + if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) { + // If not enable slot memory policy, then should spill directly + // Maybe there are another query that use too much memory, but we + // not encourage not enable slot memory. + // TODO should kill the query that exceed limit. + bool spill_res = handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), + query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id()) + << " remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + } else { + // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient, + // and then set wg's flag, other query may not free memory very quickly. + if (query_it->elapsed_time() > config::spill_in_paused_queue_timeout_ms) { + // set wg's memory to insufficent, then add it back to task scheduler to run. + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " will be resume."; + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } else { + ++query_it; + continue; + } + } + } else { + has_query_exceed_process_memlimit = true; + // If wg's memlimit not exceed, but process memory exceed, it means cache or other metadata + // used too much memory. Should clean all cache here. + // 1. Check cache used, if cache is larger than > 0, then just return and wait for it to 0 to release some memory. 
+ if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted > + 0.05 && + doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted > + 0.05) { + doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = + 0.04; + doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity(); + LOG(INFO) << "There are some queries need process memory, so that set cache " + "capacity " + "to 0 now"; + } + // need to check config::disable_memory_gc here, if not, when config::disable_memory_gc == true, + // cache is not adjusted, query_it->cache_ratio_ will always be 1, and this if branch will nenver + // execute, this query will never be resumed, and will deadlock here + if ((!config::disable_memory_gc && query_it->cache_ratio_ < 0.05) || + config::disable_memory_gc) { + // 1. Check if could revoke some memory from memtable + if (flushed_memtable_bytes <= 0) { + flushed_memtable_bytes = + flush_memtable_from_current_group_(wg, query_it->reserve_size_); + } + if (flushed_memtable_bytes > 0) { + // Flushed some memtable, just wait flush finished and not do anything more. + ++query_it; + continue; + } + // TODO should wait here to check if the process has release revoked_size memory and then continue. + if (!has_revoked_from_other_group) { + int64_t revoked_size = revoke_memory_from_other_group_( + query_ctx, wg->enable_memory_overcommit(), query_it->reserve_size_); + if (revoked_size > 0) { + has_revoked_from_other_group = true; + query_ctx->set_memory_sufficient(true); + VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id()) + << " is resumed after revoke memory from other group."; + query_it = queries_list.erase(query_it); + // Do not care if the revoked_size > reserve size, and try to run again. + continue; + } else { + bool spill_res = handle_single_query_( + query_ctx, query_it->reserve_size_, query_it->elapsed_time(), + query_ctx->paused_reason()); + if (spill_res) { + VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id()) + << " remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } else { + ++query_it; + continue; + } + } + } else { + // If any query is cancelled during process limit stage, should resume other query and + // do not do any check now. + query_ctx->set_memory_sufficient(true); + VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id()) + << " remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + } + if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted < + 0.05 && + query_it->cache_ratio_ > 0.05) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " will be resume after cache adjust."; + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + ++query_it; + } + } + + bool is_low_watermark = false; + bool is_high_watermark = false; + wg->check_mem_used(&is_low_watermark, &is_high_watermark); + // Not need waiting flush memtable and below low watermark disable load buffer limit + if (flushed_memtable_bytes <= 0 && !is_low_watermark) { + wg->enable_write_buffer_limit(false); + } + + if (queries_list.empty()) { + it = _paused_queries_list.erase(it); + continue; + } else { + // Finished deal with one workload group, and should deal with next one. + ++it; + } + } + + if (has_query_exceed_process_memlimit) { + // No query failed due to process exceed limit, so that enable cache now. 
+ doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = 1; + } +} + +// Return the expected free bytes if memtable could flush +int64_t WorkloadGroupMgr::flush_memtable_from_current_group_(WorkloadGroupPtr wg, + int64_t need_free_mem) {
Review Comment:
The parameter `need_free_mem` is not used.
########## be/src/runtime/workload_group/workload_group_manager.cpp: ##########
@@ -287,6 +256,624 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() { } } +void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, + int64_t reserve_size, const Status& status) { + DCHECK(query_ctx != nullptr); + query_ctx->update_paused_reason(status); + query_ctx->set_low_memory_mode(); + query_ctx->set_memory_sufficient(false); + std::lock_guard<std::mutex> lock(_paused_queries_lock); + auto wg = query_ctx->workload_group(); + auto&& [it, inserted] = _paused_queries_list[wg].emplace( + query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted, + doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, reserve_size); + // Check if this is an invalid reserve, for example, if the reserve size is too large, larger than the query limit + // if hard limit is enabled, then not need enable other queries hard limit. + if (inserted) { + LOG(INFO) << "Insert one new paused query: " << query_ctx->debug_string() + << ", workload group: " << wg->debug_string(); + } +} + +/** + * Strategy 1: A revocable query should not have any running task(PipelineTask). + * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit + * strategy 3: If any query exceed process memlimit, then should clear all caches. + * strategy 4: If any query exceed query's memlimit, then do spill disk or cancel it. + * strategy 5: If any query exceed process's memlimit and cache is zero, then do following: + */ +void WorkloadGroupMgr::handle_paused_queries() { + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto& [wg_id, wg] : _workload_groups) { + std::unique_lock<std::mutex> lock(_paused_queries_lock); + if (_paused_queries_list[wg].empty()) { + // Add an empty set to wg that not contains paused queries. + } + } + } + + std::unique_lock<std::mutex> lock(_paused_queries_lock); + bool has_revoked_from_other_group = false; + bool has_query_exceed_process_memlimit = false; + for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { + auto& queries_list = it->second; + const auto& wg = it->first; + + LOG_EVERY_T(INFO, 120) << "Paused queries count: " << queries_list.size(); + + bool is_low_watermark = false; + bool is_high_watermark = false; + wg->check_mem_used(&is_low_watermark, &is_high_watermark); + + bool has_changed_hard_limit = false; + int64_t flushed_memtable_bytes = 0; + // If the query is paused because its limit exceed the query itself's memlimit, then just spill disk. + // The query's memlimit is set using slot mechanism and its value is set using the user settings, not + // by weighted value. So if reserve failed, then it is actually exceed limit. + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { + auto query_ctx = query_it->query_ctx_.lock(); + // The query is finished during in paused list.
+ if (query_ctx == nullptr) { + LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; + query_it = queries_list.erase(query_it); + continue; + } + if (query_ctx->is_cancelled()) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " was canceled, remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + + if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { + // Streamload, kafka load, group commit will never have query memory exceeded error because + // their query limit is very large. + bool spill_res = + handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + query_it = queries_list.erase(query_it); + continue; + } + } else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { + // Only deal with non overcommit workload group. + if (wg->enable_memory_overcommit()) { + // Soft limit wg will only reserve failed when process limit exceed. But in some corner case, + // when reserve, the wg is hard limit, the query reserve failed, but when this loop run + // the wg is converted to soft limit. + // So that should resume the query. + LOG(WARNING) + << "Query: " << print_id(query_ctx->query_id()) + << " reserve memory failed because exceed workload group memlimit, it " + "should not happen, resume it again. paused reason: " + << query_ctx->paused_reason(); + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + // check if the reserve is too large, if it is too large, + // should set the query's limit only. + // Check the query's reserve with expected limit. + if (query_ctx->adjusted_mem_limit() < + query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) {
Review Comment:
I see now: after the query mem limit is changed, the query is expected to enter the paused state again with `QUERY_MEMORY_EXCEEDED`, and `handle_single_query_` will then be called to spill.
########## be/src/runtime/workload_group/workload_group_manager.cpp: ##########
@@ -287,6 +257,642 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() { } } +void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, + int64_t reserve_size, const Status& status) { + DCHECK(query_ctx != nullptr); + query_ctx->update_paused_reason(status); + query_ctx->set_low_memory_mode(); + query_ctx->set_memory_sufficient(false); + std::lock_guard<std::mutex> lock(_paused_queries_lock); + auto wg = query_ctx->workload_group(); + auto&& [it, inserted] = _paused_queries_list[wg].emplace( + query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted, + doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, reserve_size); + // Check if this is an invalid reserve, for example, if the reserve size is too large, larger than the query limit + // if hard limit is enabled, then not need enable other queries hard limit. + if (inserted) { + LOG(INFO) << "Insert one new paused query: " << query_ctx->debug_string() + << ", workload group: " << wg->debug_string(); + } +} + +/** + * Strategy 1: A revocable query should not have any running task(PipelineTask). + * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit + * strategy 3: If any query exceed process memlimit, then should clear all caches. + * strategy 4: If any query exceed query's memlimit, then do spill disk or cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then do following: + */ +void WorkloadGroupMgr::handle_paused_queries() { + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto& [wg_id, wg] : _workload_groups) { + std::unique_lock<std::mutex> lock(_paused_queries_lock); + if (_paused_queries_list[wg].empty()) { + // Add an empty set to wg that not contains paused queries. + } + } + } + + std::unique_lock<std::mutex> lock(_paused_queries_lock); + bool has_revoked_from_other_group = false; + bool has_query_exceed_process_memlimit = false; + for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) { + auto& queries_list = it->second; + auto query_count = queries_list.size(); + const auto& wg = it->first; + + if (query_count != 0) { + LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << wg->name() << ": " + << query_count; + } + + bool has_changed_hard_limit = false; + int64_t flushed_memtable_bytes = 0; + // If the query is paused because its limit exceed the query itself's memlimit, then just spill disk. + // The query's memlimit is set using slot mechanism and its value is set using the user settings, not + // by weighted value. So if reserve failed, then it is actually exceed limit. + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { + auto query_ctx = query_it->query_ctx_.lock(); + // The query is finished during in paused list. + if (query_ctx == nullptr) { + LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it."; + query_it = queries_list.erase(query_it); + continue; + } + if (query_ctx->is_cancelled()) { + LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) + << " was canceled, remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + + if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { + // Streamload, kafka load, group commit will never have query memory exceeded error because + // their query limit is very large. + bool spill_res = + handle_single_query_(query_ctx, query_it->reserve_size_, + query_it->elapsed_time(), query_ctx->paused_reason()); + if (!spill_res) { + ++query_it; + continue; + } else { + VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id()) + << " remove from paused list"; + query_it = queries_list.erase(query_it); + continue; + } + } else if (query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { + // Only deal with non overcommit workload group. + if (wg->enable_memory_overcommit()) { + // Soft limit wg will only reserve failed when process limit exceed. But in some corner case, + // when reserve, the wg is hard limit, the query reserve failed, but when this loop run + // the wg is converted to soft limit. + // So that should resume the query. + LOG(WARNING) + << "Query: " << print_id(query_ctx->query_id()) + << " reserve memory failed because exceed workload group memlimit, it " + "should not happen, resume it again. paused reason: " + << query_ctx->paused_reason(); + query_ctx->set_memory_sufficient(true); + query_it = queries_list.erase(query_it); + continue; + } + // check if the reserve is too large, if it is too large, + // should set the query's limit only. + // Check the query's reserve with expected limit. 
+ if (query_ctx->adjusted_mem_limit() < + query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) { + query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit()); + query_ctx->set_memory_sufficient(true); + LOG(INFO) << "Workload group memory reserve failed because " + << query_ctx->debug_string() << " reserve size " + << PrettyPrinter::print_bytes(query_it->reserve_size_) + << " is too large, set hard limit to " + << PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit()) + << " and resume running."; + query_it = queries_list.erase(query_it); + continue; + } + if (flushed_memtable_bytes <= 0) { + flushed_memtable_bytes = + flush_memtable_from_current_group_(wg, query_it->reserve_size_); + } + if (flushed_memtable_bytes > 0) { + // Flushed some memtable, just wait flush finished and not do anything more. + wg->enable_write_buffer_limit(true); + ++query_it; + continue; + } + if (!has_changed_hard_limit) { + update_queries_limit_(wg, true);
Review Comment:
Is this `update_queries_limit_` call necessary? `refresh_wg_weighted_memory_limit` is called every 20 ms and already performs the update. Even if an update is needed here, it should be moved above this code:
```
if (query_ctx->adjusted_mem_limit() <
        query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) {
```
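For illustration, a minimal, self-contained sketch of the ordering suggested above: refresh the per-query limit before comparing against it, so the check uses the freshly weighted value rather than a stale one. The types and the budget formula below are hypothetical stubs that only mirror names from the quoted diff; they are not the Doris implementation.

```cpp
#include <cstdint>
#include <iostream>

// Hypothetical stand-in for the paused query's state; field names only mirror the diff.
struct QueryStub {
    int64_t consumption = 0;        // current tracked memory usage
    int64_t reserve_size = 0;       // the reservation that failed
    int64_t adjusted_mem_limit = 0; // limit recomputed from the group budget
};

// Stand-in for update_queries_limit_(wg, true): recompute the adjusted limit from the
// group's budget (trivial placeholder formula, for demonstration only).
void update_queries_limit(QueryStub& q, int64_t per_query_budget) {
    q.adjusted_mem_limit = per_query_budget;
}

// Suggested ordering: refresh the limit first, then compare the paused query's
// consumption + reserve against the freshly adjusted value.
bool handle_wg_exceeded(QueryStub& q, int64_t per_query_budget) {
    update_queries_limit(q, per_query_budget); // moved above the check, as proposed
    if (q.adjusted_mem_limit < q.consumption + q.reserve_size) {
        // Raise the query's hard limit to the adjusted value and resume it; it is
        // expected to pause again with QUERY_MEMORY_EXCEEDED and then spill.
        return true; // resumed
    }
    return false;    // stays paused; the group enters memory-insufficient mode
}

int main() {
    QueryStub q{800, 300, 0};
    std::cout << (handle_wg_exceeded(q, 1000) ? "resume with hard limit" : "keep paused")
              << std::endl;
    return 0;
}
```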
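And to make the asymmetry raised in the first comment of this message concrete, here is a minimal sketch of how the quoted WORKLOAD_GROUP_MEMORY_EXCEEDED branch treats paused queries under `slot_memory_policy == NONE` versus the other policies. Again, the enum and struct are hypothetical stubs modelling only the described control flow, not the real Doris types.

```cpp
#include <cstdint>
#include <iostream>
#include <string>

// Hypothetical policy enum and paused-query stub mirroring names in the quoted diff.
enum class SlotPolicy { NONE, FIXED, DYNAMIC };

struct PausedQueryStub {
    int64_t consumption = 0;
    int64_t reserve_size = 0;
    int64_t adjusted_mem_limit = 0;
    int64_t elapsed_ms = 0;
};

// What the quoted branch does with each paused query, per the first review comment.
std::string dispatch(const PausedQueryStub& q, SlotPolicy policy, int64_t timeout_ms) {
    if (q.adjusted_mem_limit < q.consumption + q.reserve_size) {
        // The query that actually exceeds its adjusted limit is resumed with a raised
        // hard limit, under every policy.
        return "raise hard limit and resume";
    }
    if (policy == SlotPolicy::NONE) {
        // Under NONE, the remaining (non-exceeding) queries are spilled or cancelled
        // immediately: the asymmetry pointed out above.
        return "spill or cancel now";
    }
    // Under the other policies they wait, up to spill_in_paused_queue_timeout_ms.
    return q.elapsed_ms > timeout_ms ? "resume" : "keep waiting";
}

int main() {
    PausedQueryStub exceeding{900, 300, 1000, 0};
    PausedQueryStub waiting{200, 100, 1000, 0};
    std::cout << dispatch(exceeding, SlotPolicy::NONE, 60000) << '\n'
              << dispatch(waiting, SlotPolicy::NONE, 60000) << '\n';
    return 0;
}
```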