This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 085897a6857 improve logs
085897a6857 is described below
commit 085897a6857ac71fc6da7848409471c8119163a4
Author: jacktengg <[email protected]>
AuthorDate: Tue Dec 17 11:34:02 2024 +0800
improve logs
---
be/src/common/config.cpp | 2 -
be/src/common/config.h | 2 -
be/src/pipeline/exec/operator.h | 2 -
.../pipeline/exec/partition_sort_sink_operator.cpp | 4 +-
.../exec/partitioned_aggregation_sink_operator.cpp | 37 +++++---
.../exec/partitioned_aggregation_sink_operator.h | 6 +-
.../exec/partitioned_hash_join_probe_operator.cpp | 14 +--
.../exec/partitioned_hash_join_sink_operator.cpp | 100 ++++++++++++---------
.../exec/partitioned_hash_join_sink_operator.h | 2 -
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 6 +-
be/src/pipeline/exec/spill_sort_sink_operator.h | 1 -
be/src/pipeline/pipeline_task.cpp | 1 -
be/src/runtime/memory/global_memory_arbitrator.h | 2 +-
be/src/runtime/memory/memory_profile.cpp | 9 +-
be/src/runtime/query_context.h | 3 -
be/src/runtime/runtime_state.h | 2 +-
be/src/vec/common/allocator.cpp | 7 +-
.../java/org/apache/doris/qe/SessionVariable.java | 2 +-
gensrc/thrift/PaloInternalService.thrift | 2 +-
19 files changed, 102 insertions(+), 102 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f8b7b390ee9..f0184db7067 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1400,8 +1400,6 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction,
"false");
DEFINE_Bool(enable_table_size_correctness_check, "false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
-DEFINE_mInt32(revocable_memory_bytes_high_watermark, "5");
-
// clang-format off
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1b9b8a3d531..d935f5278f2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1483,8 +1483,6 @@ DECLARE_mBool(enable_delete_bitmap_merge_on_compaction);
// Enable validation to check the correctness of table size.
DECLARE_Bool(enable_table_size_correctness_check);
-DECLARE_mInt32(revocable_memory_bytes_high_watermark);
-
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 723e14f4bc7..af13ded196e 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -589,8 +589,6 @@ public:
return state->minimum_operator_memory_required_bytes();
}
- [[nodiscard]] virtual bool is_spilled(RuntimeState* state) const { return
false; }
-
[[nodiscard]] bool is_spillable() const { return _spillable; }
template <class TARGET>
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 759c7ea2bcc..b90af92f6b1 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -70,9 +70,7 @@
PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, int ope
_topn_phase(tnode.partition_sort_node.ptopn_phase),
_has_global_limit(tnode.partition_sort_node.has_global_limit),
_top_n_algorithm(tnode.partition_sort_node.top_n_algorithm),
-
_partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) {
- _spillable = true;
-}
+
_partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) {}
Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 6f008a3b1f2..58b272b3ac8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -27,6 +27,7 @@
#include "pipeline/exec/spill_utils.h"
#include "pipeline/pipeline_task.h"
#include "runtime/fragment_mgr.h"
+#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "vec/spill/spill_stream.h"
#include "vec/spill/spill_stream_manager.h"
@@ -180,7 +181,19 @@ Status
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
return Status::Error<INTERNAL_ERROR>("fault_inject
partitioned_agg_sink sink failed");
});
RETURN_IF_ERROR(_agg_sink_operator->sink(runtime_state, in_block, false));
+
+ size_t revocable_size = 0;
+ int64_t query_mem_limit = 0;
if (eos) {
+ revocable_size = revocable_mem_size(state);
+ query_mem_limit = state->get_query_ctx()->get_mem_limit();
+ LOG(INFO) << fmt::format(
+ "Query: {}, task {}, agg sink {} eos, need spill: {}, query
mem limit: {}, "
+ "revocable memory: {}",
+ print_id(state->query_id()), state->task_id(), node_id(),
+ local_state._shared_state->is_spilled,
PrettyPrinter::print_bytes(query_mem_limit),
+ PrettyPrinter::print_bytes(revocable_size));
+
if (local_state._shared_state->is_spilled) {
if (revocable_mem_size(state) > 0) {
RETURN_IF_ERROR(revoke_memory(state, nullptr));
@@ -256,10 +269,12 @@ size_t
PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bo
Status PartitionedAggSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
const auto size_to_revoke = _parent->revocable_mem_size(state);
- VLOG_DEBUG << "Query " << print_id(state->query_id()) << " agg node "
- << Base::_parent->node_id()
- << " revoke_memory, size: " <<
_parent->revocable_mem_size(state)
- << ", eos: " << _eos;
+ LOG(INFO) << fmt::format(
+ "Query: {}, task {}, agg sink {} revoke_memory, eos: {}, need
spill: {}, revocable "
+ "memory: {}",
+ print_id(state->query_id()), state->task_id(), _parent->node_id(),
_eos,
+ _shared_state->is_spilled,
+ PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
if (!_shared_state->is_spilled) {
_shared_state->is_spilled = true;
profile()->add_info_string("Spilled", "true");
@@ -309,9 +324,12 @@ Status PartitionedAggSinkLocalState::revoke_memory(
}
_shared_state->close();
} else {
- VLOG_DEBUG << "Query " << print_id(query_id) << " agg
node "
- << Base::_parent->node_id() << "
revoke_memory finish, size: "
- << _parent->revocable_mem_size(state) << ",
eos: " << _eos;
+ LOG(INFO) << fmt::format(
+ "Query: {}, task {}, agg sink {} revoke_memory
finish, eos: {}, "
+ "revocable memory: {}",
+ print_id(state->query_id()), state->task_id(),
_parent->node_id(),
+ _eos,
+
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
}
if (_eos) {
@@ -341,9 +359,4 @@ Status PartitionedAggSinkLocalState::revoke_memory(
std::move(spill_runnable));
}
-bool PartitionedAggSinkOperatorX::is_spilled(RuntimeState* state) const {
- auto& local_state = get_local_state(state);
- return local_state._shared_state->is_spilled;
-}
-
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 922798707d0..499db4919e7 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -22,6 +22,7 @@
#include "aggregation_sink_operator.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/operator.h"
+#include "util/pretty_printer.h"
#include "vec/exprs/vectorized_agg_fn.h"
#include "vec/exprs/vexpr.h"
#include "vec/spill/spill_stream.h"
@@ -83,8 +84,7 @@ public:
total_rows / size_to_revoke_));
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: "
<< _parent->node_id()
- << ", spill_batch_rows: " << spill_batch_rows << ", total
rows: " << total_rows
- << ", size_to_revoke: " << size_to_revoke;
+ << ", spill_batch_rows: " << spill_batch_rows << ", total
rows: " << total_rows;
size_t row_count = 0;
std::vector<TmpSpillInfo<typename HashTableType::key_type>>
spill_infos(
@@ -333,8 +333,6 @@ public:
size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
- bool is_spilled(RuntimeState* state) const override;
-
private:
friend class PartitionedAggSinkLocalState;
std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index bbbcb9b9d5e..f6cea157cd5 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -339,9 +339,8 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
}
}
- auto block_bytes = _recovered_build_block->allocated_bytes();
- COUNTER_UPDATE(_memory_used_counter, block_bytes);
- if (block_bytes >=
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+ if (_recovered_build_block->allocated_bytes() >=
+ vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
break;
}
}
@@ -608,10 +607,7 @@ Status
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
}
}
- auto old_probe_blocks_bytes = local_state._probe_blocks_bytes->value();
COUNTER_SET(local_state._probe_blocks_bytes, bytes_of_blocks);
- COUNTER_UPDATE(local_state._memory_used_counter,
- local_state._probe_blocks_bytes->value() -
old_probe_blocks_bytes);
return Status::OK();
}
@@ -933,13 +929,8 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
COUNTER_SET(local_state._memory_usage_reserved,
int64_t(local_state.estimate_memory_usage()));
});
- LOG(INFO) << "Query: " << print_id(state->query_id()) << ", hash probe
node: " << node_id()
- << ", task: " << state->task_id()
- << " get_block, child eos: " << local_state._child_eos
- << ", need spill: " << need_to_spill;
if (need_more_input_data(state)) {
- LOG(INFO) << "need more input data";
{
SCOPED_TIMER(local_state._get_child_next_timer);
RETURN_IF_ERROR(_child->get_block_after_projects(state,
local_state._child_block.get(),
@@ -969,7 +960,6 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
}
if (!need_more_input_data(state)) {
- LOG(INFO) << "not need more input data";
SCOPED_TIMER(local_state.exec_time_counter());
if (need_to_spill) {
RETURN_IF_ERROR(pull(state, block, eos));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index dad82b6cc8a..6cf9e658a60 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -270,7 +270,8 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
});
status = _finish_spilling();
VLOG_DEBUG << fmt::format(
- "Query: {}, task {}, sink {} _revoke_unpartitioned_block
set_ready_to_read",
+ "Query: {}, task {}, hash join sink {}
_revoke_unpartitioned_block "
+ "set_ready_to_read",
print_id(state->query_id()), state->task_id(),
_parent->node_id());
_dependency->set_ready_to_read();
}
@@ -303,7 +304,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
SCOPED_TIMER(_spill_total_timer);
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " <<
state->task_id()
- << " sink " << _parent->node_id() << " revoke_memory"
+ << " hash join sink " << _parent->node_id() << " revoke_memory"
<< ", eos: " << _child_eos;
CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr);
@@ -321,9 +322,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
auto spill_fin_cb = [this, state, query_id, spill_context]() {
Status status;
if (_child_eos) {
- VLOG_DEBUG << "Query:" << print_id(this->state()->query_id()) <<
", task "
- << state->task_id() << " sink " << _parent->node_id()
- << " set_ready_to_read";
+ LOG(INFO) << "Query:" << print_id(this->state()->query_id()) << ",
task "
+ << state->task_id() << " hash join sink " <<
_parent->node_id()
+ << " finish spilling, set_ready_to_read";
std::for_each(_shared_state->partitioned_build_blocks.begin(),
_shared_state->partitioned_build_blocks.end(),
[&](auto& block) {
if (block) {
@@ -542,43 +543,28 @@ Status
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
local_state._child_eos = eos;
- Defer defer_dgb {[&]() {
- if (local_state.revocable_mem_size(state) > 128 * 1024 * 1024) {
- VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task
" << state->task_id()
- << " sink " << node_id() << " _child_eos: " <<
local_state._child_eos
- << ", revocable memory: "
- <<
PrettyPrinter::print_bytes(local_state.revocable_mem_size(state));
- }
- }};
const auto rows = in_block->rows();
const auto need_to_spill = local_state._shared_state->need_to_spill;
+ size_t revocable_size = 0;
+ int64_t query_mem_limit = 0;
+ if (eos) {
+ revocable_size = revocable_mem_size(state);
+ query_mem_limit = state->get_query_ctx()->get_mem_limit();
+ LOG(INFO) << fmt::format(
+ "Query: {}, task {}, hash join sink {} eos, need spill: {},
query mem limit: {}, "
+ "revocable "
+ "memory: {}",
+ print_id(state->query_id()), state->task_id(), node_id(),
need_to_spill,
+ PrettyPrinter::print_bytes(query_mem_limit),
+ PrettyPrinter::print_bytes(revocable_size));
+ }
+
if (rows == 0) {
if (eos) {
- VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task
" << state->task_id()
- << " sink " << node_id() << " eos, need spill: " <<
need_to_spill;
-
if (need_to_spill) {
return revoke_memory(state, nullptr);
} else {
- const auto revocable_size = revocable_mem_size(state);
- // TODO: consider parallel?
- // After building hash table it will not be able to spill later
- // even if memory is low, and will cause cancel of queries.
- // So make a check here, if build blocks mem usage is too high,
- // then trigger revoke memory.
- auto query_mem_limit = state->get_query_ctx()->mem_limit();
- if (revocable_size >= (double)query_mem_limit / 100.0 *
-
state->revocable_memory_high_watermark_percent()) {
- VLOG_DEBUG << fmt::format(
- "Query: {}, task {}, sink {}, query mem limit: {},
revoke_memory "
- "because revocable memory is high: {}",
- print_id(state->query_id()), state->task_id(),
node_id(),
- PrettyPrinter::print_bytes(query_mem_limit),
- PrettyPrinter::print_bytes(revocable_size));
- return revoke_memory(state, nullptr);
- }
-
if (UNLIKELY(!local_state._shared_state->inner_runtime_state))
{
RETURN_IF_ERROR(_setup_internal_operator(state));
}
@@ -587,12 +573,31 @@ Status
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
"fault_inject partitioned_hash_join_sink "
"sink_eos failed");
});
+
+ // TODO: consider parallel?
+ // After building hash table it will not be able to spill later
+ // even if memory is low, and will cause cancel of queries.
+ // So make a check here, if build blocks mem usage is too high,
+ // then trigger revoke memory.
+ auto revocable_memory_high_watermark_percent =
+ state->revocable_memory_high_watermark_percent();
+ if (revocable_memory_high_watermark_percent > 0 &&
+ revocable_size >= (double)query_mem_limit / 100.0 *
+
revocable_memory_high_watermark_percent) {
+ LOG(INFO) << fmt::format(
+ "Query: {}, task {}, hash join sink {} eos,
revoke_memory "
+ "because revocable memory is high",
+ print_id(state->query_id()), state->task_id(),
node_id());
+ return revoke_memory(state, nullptr);
+ }
+
Defer defer {[&]() { local_state.update_memory_usage(); }};
RETURN_IF_ERROR(_inner_sink_operator->sink(
local_state._shared_state->inner_runtime_state.get(),
in_block, eos));
- VLOG_DEBUG << fmt::format(
- "Query: {}, task {}, sink {} eos, set_ready_to_read,
nonspill memory "
+ LOG(INFO) << fmt::format(
+ "Query: {}, task {}, hash join sink {} eos,
set_ready_to_read, nonspill "
+ "memory "
"usage: {}",
print_id(state->query_id()), state->task_id(),
node_id(),
_inner_sink_operator->get_memory_usage_debug_str(
@@ -628,12 +633,26 @@ Status
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
"fault_inject partitioned_hash_join_sink "
"sink failed");
});
+
+ if (eos) {
+ auto revocable_memory_high_watermark_percent =
+ state->revocable_memory_high_watermark_percent();
+ if (revocable_memory_high_watermark_percent > 0 &&
+ revocable_size >=
+ (double)query_mem_limit / 100.0 *
revocable_memory_high_watermark_percent) {
+ LOG(INFO) << fmt::format(
+ "Query: {}, task {}, hash join sink {} eos,
revoke_memory "
+ "because revocable memory is high",
+ print_id(state->query_id()), state->task_id(),
node_id());
+ return revoke_memory(state, nullptr);
+ }
+ }
RETURN_IF_ERROR(_inner_sink_operator->sink(
local_state._shared_state->inner_runtime_state.get(),
in_block, eos));
local_state.update_memory_usage();
if (eos) {
- VLOG_DEBUG << fmt::format(
- "Query: {}, task {}, sink {} eos, set_ready_to_read,
nonspill memory "
+ LOG(INFO) << fmt::format(
+ "Query: {}, task {}, hash join sink {} eos,
set_ready_to_read, nonspill memory "
"usage: {}",
print_id(state->query_id()), state->task_id(), node_id(),
_inner_sink_operator->get_memory_usage_debug_str(
@@ -663,9 +682,4 @@ size_t
PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* stat
return local_state.get_reserve_mem_size(state, eos);
}
-bool PartitionedHashJoinSinkOperatorX::is_spilled(RuntimeState* state) const {
- auto& local_state = get_local_state(state);
- return local_state._shared_state->need_to_spill;
-}
-
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 9e253ce3fca..b5e28f8b244 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -134,8 +134,6 @@ public:
return _inner_probe_operator->require_data_distribution();
}
- bool is_spilled(RuntimeState* state) const override;
-
private:
friend class PartitionedHashJoinSinkLocalState;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 6a472a09cfd..2fa0c0ce8e1 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -114,6 +114,7 @@ SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool*
pool, int operator_id
const TPlanNode& tnode, const
DescriptorTbl& descs,
bool
require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode.node_id) {
+ _spillable = true;
_sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool,
operator_id, tnode, descs,
require_bucket_distribution);
}
@@ -299,9 +300,4 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState*
state,
return status;
}
-bool SpillSortSinkOperatorX::is_spilled(RuntimeState* state) const {
- auto& local_state = get_local_state(state);
- return local_state._shared_state->is_spilled;
-}
-
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 226fe61d386..3d6ccdcc4ce 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -91,7 +91,6 @@ public:
Status revoke_memory(RuntimeState* state,
const std::shared_ptr<SpillContext>& spill_context)
override;
- bool is_spilled(RuntimeState* state) const override;
using DataSinkOperatorX<LocalStateType>::node_id;
using DataSinkOperatorX<LocalStateType>::operator_id;
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index d4ed0790942..9d284b31861 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -43,7 +43,6 @@
#include "util/container_util.hpp"
#include "util/defer_op.h"
#include "util/mem_info.h"
-#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h
b/be/src/runtime/memory/global_memory_arbitrator.h
index abf3a51c9f3..05963132cb1 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -76,7 +76,7 @@ public:
static inline int64_t sys_mem_available() {
return MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed) -
refresh_interval_memory_growth.load(std::memory_order_relaxed) -
- process_reserved_memory();
+ process_reserved_memory() + MemInfo::allocator_cache_mem();
}
static inline std::string sys_mem_available_str() {
diff --git a/be/src/runtime/memory/memory_profile.cpp
b/be/src/runtime/memory/memory_profile.cpp
index 8dbdcbdd3af..c7421236c42 100644
--- a/be/src/runtime/memory/memory_profile.cpp
+++ b/be/src/runtime/memory/memory_profile.cpp
@@ -343,10 +343,11 @@ int64_t MemoryProfile::other_current_usage() {
void MemoryProfile::print_log_process_usage() {
if (_enable_print_log_process_usage) {
_enable_print_log_process_usage = false;
- LOG(WARNING) << "Process Memory Summary: " +
GlobalMemoryArbitrator::process_mem_log_str();
- LOG(WARNING) << "\n" << print_memory_overview_profile();
- LOG(WARNING) << "\n" << print_global_memory_profile();
- LOG(WARNING) << "\n" << print_top_memory_tasks_profile();
+ LOG(WARNING) << "Process Memory Summary: " +
GlobalMemoryArbitrator::process_mem_log_str()
+ << "\n"
+ << print_memory_overview_profile() << "\n"
+ << print_global_memory_profile() << "\n"
+ << print_top_memory_tasks_profile();
}
}
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 55b51088c50..bee6ad549e9 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -213,8 +213,6 @@ public:
ThreadPool* get_memtable_flush_pool();
- int64_t mem_limit() const { return _bytes_limit; }
-
void set_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
_merge_controller_handler = handler;
@@ -348,7 +346,6 @@ private:
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;
MonotonicStopWatch _query_watcher;
- int64_t _bytes_limit = 0;
bool _is_nereids = false;
std::atomic<int> _running_big_mem_op_num = 0;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 7318c93f15a..16f500b2fcc 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -601,7 +601,7 @@ public:
if (_query_options.__isset.revocable_memory_high_watermark_percent) {
return _query_options.revocable_memory_high_watermark_percent;
}
- return 10;
+ return -1;
}
size_t minimum_operator_memory_required_bytes() const {
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index c8f0a7397d7..c59ad83c98d 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -35,6 +35,7 @@
#include "runtime/process_profile.h"
#include "runtime/thread_context.h"
#include "util/mem_info.h"
+#include "util/pretty_printer.h"
#include "util/stack_util.h"
#include "util/uid_util.h"
@@ -86,8 +87,10 @@ void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::sys_mem
"Allocator sys memory check failed: Cannot alloc:{}, consuming
"
"tracker:<{}>, peak used {}, current used {}, exec node:<{}>,
{}.",
size, doris::thread_context()->thread_mem_tracker()->label(),
-
doris::thread_context()->thread_mem_tracker()->peak_consumption(),
- doris::thread_context()->thread_mem_tracker()->consumption(),
+ doris::PrettyPrinter::print_bytes(
+
doris::thread_context()->thread_mem_tracker()->peak_consumption()),
+ doris::PrettyPrinter::print_bytes(
+
doris::thread_context()->thread_mem_tracker()->consumption()),
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label(),
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 16b214b1536..f0380efd7b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -2245,7 +2245,7 @@ public class SessionVariable implements Serializable,
Writable {
public long dataQueueMaxBlocks = 1;
@VariableMgr.VarAttr(name = REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT, fuzzy
= true)
- public int revocableMemoryHighWatermarkPercent = 10;
+ public int revocableMemoryHighWatermarkPercent = -1;
// If the memory consumption of sort node exceed this limit, will trigger
spill to disk;
// Set to 0 to disable; min: 128M
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 7ebe16583d8..b44196d3df2 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -367,7 +367,7 @@ struct TQueryOptions {
144: optional i32 query_slot_count = 0;
145: optional bool enable_spill = false
146: optional bool enable_reserve_memory = true
- 147: optional i32 revocable_memory_high_watermark_percent = 10
+ 147: optional i32 revocable_memory_high_watermark_percent = -1
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]