This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 92ee09e0fd5 fix
92ee09e0fd5 is described below

commit 92ee09e0fd5f97ae1f8b757bf7bb6176ef71e6b0
Author: jacktengg <tengjianp...@selectdb.com>
AuthorDate: Mon Dec 16 16:02:03 2024 +0800

    fix
---
 .../exec/partitioned_hash_join_sink_operator.cpp   | 34 ++++++++++++++++----
 be/src/pipeline/pipeline_task.cpp                  | 37 ----------------------
 be/src/runtime/runtime_state.h                     |  7 ++++
 .../java/org/apache/doris/qe/SessionVariable.java  |  5 +++
 gensrc/thrift/PaloInternalService.thrift           |  1 +
 5 files changed, 41 insertions(+), 43 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 672eb36a907..dad82b6cc8a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -260,7 +260,22 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
             }
         }
 
-        return Status::OK();
+        Status status;
+        if (_child_eos) {
+            std::for_each(_shared_state->partitioned_build_blocks.begin(),
+                          _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
+                              if (block) {
+                                  COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
+                              }
+                          });
+            status = _finish_spilling();
+            VLOG_DEBUG << fmt::format(
+                    "Query: {}, task {}, sink {} _revoke_unpartitioned_block set_ready_to_read",
+                    print_id(state->query_id()), state->task_id(), _parent->node_id());
+            _dependency->set_ready_to_read();
+        }
+
+        return status;
     };
 
     auto exception_catch_func = [spill_func]() mutable {
@@ -547,12 +562,19 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                 return revoke_memory(state, nullptr);
             } else {
                 const auto revocable_size = revocable_mem_size(state);
-                if (revocable_size >= config::revocable_memory_bytes_high_watermark) {
-                    LOG(INFO) << fmt::format(
-                            "Query: {}, sink name: {}, node id: {}, task id: {}, "
-                            "revoke_memory "
+                // TODO: consider parallel?
+                // After building hash table it will not be able to spill later
+                // even if memory is low, and will cause cancel of queries.
+                // So make a check here, if build blocks mem usage is too high,
+                // then trigger revoke memory.
+                auto query_mem_limit = state->get_query_ctx()->mem_limit();
+                if (revocable_size >= (double)query_mem_limit / 100.0 *
+                                              state->revocable_memory_high_watermark_percent()) {
+                    VLOG_DEBUG << fmt::format(
+                            "Query: {}, task {}, sink {}, query mem limit: {}, revoke_memory "
                             "because revocable memory is high: {}",
-                            print_id(state->query_id()), get_name(), node_id(), state->task_id(),
+                            print_id(state->query_id()), state->task_id(), node_id(),
+                            PrettyPrinter::print_bytes(query_mem_limit),
                             PrettyPrinter::print_bytes(revocable_size));
                     return revoke_memory(state, nullptr);
                 }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index cd822ef15e2..d4ed0790942 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -494,43 +494,6 @@ Status PipelineTask::execute(bool* eos) {
                     *eos = false;
                     continue;
                 }
-                if (workload_group) {
-                    bool is_low_watermark = false;
-                    bool is_high_watermark = false;
-                    workload_group->check_mem_used(&is_low_watermark, &is_high_watermark);
-                    // for hash join build sink, if it's eos at this reserve, it will build hash table and
-                    // it will not be able to spill later even if memory is low, and will cause cancel of queries.
-                    // So make a check here, if it's low watermark after reserve and if reserved memory is too many,
-                    // then trigger revoke memory.
-
-                    // debug
-                    if (sink_reserve_size > 64 * 1024 * 1024) {
-                        LOG(INFO) << fmt::format(
-                                "Query: {}, sink name: {}, node id: {}, task id: {}, "
-                                "is_low_watermark: {}, sink_reserve_size: {}, wg mem limit: {}, "
-                                "reserve/wg_limit: {}",
-                                print_id(query_id), _sink->get_name(), _sink->node_id(),
-                                _state->task_id(), is_low_watermark,
-                                PrettyPrinter::print_bytes(sink_reserve_size),
-                                PrettyPrinter::print_bytes(workload_group->memory_limit()),
-                                ((double)sink_reserve_size) / workload_group->memory_limit());
-                    }
-                    if (is_low_watermark) {
-                        const auto revocable_size = _sink->revocable_mem_size(_state);
-                        if (revocable_size >= config::revocable_memory_bytes_high_watermark) {
-                            LOG(INFO) << fmt::format(
-                                    "Query: {}, sink name: {}, node id: {}, task id: {}, "
-                                    "sink_reserve_size: {}, revoke_memory "
-                                    "because revocable memory is high: {}",
-                                    print_id(query_id), _sink->get_name(), _sink->node_id(),
-                                    _state->task_id(),
-                                    PrettyPrinter::print_bytes(sink_reserve_size),
-                                    PrettyPrinter::print_bytes(revocable_size));
-                            RETURN_IF_ERROR(_sink->revoke_memory(_state, nullptr));
-                            continue;
-                        }
-                    }
-                }
             }
 
             // Define a lambda function to catch sink exception, because sink will check
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 4da3384cb60..7318c93f15a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -597,6 +597,13 @@ public:
         return 1;
     }
 
+    int revocable_memory_high_watermark_percent() const {
+        if (_query_options.__isset.revocable_memory_high_watermark_percent) {
+            return _query_options.revocable_memory_high_watermark_percent;
+        }
+        return 10;
+    }
+
     size_t minimum_operator_memory_required_bytes() const {
         if (_query_options.__isset.minimum_operator_memory_required_kb) {
             return _query_options.minimum_operator_memory_required_kb * 1024;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index af379bc4135..16b214b1536 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -566,6 +566,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String SPILL_STREAMING_AGG_MEM_LIMIT = "spill_streaming_agg_mem_limit";
     public static final String MIN_REVOCABLE_MEM = "min_revocable_mem";
     public static final String ENABLE_SPILL = "enable_spill";
+    public static final String REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT = "revocable_memory_high_watermark_percent";
     public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory";
     public static final String ENABLE_FORCE_SPILL = "enable_force_spill";
     public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
@@ -2243,6 +2244,9 @@ public class SessionVariable implements Serializable, Writable {
             needForward = true, fuzzy = true)
     public long dataQueueMaxBlocks = 1;
 
+    @VariableMgr.VarAttr(name = REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT, fuzzy = true)
+    public int revocableMemoryHighWatermarkPercent = 10;
+
     // If the memory consumption of sort node exceed this limit, will trigger spill to disk;
     // Set to 0 to disable; min: 128M
     public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
@@ -3957,6 +3961,7 @@ public class SessionVariable implements Serializable, Writable {
         tResult.setEnableForceSpill(enableForceSpill);
         tResult.setExternalAggPartitionBits(externalAggPartitionBits);
         tResult.setMinRevocableMem(minRevocableMem);
+        tResult.setRevocableMemoryHighWatermarkPercent(revocableMemoryHighWatermarkPercent);
         tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
 
         tResult.setEnableLocalMergeSort(enableLocalMergeSort);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index d3a2e4521f9..7ebe16583d8 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -367,6 +367,7 @@ struct TQueryOptions {
   144: optional i32 query_slot_count = 0;
   145: optional bool enable_spill = false
   146: optional bool enable_reserve_memory = true
+  147: optional i32 revocable_memory_high_watermark_percent = 10
 
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to