This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new ffcf519d841 memtable flush reserve process mem and improve logs (#45743)
ffcf519d841 is described below

commit ffcf519d841d295285cc5caef15b47a15acb7d20
Author: TengJianPing <tengjianp...@selectdb.com>
AuthorDate: Tue Dec 24 16:29:15 2024 +0800

    memtable flush reserve process mem and improve logs (#45743)
---
 be/src/olap/memtable.cpp                           |  14 +++
 be/src/olap/memtable.h                             |   1 +
 be/src/olap/memtable_flush_executor.cpp            |  40 ++++++++
 be/src/olap/memtable_flush_executor.h              |  19 ++++
 be/src/olap/memtable_memory_limiter.cpp            |  28 ++++--
 be/src/olap/memtable_memory_limiter.h              |   2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |   5 +-
 be/src/pipeline/pipeline_task.cpp                  |  10 +-
 be/src/pipeline/pipeline_task.h                    |   2 +
 be/src/runtime/memory/global_memory_arbitrator.h   |  14 +--
 be/src/runtime/memory/mem_tracker_limiter.cpp      |   3 +-
 be/src/runtime/memory/memory_profile.cpp           |  13 ++-
 be/src/runtime/memory/memory_profile.h             |   1 +
 be/src/runtime/memory/thread_mem_tracker_mgr.h     |  48 ++++++----
 be/src/runtime/runtime_state.h                     |  14 ---
 be/src/runtime/thread_context.h                    |  11 ++-
 be/src/runtime/workload_group/workload_group.cpp   |  20 ++--
 be/src/runtime/workload_group/workload_group.h     |   6 +-
 .../workload_group/workload_group_manager.cpp      | 101 ++++++++++++---------
 .../workload_group/workload_group_manager.h        |   3 +-
 be/src/vec/exec/scan/scanner_context.h             |   2 +
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  69 ++++++++++----
 be/src/vec/exec/scan/vscanner.h                    |  11 ++-
 .../java/org/apache/doris/qe/SessionVariable.java  |  15 ---
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 25 files changed, 303 insertions(+), 150 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 995402bc273..bf7c5d53d25 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -611,6 +611,20 @@ bool MemTable::need_agg() const {
     return false;
 }
 
+size_t MemTable::get_flush_reserve_memory_size() const {
+    size_t reserve_size = 0;
+    if (_keys_type == KeysType::DUP_KEYS) {
+        if (_tablet_schema->num_key_columns() == 0) {
+            // no need to reserve
+        } else {
+            reserve_size = _input_mutable_block.allocated_bytes();
+        }
+    } else {
+        reserve_size = _input_mutable_block.allocated_bytes();
+    }
+    return reserve_size;
+}
+
 Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
     size_t same_keys_num = _sort();
     if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 77ff2e886bf..09591df2745 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -181,6 +181,7 @@ public:
     int64_t tablet_id() const { return _tablet_id; }
     size_t memory_usage() const { return _mem_tracker->consumption(); }
+    size_t get_flush_reserve_memory_size() const;
     // insert tuple from (row_pos) to (row_pos+num_rows)
     Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);
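The sizing rule in get_flush_reserve_memory_size() can be read as: a duplicate-key memtable with no key columns is flushed in arrival order, so no sort/aggregation copy is materialized and nothing extra needs reserving; every other case may materialize a copy of the input block, so it reserves roughly the block's allocated size. A compact restatement under illustrative names (not the Doris API):

    #include <cstddef>

    enum class KeysType { DUP_KEYS, UNIQUE_KEYS, AGG_KEYS };

    // Sketch of the flush reserve estimate: zero for dup-key tables without
    // key columns (written out as-is), otherwise the input block's footprint.
    size_t flush_reserve_bytes(KeysType keys_type, size_t num_key_columns,
                               size_t input_block_allocated_bytes) {
        if (keys_type == KeysType::DUP_KEYS && num_key_columns == 0) {
            return 0; // no sort/agg buffer will be built
        }
        return input_block_allocated_bytes;
    }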
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 50ccbb25958..5533a360fac 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -28,6 +28,7 @@
 #include "common/signal_handler.h"
 #include "olap/memtable.h"
 #include "olap/rowset/rowset_writer.h"
+#include "olap/storage_engine.h"
 #include "util/debug_points.h"
 #include "util/doris_metrics.h"
 #include "util/metrics.h"
@@ -140,6 +141,36 @@ Status FlushToken::wait() {
     return Status::OK();
 }
 
+Status FlushToken::_try_reserve_memory(QueryThreadContext query_thread_context, int64_t size) {
+    auto* thread_context = doris::thread_context();
+    auto* memtable_flush_executor =
+            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
+    Status st;
+    do {
+        // only try to reserve process memory
+        st = thread_context->try_reserve_process_memory(size);
+        if (st.ok()) {
+            memtable_flush_executor->inc_flushing_task();
+            break;
+        }
+        if (_is_shutdown() || query_thread_context.get_memory_tracker()->is_query_cancelled()) {
+            st = Status::Cancelled("flush memtable already cancelled");
+            break;
+        }
+        // Make sure at least one memtable is flushing even if reserving memory failed.
+        if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
+            // If there is already a flushing task, wait for some time and retry.
+            LOG_EVERY_T(INFO, 60) << fmt::format(
+                    "Failed to reserve memory {} for flush memtable, retry after 100ms", size);
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        } else {
+            st = Status::OK();
+            break;
+        }
+    } while (true);
+    return st;
+}
+
 Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
     VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
                   << ", memsize: " << memtable->memory_usage()
@@ -150,10 +181,19 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
     SCOPED_ATTACH_TASK(memtable->query_thread_context());
     signal::set_signal_task_id(_rowset_writer->load_id());
     signal::tablet_id = memtable->tablet_id();
+
+    DEFER_RELEASE_RESERVED();
+
+    auto reserve_size = memtable->get_flush_reserve_memory_size();
+    RETURN_IF_ERROR(_try_reserve_memory(memtable->query_thread_context(), reserve_size));
     {
         SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                 memtable->query_thread_context().query_mem_tracker->write_tracker());
         SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
+
+        Defer defer {[&]() {
+            ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
+        }};
         std::unique_ptr<vectorized::Block> block;
         RETURN_IF_ERROR(memtable->to_block(&block));
         RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index 27e8e8a9b0e..040a8fa5449 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -94,6 +94,8 @@ private:
     Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);
 
+    Status _try_reserve_memory(QueryThreadContext query_thread_context, int64_t size);
+
     // Records the current flush status of the tablet.
     // Note: Once its value is set to Failed, it cannot return to SUCCESS.
     std::shared_mutex _flush_status_lock;
@@ -140,12 +142,29 @@ public:
                        std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority,
                        std::shared_ptr<WorkloadGroup> wg_sptr);
 
+    // return true if there is already any flushing task
+    bool check_and_inc_has_any_flushing_task() {
+        // Use CAS instead of a plain `if (0 == _flushing_task_count)` check,
+        // so that two concurrent callers cannot both pass the zero test.
+        int expected_count = 0;
+        if (!_flushing_task_count.compare_exchange_strong(expected_count, 1)) {
+            return true;
+        }
+        DCHECK(expected_count == 0 && _flushing_task_count == 1);
+        return false;
+    }
+
+    void inc_flushing_task() { _flushing_task_count++; }
+
+    void dec_flushing_task() { _flushing_task_count--; }
+
 private:
     void _register_metrics();
     static void _deregister_metrics();
 
     std::unique_ptr<ThreadPool> _flush_pool;
     std::unique_ptr<ThreadPool> _high_prio_flush_pool;
+    std::atomic<int> _flushing_task_count = 0;
 };
 
 } // namespace doris
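The CAS in check_and_inc_has_any_flushing_task() is what guarantees forward progress: when process memory cannot be reserved, exactly one flush is still allowed to proceed, and every other flush backs off and retries. A compilable standalone sketch of that pattern (the class and function names are illustrative, not the Doris types):

    #include <atomic>
    #include <chrono>
    #include <thread>

    // Guarantee that at least one worker always proceeds even when a resource
    // reservation fails. compare_exchange_strong makes the 0 -> 1 transition
    // atomic, so two workers racing on an empty counter cannot both conclude
    // "nobody is flushing" and both fall back at the same time.
    class FlushGate {
    public:
        // Returns true if some task is already flushing (caller should wait
        // and retry); returns false and takes the slot if caller is first.
        bool check_and_inc_has_any_flushing_task() {
            int expected = 0;
            return !_flushing_task_count.compare_exchange_strong(expected, 1);
        }
        void inc() { _flushing_task_count++; }
        void dec() { _flushing_task_count--; }

    private:
        std::atomic<int> _flushing_task_count{0};
    };

    // Usage mirroring the retry loop above: reserve if possible, otherwise
    // run anyway when no one else is flushing, otherwise back off and retry.
    bool run_flush(FlushGate& gate, bool (*try_reserve)()) {
        while (true) {
            if (try_reserve()) {
                gate.inc();
                return true;
            }
            if (gate.check_and_inc_has_any_flushing_task()) {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            } else {
                return true; // proceed unreserved; we hold the first-flusher slot
            }
        }
    }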
diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index b222c041b34..22b842ec672 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -124,10 +124,10 @@ void MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPt
         --sleep_times;
     }
     // Check process memory again.
-    handle_memtable_flush();
+    handle_memtable_flush(wg);
 }
 
-void MemTableMemoryLimiter::handle_memtable_flush() {
+void MemTableMemoryLimiter::handle_memtable_flush(WorkloadGroupPtr wg) {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
     if (!_soft_limit_reached() || _load_usage_low()) {
@@ -150,12 +150,17 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
         if (need_flush > 0) {
             auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
             LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? "hard" : "soft")
-                      << ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
+                      << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", "
+                      << GlobalMemoryArbitrator::sys_mem_available_details_str()
                       << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
                       << ", memtable writers num: " << _writers.size()
                       << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
                       << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
-                      << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
+                      << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage)
+                      << ", wg: " << (wg ? wg->debug_string() : "null\n")
+                      << doris::ProcessProfile::instance()
+                                 ->memory_profile()
+                                 ->process_memory_detail_str();
             _flush_active_memtables(0, need_flush);
         }
     } while (_hard_limit_reached() && !_load_usage_low());
@@ -163,7 +168,16 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
     timer.stop();
     int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
     g_memtable_memory_limit_latency_ms << time_ms;
-    LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit";
+    LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit"
+              << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", "
+              << GlobalMemoryArbitrator::sys_mem_available_details_str()
+              << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
+              << ", memtable writers num: " << _writers.size()
+              << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
+              << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
+              << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage)
+              << ", wg: " << (wg ? wg->debug_string() : "null.\n")
+              << doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str();
 }
 
 int64_t MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush) {
@@ -270,11 +284,13 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
         _last_limit = limit;
         _log_timer.reset();
         LOG(INFO) << ss.str() << ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
+                  << ", " << GlobalMemoryArbitrator::sys_mem_available_details_str()
                   << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
                   << ", memtable writers num: " << _writers.size()
                   << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
                   << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
-                  << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
+                  << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) << "\n"
+                  << doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str();
 }
 
 void MemTableMemoryLimiter::_refresh_mem_tracker() {
diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h
index de2fb802165..155a1dd424b 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -43,7 +43,7 @@ public:
     // If yes, it will flush memtable to try to reduce memory consumption.
     // Every write operation will call this API to check if need flush memtable OR hang
     // when memory is not available.
-    void handle_memtable_flush();
+    void handle_memtable_flush(WorkloadGroupPtr wg);
 
     int64_t flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes);
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 42d2640441e..f86fb491d71 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1821,8 +1821,9 @@ size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const
         if (task->is_running() || task->is_revoking()) {
             LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
                                   << " is running, task: " << (void*)task.get()
-                                  << ", task->is_revoking(): " << task->is_revoking() << ", "
-                                  << task->is_running();
+                                  << ", is_revoking: " << task->is_revoking()
+                                  << ", is_running: " << task->is_running()
+                                  << ", task info: " << task->debug_string();
             *has_running_task = true;
             return 0;
         }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 0ddc329da3b..3fef0fc4c93 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -437,11 +437,8 @@ Status PipelineTask::execute(bool* eos) {
                 }
                 LOG(INFO) << debug_msg;
 
-                _state->get_query_ctx()->update_paused_reason(st);
-                _state->get_query_ctx()->set_low_memory_mode();
-                _state->get_query_ctx()->set_memory_sufficient(false);
                 ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                        _state->get_query_ctx()->shared_from_this(), reserve_size);
+                        _state->get_query_ctx()->shared_from_this(), reserve_size, st);
                 continue;
             }
         }
@@ -484,11 +481,8 @@ Status PipelineTask::execute(bool* eos) {
                     DCHECK_EQ(_pending_block.get(), nullptr);
                     _pending_block = std::move(_block);
                     _block = vectorized::Block::create_unique(_pending_block->clone_empty());
-                    _state->get_query_ctx()->update_paused_reason(status);
-                    _state->get_query_ctx()->set_low_memory_mode();
-                    _state->get_query_ctx()->set_memory_sufficient(false);
                     ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                            _state->get_query_ctx()->shared_from_this(), sink_reserve_size);
+                            _state->get_query_ctx()->shared_from_this(), sink_reserve_size, status);
                     _pending_eos = *eos;
                     *eos = false;
                     continue;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 99decc05a9d..19315bef89d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -251,6 +251,8 @@ public:
         return _memory_sufficient_dependency.get();
     }
 
+    void inc_memory_reserve_failed_times() { COUNTER_UPDATE(_memory_reserve_failed_times, 1); }
+
 private:
     friend class RuntimeFilterDependency;
     bool _is_blocked();
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h
index 05963132cb1..790301b68f9 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -91,12 +91,13 @@ public:
     static inline std::string sys_mem_available_details_str() {
         auto msg = fmt::format(
                 "sys available memory {}(= {}[proc/available] - {}[reserved] - "
-                "{}B[waiting_refresh])",
+                "{}B[waiting_refresh] + {}[tc/jemalloc_cache])",
                 PrettyPrinter::print(sys_mem_available(), TUnit::BYTES),
                 PrettyPrinter::print(MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed),
                                      TUnit::BYTES),
                 PrettyPrinter::print(process_reserved_memory(), TUnit::BYTES),
-                refresh_interval_memory_growth);
+                refresh_interval_memory_growth,
+                PrettyPrinter::print_bytes(static_cast<uint64_t>(MemInfo::allocator_cache_mem())));
 #ifdef ADDRESS_SANITIZER
         msg = "[ASAN]" + msg;
 #endif
@@ -165,15 +166,16 @@ public:
 
     static std::string process_limit_exceeded_errmsg_str() {
         return fmt::format(
-                "{} exceed limit {} or {} less than low water mark {}", process_memory_used_str(),
-                MemInfo::mem_limit_str(), sys_mem_available_str(),
+                "{} exceed limit {} or {} less than low water mark {}",
+                process_memory_used_details_str(), MemInfo::mem_limit_str(),
+                sys_mem_available_details_str(),
                 PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
     }
 
     static std::string process_soft_limit_exceeded_errmsg_str() {
         return fmt::format("{} exceed soft limit {} or {} less than warning water mark {}.",
-                           process_memory_used_str(), MemInfo::soft_mem_limit_str(),
-                           sys_mem_available_str(),
+                           process_memory_used_details_str(), MemInfo::soft_mem_limit_str(),
+                           sys_mem_available_details_str(),
                            PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
                                                 TUnit::BYTES));
     }
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 068c3427b84..bd9aa705a33 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -358,7 +358,8 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
             "{}, peak used {}, current used {}. backend {}, {}.", label(), type_string(_type),
             MemCounter::print_bytes(limit()), MemCounter::print_bytes(peak_consumption()),
             MemCounter::print_bytes(consumption()),
-            BackendOptions::get_localhost(), GlobalMemoryArbitrator::process_memory_used_str());
+            BackendOptions::get_localhost(),
+            GlobalMemoryArbitrator::process_memory_used_details_str());
     if (_type == Type::QUERY || _type == Type::LOAD) {
         err_msg += fmt::format(
                 " exec node:<{}>, can `set exec_mem_limit=8G` to change limit, details see "
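The updated detail string reports available memory as a simple linear combination: what the OS says is available, minus bytes this process has reserved but not yet touched, minus growth not yet folded into the cached reading, plus allocator cache that could be returned to the OS. A worked restatement of that arithmetic (an illustrative helper, not the MemInfo API):

    #include <cstdint>
    #include <cstdio>

    // Mirrors the accounting in sys_mem_available_details_str() above.
    int64_t sys_mem_available(int64_t proc_available, int64_t reserved,
                              int64_t waiting_refresh, int64_t allocator_cache) {
        return proc_available - reserved - waiting_refresh + allocator_cache;
    }

    int main() {
        // e.g. 32 GB reported available, 2 GB reserved, 512 MB pending
        // refresh, 1 GB in the tc/jemalloc cache => 30.5 GB effectively free.
        const int64_t gb = int64_t(1) << 30;
        std::printf("%lld bytes\n",
                    (long long)sys_mem_available(32 * gb, 2 * gb, gb / 2, gb));
        return 0;
    }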
diff --git a/be/src/runtime/memory/memory_profile.cpp b/be/src/runtime/memory/memory_profile.cpp
index c7421236c42..fa8b45abfb4 100644
--- a/be/src/runtime/memory/memory_profile.cpp
+++ b/be/src/runtime/memory/memory_profile.cpp
@@ -340,14 +340,17 @@ int64_t MemoryProfile::other_current_usage() {
     return memory_other_trackers_sum_bytes.get_value();
 }
 
+std::string MemoryProfile::process_memory_detail_str() const {
+    return fmt::format("Process Memory Summary: {}\n, {}\n, {}\n, {}",
+                       GlobalMemoryArbitrator::process_mem_log_str(),
+                       print_memory_overview_profile(), print_global_memory_profile(),
+                       print_top_memory_tasks_profile());
+}
+
 void MemoryProfile::print_log_process_usage() {
     if (_enable_print_log_process_usage) {
         _enable_print_log_process_usage = false;
-        LOG(WARNING) << "Process Memory Summary: " + GlobalMemoryArbitrator::process_mem_log_str()
-                     << "\n"
-                     << print_memory_overview_profile() << "\n"
-                     << print_global_memory_profile() << "\n"
-                     << print_top_memory_tasks_profile();
+        LOG(WARNING) << process_memory_detail_str();
     }
 }
 
diff --git a/be/src/runtime/memory/memory_profile.h b/be/src/runtime/memory/memory_profile.h
index 9f1bab0c02a..bf3a6aa9f39 100644
--- a/be/src/runtime/memory/memory_profile.h
+++ b/be/src/runtime/memory/memory_profile.h
@@ -69,6 +69,7 @@ public:
     // process memory changes more than 256M, or the GC ends
     void enable_print_log_process_usage() { _enable_print_log_process_usage = true; }
     void print_log_process_usage();
+    std::string process_memory_detail_str() const;
 
 private:
     MultiVersion<RuntimeProfile> _memory_overview_profile;
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 83caf753aed..8193d89394a 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -83,7 +83,7 @@ public:
     void consume(int64_t size, int skip_large_memory_check = 0);
     void flush_untracked_mem();
 
-    doris::Status try_reserve(int64_t size);
+    doris::Status try_reserve(int64_t size, bool only_check_process_memory);
 
     void release_reserved();
 
@@ -278,7 +278,8 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
     _stop_consume = false;
}
 
-inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
+inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size,
+                                                      bool only_check_process_memory) {
     DCHECK(_limiter_tracker);
     DCHECK(size >= 0);
     CHECK(init());
@@ -286,26 +287,35 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
     // _untracked_mem stores bytes that are not yet synchronized to the process reserved memory.
     flush_untracked_mem();
     auto wg_ptr = _wg_wptr.lock();
-    if (!_limiter_tracker->try_reserve(size)) {
-        auto err_msg = fmt::format(
-                "reserve memory failed, size: {}, because query memory exceeded, memory tracker "
-                "consumption: {}, limit: {}",
-                PrettyPrinter::print(size, TUnit::BYTES),
-                PrettyPrinter::print(_limiter_tracker->consumption(), TUnit::BYTES),
-                PrettyPrinter::print(_limiter_tracker->limit(), TUnit::BYTES));
-        return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
-    }
-    if (wg_ptr) {
-        if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
+    if (only_check_process_memory) {
+        _limiter_tracker->reserve(size);
+        if (wg_ptr) {
+            wg_ptr->add_wg_refresh_interval_memory_growth(size);
+        }
+    } else {
+        if (!_limiter_tracker->try_reserve(size)) {
             auto err_msg = fmt::format(
-                    "reserve memory failed, size: {}, because workload group memory exceeded, "
-                    "workload group: {}",
-                    PrettyPrinter::print(size, TUnit::BYTES), wg_ptr->memory_debug_string());
-            _limiter_tracker->release(size);          // rollback
-            _limiter_tracker->release_reserved(size); // rollback
-            return doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>(err_msg);
+                    "reserve memory failed, size: {}, because query memory exceeded, memory "
+                    "tracker "
+                    "consumption: {}, limit: {}",
+                    PrettyPrinter::print(size, TUnit::BYTES),
+                    PrettyPrinter::print(_limiter_tracker->consumption(), TUnit::BYTES),
+                    PrettyPrinter::print(_limiter_tracker->limit(), TUnit::BYTES));
+            return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
+        }
+        if (wg_ptr) {
+            if (!wg_ptr->try_add_wg_refresh_interval_memory_growth(size)) {
+                auto err_msg = fmt::format(
+                        "reserve memory failed, size: {}, because workload group memory exceeded, "
+                        "workload group: {}",
+                        PrettyPrinter::print(size, TUnit::BYTES), wg_ptr->memory_debug_string());
+                _limiter_tracker->release(size);          // rollback
+                _limiter_tracker->release_reserved(size); // rollback
+                return doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>(err_msg);
+            }
        }
     }
+
     if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
         auto err_msg =
                 fmt::format("reserve memory failed, size: {}, because process memory exceeded, {}",
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 16f500b2fcc..879bd647d96 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -410,20 +410,6 @@ public:
 
     bool enable_page_cache() const;
 
-    int partitioned_hash_join_rows_threshold() const {
-        if (!_query_options.__isset.partitioned_hash_join_rows_threshold) {
-            return 0;
-        }
-        return _query_options.partitioned_hash_join_rows_threshold;
-    }
-
-    int partitioned_hash_agg_rows_threshold() const {
-        if (!_query_options.__isset.partitioned_hash_agg_rows_threshold) {
-            return 0;
-        }
-        return _query_options.partitioned_hash_agg_rows_threshold;
-    }
-
     const std::vector<TTabletCommitInfo>& tablet_commit_infos() const {
         return _tablet_commit_infos;
     }
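try_reserve() now has two modes: a process-only reservation that books the bytes against the query and workload-group counters unconditionally (used by memtable flush, which must not fail on query/wg limits), and a full reservation that can fail at the query, workload-group, or process level, rolling back on each failure. A condensed sketch of that control flow under simplified stand-in types:

    #include <cstdint>
    #include <optional>
    #include <string>

    struct Tracker {
        int64_t used = 0, limit = 0;
        bool try_reserve(int64_t n) {
            if (used + n > limit) return false;
            used += n;
            return true;
        }
        void reserve(int64_t n) { used += n; } // unconditional booking
        void release(int64_t n) { used -= n; }
    };

    // Returns an error message on failure, std::nullopt on success.
    std::optional<std::string> try_reserve(Tracker& query, Tracker& wg,
                                           Tracker& process, int64_t size,
                                           bool only_check_process_memory) {
        if (only_check_process_memory) {
            // Flush threads must not fail on query/wg limits: book anyway.
            query.reserve(size);
            wg.reserve(size);
        } else {
            if (!query.try_reserve(size)) return "query memory exceeded";
            if (!wg.try_reserve(size)) {
                query.release(size); // rollback
                return "workload group memory exceeded";
            }
        }
        if (!process.try_reserve(size)) {
            query.release(size); // rollback
            wg.release(size);    // rollback
            return "process memory exceeded";
        }
        return std::nullopt;
    }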
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index a48bc680925..a9aede24487 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -249,13 +249,22 @@ public:
         thread_mem_tracker_mgr->consume(size, skip_large_memory_check);
     }
 
+    doris::Status try_reserve_process_memory(const int64_t size) const {
+#ifdef USE_MEM_TRACKER
+        DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check ||
+               thread_mem_tracker()->label() != "Orphan")
+                << doris::memory_orphan_check_msg;
+#endif
+        return thread_mem_tracker_mgr->try_reserve(size, true);
+    }
+
     doris::Status try_reserve_memory(const int64_t size) const {
 #ifdef USE_MEM_TRACKER
         DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check ||
                thread_mem_tracker()->label() != "Orphan")
                 << doris::memory_orphan_check_msg;
 #endif
-        return thread_mem_tracker_mgr->try_reserve(size);
+        return thread_mem_tracker_mgr->try_reserve(size, false);
     }
 
     void release_reserved_memory() const {
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 7e9a9812956..c87a927a455 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -118,7 +118,7 @@ std::string WorkloadGroup::debug_string() const {
             _remote_scan_bytes_per_second);
 }
 
-bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) {
+bool WorkloadGroup::try_add_wg_refresh_interval_memory_growth(int64_t size) {
     auto realtime_total_mem_used =
             _total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
     if ((realtime_total_mem_used >
@@ -137,15 +137,19 @@ std::string WorkloadGroup::memory_debug_string() const {
     auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
     auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit + 1);
     return fmt::format(
-            "WorkloadGroup[id = {}, name = {}, memory_limit = {}, enable_memory_overcommit = {}, "
-            "total_mem_used = {}, wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, "
-            "memory_low_watermark = {}, memory_high_watermark = {}, version = {}, is_shutdown = "
-            "{}, query_num = {}]",
-            _id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
-            _enable_memory_overcommit ? "true" : "false",
+            "WorkloadGroup[id = {}, name = {}, version = {}, "
+            "total_query_slot_count = {}, "
+            "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio = {}%, "
+            "enable_memory_overcommit = {}, total_mem_used = {} (write_buffer_size = {}), "
+            "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, "
+            "memory_low_watermark = {}, memory_high_watermark = {}, is_shutdown = {}, "
+            "query_num = {}]",
+            _id, _name, _version, _total_query_slot_count,
+            PrettyPrinter::print(_memory_limit, TUnit::BYTES), to_string(_slot_mem_policy),
+            _load_buffer_ratio, _enable_memory_overcommit ? "true" : "false",
             PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
+            PrettyPrinter::print(_write_buffer_size.load(), TUnit::BYTES),
             PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), TUnit::BYTES),
-            mem_used_ratio, _memory_low_watermark, _memory_high_watermark, _version, _is_shutdown,
+            mem_used_ratio, _memory_low_watermark, _memory_high_watermark, _is_shutdown,
             _query_ctxs.size());
 }
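The rename above splits one method into two: an unconditional add_wg_refresh_interval_memory_growth() (used on the process-only reservation path) and a checked try_add_wg_refresh_interval_memory_growth() that refuses growth pushing the group past its limit. A sketch under illustrative types; note that, as in the original, the check and the add are separate atomic operations, so the limit is advisory rather than strictly enforced:

    #include <atomic>
    #include <cstdint>

    class GroupGrowth {
    public:
        explicit GroupGrowth(int64_t limit) : _limit(limit) {}

        // Unconditional booking: loads that must make progress use this.
        void add(int64_t size) { _growth.fetch_add(size); }

        // Checked booking: refuse growth past the group limit. The check and
        // the fetch_add are not a single atomic step, so slight overshoot
        // between concurrent callers is tolerated by design.
        bool try_add(int64_t size, int64_t total_mem_used) {
            if (total_mem_used + _growth.load() + size > _limit) {
                return false;
            }
            _growth.fetch_add(size);
            return true;
        }

        void sub(int64_t size) { _growth.fetch_sub(size); }

    private:
        std::atomic<int64_t> _growth{0};
        int64_t _limit;
    };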
"true" : "false", PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES), + PrettyPrinter::print(_write_buffer_size.load(), TUnit::BYTES), PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), TUnit::BYTES), - mem_used_ratio, _memory_low_watermark, _memory_high_watermark, _version, _is_shutdown, + mem_used_ratio, _memory_low_watermark, _memory_high_watermark, _is_shutdown, _query_ctxs.size()); } diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 1d617b22bfe..73fc4c965b8 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -113,7 +113,11 @@ public: return _total_query_slot_count.load(std::memory_order_relaxed); } - bool add_wg_refresh_interval_memory_growth(int64_t size); + void add_wg_refresh_interval_memory_growth(int64_t size) { + _wg_refresh_interval_memory_growth.fetch_add(size); + } + + bool try_add_wg_refresh_interval_memory_growth(int64_t size); void sub_wg_refresh_interval_memory_growth(int64_t size) { _wg_refresh_interval_memory_growth.fetch_sub(size); diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index ade4f228850..dd3d7d970dd 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -271,9 +271,12 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) { } void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, - int64_t reserve_size) { - std::lock_guard<std::mutex> lock(_paused_queries_lock); + int64_t reserve_size, const Status& status) { DCHECK(query_ctx != nullptr); + query_ctx->update_paused_reason(status); + query_ctx->set_low_memory_mode(); + query_ctx->set_memory_sufficient(false); + std::lock_guard<std::mutex> lock(_paused_queries_lock); auto wg = query_ctx->workload_group(); auto&& [it, inserted] = _paused_queries_list[wg].emplace( query_ctx, doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted, @@ -281,7 +284,6 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que // Check if this is an invalid reserve, for example, if the reserve size is too large, larger than the query limit // if hard limit is enabled, then not need enable other queries hard limit. if (inserted) { - query_ctx->set_memory_sufficient(false); LOG(INFO) << "Insert one new paused query: " << query_ctx->debug_string() << ", workload group: " << wg->debug_string(); } @@ -399,8 +401,13 @@ void WorkloadGroupMgr::handle_paused_queries() { << PrettyPrinter::print_bytes(query_it->reserve_size_) << ") failed due to workload group memory exceed, " "should set the workload group work in memory insufficent mode, " - "so that other query will reduce their memory. wg: " - << wg->debug_string(); + "so that other query will reduce their memory." 
+ << " Query mem limit: " + << PrettyPrinter::print_bytes(query_ctx->get_mem_limit()) + << " mem usage: " + << PrettyPrinter::print_bytes( + query_ctx->get_mem_tracker()->consumption()) + << ", wg: " << wg->debug_string(); } if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) { // If not enable slot memory policy, then should spill directly @@ -657,11 +664,12 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& return false; } + const auto wg = query_ctx->workload_group(); auto revocable_tasks = query_ctx->get_revocable_tasks(); if (revocable_tasks.empty()) { + const auto limit = query_ctx->get_mem_limit(); + const auto reserved_size = query_ctx->query_mem_tracker->reserved_consumption(); if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { - const auto limit = query_ctx->get_mem_limit(); - const auto reserved_size = query_ctx->query_mem_tracker->reserved_consumption(); // During waiting time, another operator in the query may finished and release // many memory and we could run. if ((memory_usage + size_to_reserve) < limit) { @@ -674,43 +682,44 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& } else if (time_in_queue >= config::spill_in_paused_queue_timeout_ms) { // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic auto msg1 = fmt::format( - "Query {} reserve memory failed, but could not find memory that could " - "release or spill to disk. Query memory usage: {}, reserved size: {}, try " - "to reserve: {} " - ", limit: {} ,process memory info: {}, wg info: {}.", + "Query {} failed beause query limit is exceeded, but could " + "not find memory that could release or spill to disk. Query memory usage: " + "{}, limit: {}, reserved " + "size: {}, try to reserve: {}, wg info: {}.", query_id, PrettyPrinter::print_bytes(memory_usage), + PrettyPrinter::print_bytes(limit), PrettyPrinter::print_bytes(reserved_size), - PrettyPrinter::print_bytes(size_to_reserve), - PrettyPrinter::print_bytes(query_ctx->get_mem_limit()), - GlobalMemoryArbitrator::process_memory_used_details_str(), - query_ctx->workload_group()->memory_debug_string()); - auto msg2 = msg1 + fmt::format( - " Query Memory Tracker Summary: {}." 
- " Load Memory Tracker Summary: {}", - MemTrackerLimiter::make_type_trackers_profile_str( - MemTrackerLimiter::Type::QUERY), - MemTrackerLimiter::make_type_trackers_profile_str( - MemTrackerLimiter::Type::LOAD)); - LOG(INFO) << msg2; + PrettyPrinter::print_bytes(size_to_reserve), wg->memory_debug_string()); + LOG(INFO) << fmt::format("{}.\n{}", msg1, + doris::ProcessProfile::instance() + ->memory_profile() + ->process_memory_detail_str()); query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1)); } else { return false; } } else if (paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { - if (!query_ctx->workload_group()->exceed_limit()) { + if (!wg->exceed_limit()) { LOG(INFO) << "Query: " << query_id << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it."; query_ctx->set_memory_sufficient(true); return true; } else if (time_in_queue > config::spill_in_paused_queue_timeout_ms) { - LOG(INFO) << "Query: " << query_id << ", workload group exceeded, info: " - << GlobalMemoryArbitrator::process_memory_used_details_str() - << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); - query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( - "The query({}) reserved memory failed because workload group limit " - "exceeded, and there is no cache now. And could not find task to spill. " - "Maybe you should set the workload group's limit to a lower value.", - query_id)); + auto msg1 = fmt::format( + "Query {} failed because workload group memory is exceeded" + ", and there is no cache now. And could not find task to spill. " + "Query memory usage: {}, limit: {}, reserved " + "size: {}, try to reserve: {}, wg info: {}." + " Maybe you should set the workload group's limit to a lower value.", + query_id, PrettyPrinter::print_bytes(memory_usage), + PrettyPrinter::print_bytes(limit), + PrettyPrinter::print_bytes(reserved_size), + PrettyPrinter::print_bytes(size_to_reserve), wg->memory_debug_string()); + LOG(INFO) << fmt::format("{}.\n{}", msg1, + doris::ProcessProfile::instance() + ->memory_profile() + ->process_memory_detail_str()); + query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1)); } else { return false; } @@ -724,21 +733,25 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& << ", process limit not exceeded now, resume this query" << ", process memory info: " << GlobalMemoryArbitrator::process_memory_used_details_str() - << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); + << ", wg info: " << wg->debug_string(); query_ctx->set_memory_sufficient(true); return true; } else if (time_in_queue > config::spill_in_paused_queue_timeout_ms) { - LOG(INFO) << "Query: " << query_id << ", process limit exceeded, info: " - << GlobalMemoryArbitrator::process_memory_used_details_str() - << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); - query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( - "The query({}) reserved memory failed because process limit exceeded, " - "and " - "there is no cache now. And could not find task to spill. Maybe you " - "should " - "set " - "the workload group's limit to a lower value.", - query_id)); + auto msg1 = fmt::format( + "Query {} failed because process memory is exceeded" + ", and there is no cache now. And could not find task to spill. " + "Query memory usage: {}, limit: {}, reserved " + "size: {}, try to reserve: {}, wg info: {}." 
+ " Maybe you should set the workload group's limit to a lower value.", + query_id, PrettyPrinter::print_bytes(memory_usage), + PrettyPrinter::print_bytes(limit), + PrettyPrinter::print_bytes(reserved_size), + PrettyPrinter::print_bytes(size_to_reserve), wg->memory_debug_string()); + LOG(INFO) << fmt::format("{}.\n{}", msg1, + doris::ProcessProfile::instance() + ->memory_profile() + ->process_memory_detail_str()); + query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1)); } else { return false; } diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index fc53bfea858..47ea5540d3f 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -105,7 +105,8 @@ public: return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID]; } - void add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, int64_t reserve_size); + void add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, int64_t reserve_size, + const Status& status); void handle_paused_queries(); diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 2e1a4e9a343..ed5224289bd 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -170,6 +170,8 @@ public: int32_t low_memory_mode_scanners() const { return 4; } + pipeline::ScanLocalStateBase* local_state() const { return _local_state; } + // the unique id of this context std::string ctx_id; TUniqueId _query_id; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index a7bf8600663..d8858dd5aba 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -33,10 +33,12 @@ #include "common/logging.h" #include "common/status.h" #include "olap/tablet.h" +#include "pipeline/pipeline_task.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" +#include "runtime/workload_group/workload_group_manager.h" #include "util/async_io.h" // IWYU pragma: keep #include "util/blocking_queue.hpp" #include "util/cpu_info.h" @@ -209,6 +211,32 @@ std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token( return _limited_scan_thread_pool->new_token(mode, max_concurrency); } +void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerContext> ctx, + const Status& st, size_t reserve_size) { + ctx->clear_free_blocks(); + auto* pipeline_task = state->get_task(); + auto* local_state = ctx->local_state(); + + pipeline_task->inc_memory_reserve_failed_times(); + auto debug_msg = fmt::format( + "Query: {} , scanner try to reserve: {}, operator name {}, " + "operator " + "id: {}, " + "task id: " + "{}, revocable mem size: {}, failed: {}", + print_id(state->query_id()), PrettyPrinter::print_bytes(reserve_size), + local_state->get_name(), local_state->parent()->node_id(), state->task_id(), + PrettyPrinter::print_bytes(pipeline_task->get_revocable_size()), st.to_string()); + // PROCESS_MEMORY_EXCEEDED error msg alread contains process_mem_log_str + if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) { + debug_msg += fmt::format(", debug info: {}", GlobalMemoryArbitrator::process_mem_log_str()); + } + LOG(INFO) << debug_msg; + + ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( + state->get_query_ctx()->shared_from_this(), reserve_size, st); 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index a7bf8600663..d8858dd5aba 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -33,10 +33,12 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "olap/tablet.h"
+#include "pipeline/pipeline_task.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
+#include "runtime/workload_group/workload_group_manager.h"
 #include "util/async_io.h" // IWYU pragma: keep
 #include "util/blocking_queue.hpp"
 #include "util/cpu_info.h"
@@ -209,6 +211,32 @@ std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
     return _limited_scan_thread_pool->new_token(mode, max_concurrency);
 }
 
+void handle_reserve_memory_failure(RuntimeState* state, std::shared_ptr<ScannerContext> ctx,
+                                   const Status& st, size_t reserve_size) {
+    ctx->clear_free_blocks();
+    auto* pipeline_task = state->get_task();
+    auto* local_state = ctx->local_state();
+
+    pipeline_task->inc_memory_reserve_failed_times();
+    auto debug_msg = fmt::format(
+            "Query: {}, scanner try to reserve: {}, operator name: {}, operator id: {}, "
+            "task id: {}, revocable mem size: {}, failed: {}",
+            print_id(state->query_id()), PrettyPrinter::print_bytes(reserve_size),
+            local_state->get_name(), local_state->parent()->node_id(), state->task_id(),
+            PrettyPrinter::print_bytes(pipeline_task->get_revocable_size()), st.to_string());
+    // The PROCESS_MEMORY_EXCEEDED error msg already contains process_mem_log_str
+    if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
+        debug_msg += fmt::format(", debug info: {}", GlobalMemoryArbitrator::process_mem_log_str());
+    }
+    LOG(INFO) << debug_msg;
+
+    ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+            state->get_query_ctx()->shared_from_this(), reserve_size, st);
+}
+
 void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                                      std::shared_ptr<ScanTask> scan_task) {
     auto task_lock = ctx->task_exec_ctx();
@@ -246,6 +274,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
     bool eos = false;
     ASSIGN_STATUS_IF_CATCH_EXCEPTION(
             RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
+            // scanner->open may allocate a large amount of memory (it reads blocks of data),
+            // so it is better to also check low memory and clear free blocks here.
+            if (ctx->low_memory_mode()) { ctx->clear_free_blocks(); }
+
             if (!scanner->is_init()) {
                 status = scanner->init();
                 if (!status.ok()) {
@@ -268,16 +300,17 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
             }
 
             size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
-            if (ctx->low_memory_mode() &&
-                raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
-                raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
+            if (ctx->low_memory_mode()) {
+                ctx->clear_free_blocks();
+                if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
+                    raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
+                }
             }
 
             size_t raw_bytes_read = 0;
             bool first_read = true;
             // If the first block is full, then it is true. Or the first block + second block > batch_size
             bool has_first_full_block = false;
-            size_t block_avg_bytes = ctx->batch_size();
 
             // During low memory mode, every scan task will return at most 2 blocks to reduce memory usage.
             while (!eos && raw_bytes_read < raw_bytes_threshold &&
@@ -298,15 +331,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     free_block = ctx->get_free_block(first_read);
                 } else {
                     if (state->enable_reserve_memory()) {
-                        auto reserve_status = thread_context()->try_reserve_memory(block_avg_bytes);
-                        if (!reserve_status.ok()) {
-                            LOG(INFO) << "query: " << print_id(state->query_id())
-                                      << ", scanner try to reserve: "
-                                      << PrettyPrinter::print(block_avg_bytes, TUnit::BYTES)
-                                      << ", failed: " << reserve_status.to_string()
-                                      << ", process info: "
-                                      << GlobalMemoryArbitrator::process_mem_log_str();
-                            ctx->clear_free_blocks();
+                        size_t block_avg_bytes = scanner->get_block_avg_bytes();
+                        auto st = thread_context()->try_reserve_memory(block_avg_bytes);
+                        if (!st.ok()) {
+                            handle_reserve_memory_failure(state, ctx, st, block_avg_bytes);
                             break;
                         }
                     }
@@ -353,10 +381,17 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
                 }
                 if (scan_task->cached_blocks.back().first->rows() > 0) {
-                    block_avg_bytes = (scan_task->cached_blocks.back().first->allocated_bytes() +
-                                       scan_task->cached_blocks.back().first->rows() - 1) /
-                                      scan_task->cached_blocks.back().first->rows() *
-                                      ctx->batch_size();
+                    auto block_avg_bytes =
+                            (scan_task->cached_blocks.back().first->allocated_bytes() +
+                             scan_task->cached_blocks.back().first->rows() - 1) /
+                            scan_task->cached_blocks.back().first->rows() * ctx->batch_size();
+                    scanner->update_block_avg_bytes(block_avg_bytes);
+                }
+                if (ctx->low_memory_mode()) {
+                    ctx->clear_free_blocks();
+                    if (raw_bytes_threshold > ctx->low_memory_mode_scan_bytes_per_scanner()) {
+                        raw_bytes_threshold = ctx->low_memory_mode_scan_bytes_per_scanner();
+                    }
                 }
             } // end for while
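The per-block estimate that the scanner now carries across scan tasks is a ceiling division: allocated bytes per row on the last materialized block, rounded up, then scaled by batch_size to predict the next block's footprint. A minimal restatement with plain integers standing in for the Block accessors:

    #include <cstddef>

    size_t estimate_next_block_bytes(size_t allocated_bytes, size_t rows,
                                     size_t batch_size) {
        // (a + r - 1) / r is integer ceiling division: never underestimate
        // the per-row cost when allocated_bytes is not a multiple of rows.
        size_t bytes_per_row = (allocated_bytes + rows - 1) / rows;
        return bytes_per_row * batch_size;
    }

    // Example: a block of 1000 rows occupying 1,000,001 bytes with a batch
    // size of 4096 gives ceil(1000001 / 1000) = 1001 bytes/row, so the next
    // reserve request is 1001 * 4096 = 4,100,096 bytes.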
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 6c4f3294ce1..dab49b757c8 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -72,7 +72,10 @@ public:
     virtual Status init() { return Status::OK(); }
     // Not virtual; all children will call this method explicitly
     virtual Status prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts);
-    virtual Status open(RuntimeState* state) { return Status::OK(); }
+    virtual Status open(RuntimeState* state) {
+        _block_avg_bytes = state->batch_size() * 8;
+        return Status::OK();
+    }
 
     Status get_block(RuntimeState* state, Block* block, bool* eos);
     Status get_block_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos);
@@ -156,6 +159,10 @@ public:
         _query_statistics = query_statistics;
     }
 
+    auto get_block_avg_bytes() const { return _block_avg_bytes; }
+
+    void update_block_avg_bytes(size_t block_avg_bytes) { _block_avg_bytes = block_avg_bytes; }
+
 protected:
     void _discard_conjuncts() {
         for (auto& conjunct : _conjuncts) {
@@ -211,6 +218,8 @@ protected:
     // num of rows return from scanner, after filter block
     int64_t _num_rows_return = 0;
 
+    size_t _block_avg_bytes = 0;
+
     // Set true after counter is updated finally
     bool _has_updated_counter = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index f0380efd7b6..8de6f379ecd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -397,7 +397,6 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String INTERNAL_SESSION = "internal_session";
 
-    public static final String PARTITIONED_HASH_JOIN_ROWS_THRESHOLD = "partitioned_hash_join_rows_threshold";
     public static final String PARTITIONED_HASH_AGG_ROWS_THRESHOLD = "partitioned_hash_agg_rows_threshold";
 
     public static final String PARTITION_PRUNING_EXPAND_THRESHOLD = "partition_pruning_expand_threshold";
@@ -1587,10 +1586,6 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = INTERNAL_SESSION)
     public boolean internalSession = false;
 
-    // Use partitioned hash join if build side row count >= the threshold . 0 - the threshold is not set.
-    @VariableMgr.VarAttr(name = PARTITIONED_HASH_JOIN_ROWS_THRESHOLD, fuzzy = true)
-    public int partitionedHashJoinRowsThreshold = 0;
-
     // Use partitioned hash agg if build side row count >= the threshold. 0 - the threshold is not set.
     @VariableMgr.VarAttr(name = PARTITIONED_HASH_AGG_ROWS_THRESHOLD, fuzzy = true)
     public int partitionedHashAggRowsThreshold = 0;
@@ -2377,7 +2372,6 @@ public class SessionVariable implements Serializable, Writable {
         // this.disableJoinReorder = random.nextBoolean();
         this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
         this.disableStreamPreaggregations = random.nextBoolean();
-        this.partitionedHashJoinRowsThreshold = random.nextBoolean() ? 8 : 1048576;
         this.partitionedHashAggRowsThreshold = random.nextBoolean() ? 8 : 1048576;
         this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
         // this.enableHashJoinEarlyStartProbe = random.nextBoolean();
@@ -3075,14 +3069,6 @@ public class SessionVariable implements Serializable, Writable {
         this.queryCacheEntryMaxRows = queryCacheEntryMaxRows;
     }
 
-    public int getPartitionedHashJoinRowsThreshold() {
-        return partitionedHashJoinRowsThreshold;
-    }
-
-    public void setPartitionedHashJoinRowsThreshold(int threshold) {
-        this.partitionedHashJoinRowsThreshold = threshold;
-    }
-
     // Serialize to thrift object
     public boolean getForwardToMaster() {
         return forwardToMaster;
@@ -3910,7 +3896,6 @@ public class SessionVariable implements Serializable, Writable {
 
         tResult.setSkipDeleteBitmap(skipDeleteBitmap);
 
-        tResult.setPartitionedHashJoinRowsThreshold(partitionedHashJoinRowsThreshold);
         tResult.setPartitionedHashAggRowsThreshold(partitionedHashAggRowsThreshold);
 
         tResult.setExternalSortBytesThreshold(externalSortBytesThreshold);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index b44196d3df2..8cf33a6218b 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -182,6 +182,7 @@ struct TQueryOptions {
 
   52: optional i32 be_exec_version = 0
 
+  // not used any more
   53: optional i32 partitioned_hash_join_rows_threshold = 0
 
   54: optional bool enable_share_hash_table_for_broadcast_join

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org