github-actions[bot] commented on code in PR #61212:
URL: https://github.com/apache/doris/pull/61212#discussion_r2935401954


##########
be/src/runtime/runtime_state.h:
##########
@@ -671,37 +671,91 @@ class RuntimeState {
 
     int64_t spill_min_revocable_mem() const {
         if (_query_options.__isset.min_revocable_mem) {
-            return std::max(_query_options.min_revocable_mem, (int64_t)1);
+            return std::max(_query_options.min_revocable_mem, (int64_t)1 << 
20);
         }
-        return 1;
+        return 32 << 20;
     }
 
-    int64_t spill_sort_mem_limit() const {
-        if (_query_options.__isset.spill_sort_mem_limit) {
-            return std::max(_query_options.spill_sort_mem_limit, 
(int64_t)16777216);
+    int spill_aggregation_partition_count() const {
+        if (_query_options.__isset.spill_aggregation_partition_count) {
+            return _query_options.spill_aggregation_partition_count;
         }
-        return 134217728;
+        return 32;
     }
 
-    int64_t spill_sort_batch_bytes() const {
-        if (_query_options.__isset.spill_sort_batch_bytes) {
-            return std::max(_query_options.spill_sort_batch_bytes, 
(int64_t)8388608);
+    int spill_hash_join_partition_count() const {
+        if (_query_options.__isset.spill_hash_join_partition_count) {
+            return _query_options.spill_hash_join_partition_count;
         }
-        return 8388608;
+        return 4;
     }
 
-    int spill_aggregation_partition_count() const {
-        if (_query_options.__isset.spill_aggregation_partition_count) {
-            return 
std::min(std::max(_query_options.spill_aggregation_partition_count, 16), 8192);
+    int spill_repartition_max_depth() const {
+        if (_query_options.__isset.spill_repartition_max_depth) {
+            // Clamp to a reasonable range: [1, 128]
+            return std::min(_query_options.spill_repartition_max_depth, 128);

Review Comment:
   **Bug (Major):** The comment says "Clamp to a reasonable range: [1, 128]", but 
only the upper bound is enforced. If a user sets 
`spill_repartition_max_depth = 0`, the function returns 0.
   
   With depth=0, the repartition check 
`if (new_level >= _repartition_max_depth)` (in both 
`partitioned_hash_join_probe_operator.cpp:424` and 
`partitioned_aggregation_source_operator.cpp:450`) is ALWAYS true, because 
`new_level` (`partition.level + 1`, so at least 1) is always `>= 0`. Spill 
repartitioning therefore returns `InternalError("exceeded max depth")` 
immediately, and queries fail under memory pressure.
   
   Fix:
   ```cpp
   return std::max(1, std::min(_query_options.spill_repartition_max_depth, 
128));
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -3170,7 +3175,38 @@ public void setDetailShapePlanNodes(String 
detailShapePlanNodes) {
     public long spillStreamingAggMemLimit = 268435456; //256MB
 
     @VariableMgr.VarAttr(name = SPILL_HASH_JOIN_PARTITION_COUNT, fuzzy = true)
-    public int spillHashJoinPartitionCount = 32;
+    public int spillHashJoinPartitionCount = 4;
+
+    @VariableMgr.VarAttr(name = SPILL_REPARTITION_MAX_DEPTH, fuzzy = true, 
needForward = true,
+            description = {"重分区的最大递归深度,超过该深度不再继续重分区,\n默认值为 8",
+                "Maximum depth for repartition recursion. When exceeded, 
repartitioning will stop. Default is 8."})
+    public int spillRepartitionMaxDepth = 8;
+
+    @VariableMgr.VarAttr(name = SPILL_JOIN_BUILD_SINK_MEM_LIMIT_BYTES, fuzzy = 
true, needForward = true,
+            description = {"一旦触发 spill 后,join build sink 的 revocable memory 
超过该阈值就主动落盘(字节)。默认 64MB。",
+                "After spill is triggered, join build sink will proactively 
spill when revocable memory "
+                    + "exceeds this threshold (in bytes). Default is 64MB."})
+    public long spillJoinBuildSinkMemLimitBytes = 64L * 1024L * 1024L;
+
+    @VariableMgr.VarAttr(name = SPILL_AGGREGATION_SINK_MEM_LIMIT_BYTES, fuzzy 
= true, needForward = true,
+            description = {"一旦触发 spill 后,aggregation sink 的 revocable memory 
超过该阈值就主动落盘(字节)。默认 64MB。",
+                "After spill is triggered, aggregation sink will proactively 
spill when revocable memory "
+                    + "exceeds this threshold (in bytes). Default is 64GB."})
+    public long spillAggregationSinkMemLimitBytes = 64L * 1024L * 1024L * 
1024L;

Review Comment:
   **Bug (Critical):** Default value is `64L * 1024L * 1024L * 1024L` = 
**64GB**, not 64MB.
   
   All sibling variables use 64MB:
   - `spillJoinBuildSinkMemLimitBytes = 64L * 1024L * 1024L` (64MB)
   - `spillSortSinkMemLimitBytes = 64L * 1024L * 1024L` (64MB)
   - `spillSortMergeMemLimitBytes = 64L * 1024L * 1024L` (64MB)
   
   The thrift default is also 64MB (`67108864`). The Chinese description says 
"默认 64MB" but the English says "Default is 64GB".
   
   The BE accessor clamps to [1MB, 4GB], so the effective value becomes 4GB, 
which is **64x higher** than the intended 64MB. This effectively disables 
proactive spilling for aggregation sinks, causing excessive memory usage or OOM 
in aggregation spill scenarios.
   
   Fix:
   ```java
   public long spillAggregationSinkMemLimitBytes = 64L * 1024L * 1024L;
   ```
   Also fix the English description to say "Default is 64MB."
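
The arithmetic can be checked at compile time. The sketch below is written in 
C++ for consistency with the BE code; `effective_sink_limit`, `kMiB` and `kGiB` 
are hypothetical names, and the [1MB, 4GB] range is taken from the description 
of the BE accessor above rather than from its actual source.

```cpp
#include <algorithm>
#include <cstdint>

constexpr int64_t kMiB = 1LL << 20;
constexpr int64_t kGiB = 1LL << 30;

// Hypothetical model of the BE-side clamp described in the review comment.
constexpr int64_t effective_sink_limit(int64_t configured) {
    return std::clamp(configured, kMiB, 4 * kGiB);
}

// 64 * 1024 * 1024 * 1024 bytes is 64 GiB, not 64 MiB.
static_assert(64LL * 1024 * 1024 * 1024 == 64 * kGiB);
// The clamp caps the oversized default at 4 GiB ...
static_assert(effective_sink_limit(64 * kGiB) == 4 * kGiB);
// ... which is 64 times the intended 64 MiB.
static_assert(effective_sink_limit(64 * kGiB) / (64 * kMiB) == 64);
// The intended default passes through the clamp unchanged.
static_assert(effective_sink_limit(64 * kMiB) == 64 * kMiB);
```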



##########
be/src/exec/operator/partitioned_hash_join_probe_operator.cpp:
##########
@@ -743,120 +829,174 @@ Status 
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block
 
 bool PartitionedHashJoinProbeOperatorX::need_more_input_data(RuntimeState* 
state) const {
     auto& local_state = get_local_state(state);
-    if (local_state._shared_state->is_spilled) {
+    if (local_state._shared_state->_is_spilled) {
         return !local_state._child_eos;
-    } else if (local_state._shared_state->inner_runtime_state) {
+    } else if (local_state._shared_state->_inner_runtime_state) {
         return _inner_probe_operator->need_more_input_data(
-                local_state._shared_state->inner_runtime_state.get());
+                local_state._shared_state->_inner_runtime_state.get());
     } else {
         return true;
     }
 }
 
+// Report only this operator's own revocable memory. The pipeline task
+// iterates all operators to sum revocable sizes and revoke each individually.
+// Sum up memory used by in-memory probe blocks and any partially-recovered 
build block for the current partition.
+// This is the memory that can be freed if we choose to revoke and repartition 
the current
 size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* 
state) const {
     auto& local_state = get_local_state(state);
-    if (local_state._child_eos) {
+    if (!local_state._shared_state->_is_spilled) {
         return 0;
     }
 
-    auto revocable_size = _revocable_mem_size(state, true);
-    if (_child) {
-        revocable_size += _child->revocable_mem_size(state);
-    }
-    return revocable_size;
-}
-
-size_t PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* 
state,
-                                                              bool force) 
const {
-    const auto spill_size_threshold =
-            force ? SpillStream::MIN_SPILL_WRITE_BATCH_MEM : 
SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
-    auto& local_state = get_local_state(state);
     size_t mem_size = 0;
-    auto& probe_blocks = local_state._probe_blocks;
-    for (uint32_t i = 0; i < _partition_count; ++i) {
-        for (auto& block : probe_blocks[i]) {
-            mem_size += block.allocated_bytes();
-        }
-
-        auto& partitioned_block = local_state._partitioned_blocks[i];
-        if (partitioned_block) {
-            auto block_bytes = partitioned_block->allocated_bytes();
-            if (block_bytes >= spill_size_threshold) {
-                mem_size += block_bytes;
+    if (!local_state._child_eos) {
+        for (uint32_t i = 0; i < _partition_count; ++i) {
+            auto& partitioned_block = local_state._partitioned_blocks[i];
+            if (!partitioned_block) {
+                continue;
+            }
+            const auto bytes = partitioned_block->allocated_bytes();
+            if (bytes >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
+                mem_size += bytes;
             }
         }
+        return mem_size > state->spill_min_revocable_mem() ? mem_size : 0;
+    }
+    if (!local_state._current_partition.is_valid() ||
+        local_state._current_partition.build_finished) {
+        // No active partition — no revocable memory.
+        // Or if current partition has finished build hash table.
+        return 0;
+    }
+
+    // Include build-side memory that has been recovered but not yet consumed 
by the hash table.
+    // This data is revocable because we can repartition instead of building 
the hash table.
+    if (local_state._recovered_build_block) {
+        mem_size += local_state._recovered_build_block->allocated_bytes();
     }
-    return mem_size;
+
+    return mem_size > state->spill_min_revocable_mem() ? mem_size : 0;
 }
 
 size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* 
state) {
     auto& local_state = get_local_state(state);
-    const auto is_spilled = local_state._shared_state->is_spilled;
-    if (!is_spilled || local_state._child_eos) {
+    const bool is_spilled = local_state._shared_state->_is_spilled;
+
+    // Non-spill path: delegate to the inner probe operator / base class.
+    if (!is_spilled) {
         return Base::get_reserve_mem_size(state);
     }
 
-    size_t size_to_reserve = SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
+    // Spill path, probe data still flowing in (child not yet EOS):
+    // We only need room for incoming probe blocks being partitioned. Reserve
+    // one batch worth of spill-write memory; no hash table will be built yet.
+    if (!local_state._child_eos) {
+        return state->minimum_operator_memory_required_bytes();
+    }
+
+    // Spill path, child EOS — we are in the recovery / build / probe phase.
+    // Baseline reservation is one block of spill I/O.
+    size_t size_to_reserve = state->minimum_operator_memory_required_bytes();
 
-    if (local_state._need_to_setup_internal_operators) {
-        const size_t rows =
-                (local_state._recovered_build_block ? 
local_state._recovered_build_block->rows()
-                                                    : 0) +
-                state->batch_size();
-        size_t bucket_size = hash_join_table_calc_bucket_size(rows);
+    const bool about_to_build = local_state._current_partition.is_valid() &&
+                                !local_state._current_partition.build_finished;
 
+    if (about_to_build) {
+        // Estimate rows that will land in the hash table so we can reserve
+        // enough for JoinHashTable::first[] + JoinHashTable::next[].
+        size_t rows = std::max(static_cast<size_t>(state->batch_size()),
+                               
static_cast<size_t>(local_state._recovered_build_block->rows()));

Review Comment:
   **Bug (Edge case):** `_recovered_build_block` can be `nullptr` when 
`about_to_build` is true, causing a null dereference here.
   
   Scenario:
   1. At line 749, `_recovered_build_block` is reset when a new partition is 
popped
   2. `recover_build_blocks_from_partition` is called (line 763) and reads the 
build file
   3. If ALL blocks from the file happen to be empty (line 359 `continue`), the 
function reaches EOS without ever creating `_recovered_build_block` (lines 
366-371 never execute)
   4. `build_file` is reset (line 379), function returns OK
   5. Back in `_pull_from_spill_queue`, the function returns OK to the pipeline 
task
   6. In the NEXT pipeline iteration, `get_reserve_mem_size` is called. At this 
point: `is_valid()=true`, `build_finished=false` → `about_to_build=true`, but 
`_recovered_build_block` is null
   7. Line 909: `_recovered_build_block->rows()` → **null dereference**
   
   While an all-empty-blocks spill file is an edge case, the code should be 
defensively correct. The simplest fix:
   ```cpp
   if (about_to_build && local_state._recovered_build_block) {
   ```
   Or add a null check:
   ```cpp
   size_t rows = local_state._recovered_build_block
       ? std::max(static_cast<size_t>(state->batch_size()),
                  
static_cast<size_t>(local_state._recovered_build_block->rows()))
       : static_cast<size_t>(state->batch_size());
   ```
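
The null-safe estimate can also be isolated in a small helper, sketched below 
under simplifying assumptions. `FakeBlock` and `estimate_build_rows` are 
hypothetical; `FakeBlock` models only the `rows()` call and is not the Doris 
`Block` type, and the real member may use a different smart pointer.

```cpp
#include <algorithm>
#include <cstddef>
#include <memory>

// Hypothetical stand-in for a block; only rows() is modelled.
struct FakeBlock {
    std::size_t row_count = 0;
    std::size_t rows() const { return row_count; }
};

// Treats a missing recovered block as contributing zero rows, so the
// estimate falls back to batch_size instead of dereferencing a null pointer.
std::size_t estimate_build_rows(const std::unique_ptr<FakeBlock>& recovered,
                                std::size_t batch_size) {
    const std::size_t recovered_rows = recovered ? recovered->rows() : 0;
    return std::max(batch_size, recovered_rows);
}
```

Folding the null case into a zero row count keeps a single `std::max` call and 
yields the same result as the ternary form above.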



##########
be/src/exec/operator/partitioned_aggregation_source_operator.cpp:
##########
@@ -135,79 +186,212 @@ bool 
PartitionedAggSourceOperatorX::is_shuffled_operator() const {
     return _agg_source_operator->is_shuffled_operator();
 }
 
+size_t PartitionedAggSourceOperatorX::revocable_mem_size(RuntimeState* state) 
const {
+    auto& local_state = get_local_state(state);
+    if (!local_state._shared_state->_is_spilled || 
!local_state._current_partition.spill_file) {
+        return 0;
+    }
+
+    size_t bytes = 0;
+    for (const auto& block : local_state._blocks) {
+        bytes += block.allocated_bytes();
+    }
+    if (local_state._shared_state->_in_mem_shared_state != nullptr &&
+        local_state._shared_state->_in_mem_shared_state->agg_data != nullptr) {
+        auto* agg_data = 
local_state._shared_state->_in_mem_shared_state->agg_data.get();
+        bytes += std::visit(Overload {[&](std::monostate& arg) -> size_t { 
return 0; },
+                                      [&](auto& agg_method) -> size_t {
+                                          return 
agg_method.hash_table->get_buffer_size_in_bytes();
+                                      }},
+                            agg_data->method_variant);
+
+        if (auto& aggregate_data_container =
+                    
local_state._shared_state->_in_mem_shared_state->aggregate_data_container;
+            aggregate_data_container) {
+            bytes += aggregate_data_container->memory_usage();
+        }
+    }
+    return bytes > state->spill_min_revocable_mem() ? bytes : 0;
+}
+
+Status PartitionedAggSourceOperatorX::revoke_memory(RuntimeState* state) {
+    auto& local_state = get_local_state(state);
+    if (!local_state._shared_state->_is_spilled) {
+        return Status::OK();
+    }
+    VLOG_DEBUG << fmt::format("Query:{}, agg source:{}, task:{}, 
revoke_memory, hash table size:{}",
+                              print_id(state->query_id()), node_id(), 
state->task_id(),
+                              
PrettyPrinter::print_bytes(local_state._estimate_memory_usage));
+
+    // Flush hash table + repartition remaining spill files of the current 
partition.
+    RETURN_IF_ERROR(local_state._flush_and_repartition(state));
+    local_state._current_partition = AggSpillPartitionInfo {};
+    local_state._need_to_setup_partition = true;
+    return Status::OK();
+}
+
 Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, Block* 
block, bool* eos) {
     auto& local_state = get_local_state(state);
-    local_state.copy_shared_spill_profile();
     Status status;
-    Defer defer {[&]() {
-        if (!status.ok() || *eos) {
-            local_state._shared_state->close();
-        }
-    }};
 
     SCOPED_TIMER(local_state.exec_time_counter());
 
-    if (local_state._shared_state->is_spilled &&
-        local_state._need_to_merge_data_for_current_partition) {
-        if (local_state._blocks.empty() && 
!local_state._current_partition_eos) {
-            bool has_recovering_data = false;
-            status = local_state.recover_blocks_from_disk(state, 
has_recovering_data);
-            RETURN_IF_ERROR(status);
-            *eos = !has_recovering_data;
+    // ── Fast path: not spilled ─────────────────────────────────────────
+    if (!local_state._shared_state->_is_spilled) {
+        auto* runtime_state = local_state._runtime_state.get();
+        
local_state._shared_state->_in_mem_shared_state->aggregate_data_container->init_once();
+        status = _agg_source_operator->get_block(runtime_state, block, eos);
+        RETURN_IF_ERROR(status);
+        if (*eos) {
+            auto* source_local_state =
+                    
runtime_state->get_local_state(_agg_source_operator->operator_id());
+            
local_state._update_profile<false>(source_local_state->custom_profile());
+        }
+        local_state.reached_limit(block, eos);
+        return Status::OK();
+    }
+
+    // ── Spilled path ───────────────────────────────────────────────────
+    // One-time: move original spill_partitions from shared state into unified 
queue.
+    if (local_state._partition_queue.empty() && 
local_state._need_to_setup_partition &&
+        !local_state._shared_state->_spill_partitions.empty()) {
+        local_state._init_partition_queue();
+    }
+
+    // Phase 1: Pop next partition from queue if needed.
+    if (local_state._need_to_setup_partition) {
+        if (local_state._partition_queue.empty()) {
+            *eos = true;
             return Status::OK();
-        } else if (!local_state._blocks.empty()) {
-            size_t merged_rows = 0;
-            while (!local_state._blocks.empty()) {
-                auto block_ = std::move(local_state._blocks.front());
-                merged_rows += block_.rows();
-                local_state._blocks.erase(local_state._blocks.begin());
-                status = 
_agg_source_operator->merge_with_serialized_key_helper(
-                        local_state._runtime_state.get(), &block_);
-                RETURN_IF_ERROR(status);
-            }
-            local_state._estimate_memory_usage +=
-                    
_agg_source_operator->get_estimated_memory_size_for_merging(
-                            local_state._runtime_state.get(), merged_rows);
+        }
 
-            if (!local_state._current_partition_eos) {
-                return Status::OK();
-            }
+        local_state._current_partition = 
std::move(local_state._partition_queue.front());
+        local_state._partition_queue.pop_front();
+        local_state._blocks.clear();
+        local_state._estimate_memory_usage = 0;
+
+        VLOG_DEBUG << fmt::format(
+                "Query:{}, agg source:{}, task:{}, setup partition level:{}, "
+                "queue remaining:{}",
+                print_id(state->query_id()), node_id(), state->task_id(),
+                local_state._current_partition.level, 
local_state._partition_queue.size());
+        local_state._need_to_setup_partition = false;
+    }
+
+    // Phase 2: Recover blocks from disk into _blocks (batch of ~8MB).
+    if (local_state._blocks.empty() && 
local_state._current_partition.spill_file) {
+        RETURN_IF_ERROR(
+                local_state._recover_blocks_from_partition(state, 
local_state._current_partition));
+        // Return empty block to yield to pipeline scheduler.
+        // Pipeline task will check memory and call revoke_memory if needed.
+        *eos = false;
+        return Status::OK();
+    }
+
+    // Phase 3: Merge recovered blocks into hash table.
+    if (!local_state._blocks.empty()) {
+        size_t merged_rows = 0;
+        while (!local_state._blocks.empty()) {
+            auto blk = std::move(local_state._blocks.front());
+            merged_rows += blk.rows();
+            local_state._blocks.erase(local_state._blocks.begin());

Review Comment:
   **Minor (Performance):** `_blocks.erase(_blocks.begin())` on a `std::vector` 
is O(n) per call (shifts all elements), making this loop O(n²) overall.
   
   With typical block sizes and 8MB buffer limits, n is usually small (single 
digits), so the practical impact is minimal. Still, a cleaner pattern would be 
index-based iteration:
   ```cpp
   for (size_t i = 0; i < local_state._blocks.size(); ++i) {
       auto blk = std::move(local_state._blocks[i]);
       merged_rows += blk.rows();
       status = _agg_source_operator->merge_with_serialized_key_helper(...);
       RETURN_IF_ERROR(status);
   }
   local_state._blocks.clear();
   ```
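
One detail the index-based loop leaves open is what happens when a merge fails 
partway through: blocks that were already moved from would stay in the vector. 
The sketch below is one possible way to handle that, not the PR's code. 
`FakeBlock`, `drain_blocks` and the `merge` callback are hypothetical, and the 
callback returns `bool` in place of a Doris `Status`.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for a block; only rows() is modelled.
struct FakeBlock {
    std::size_t row_count;
    std::size_t rows() const { return row_count; }
};

// Visits blocks front to back, then removes every consumed block with a
// single range erase, so the whole drain is O(n) rather than O(n^2).
// Returns false at the first merge failure; unvisited blocks stay queued.
template <typename MergeFn>
bool drain_blocks(std::vector<FakeBlock>& blocks, MergeFn merge,
                  std::size_t& merged_rows) {
    std::size_t consumed = 0;
    bool ok = true;
    while (consumed < blocks.size()) {
        FakeBlock blk = std::move(blocks[consumed]);
        ++consumed;  // the slot has been moved from, so it counts as consumed
        merged_rows += blk.rows();
        if (!merge(blk)) {
            ok = false;
            break;
        }
    }
    blocks.erase(blocks.begin(),
                 blocks.begin() + static_cast<std::ptrdiff_t>(consumed));
    return ok;
}
```

If the front of the container is popped frequently, `std::deque` is another 
option, since its `pop_front()` runs in constant time.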



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

