github-actions[bot] commented on code in PR #61212:
URL: https://github.com/apache/doris/pull/61212#discussion_r2935401954
##########
be/src/runtime/runtime_state.h:
##########
@@ -671,37 +671,91 @@ class RuntimeState {
int64_t spill_min_revocable_mem() const {
if (_query_options.__isset.min_revocable_mem) {
- return std::max(_query_options.min_revocable_mem, (int64_t)1);
+ return std::max(_query_options.min_revocable_mem, (int64_t)1 << 20);
}
- return 1;
+ return 32 << 20;
}
- int64_t spill_sort_mem_limit() const {
- if (_query_options.__isset.spill_sort_mem_limit) {
- return std::max(_query_options.spill_sort_mem_limit, (int64_t)16777216);
+ int spill_aggregation_partition_count() const {
+ if (_query_options.__isset.spill_aggregation_partition_count) {
+ return _query_options.spill_aggregation_partition_count;
}
- return 134217728;
+ return 32;
}
- int64_t spill_sort_batch_bytes() const {
- if (_query_options.__isset.spill_sort_batch_bytes) {
- return std::max(_query_options.spill_sort_batch_bytes, (int64_t)8388608);
+ int spill_hash_join_partition_count() const {
+ if (_query_options.__isset.spill_hash_join_partition_count) {
+ return _query_options.spill_hash_join_partition_count;
}
- return 8388608;
+ return 4;
}
- int spill_aggregation_partition_count() const {
- if (_query_options.__isset.spill_aggregation_partition_count) {
- return std::min(std::max(_query_options.spill_aggregation_partition_count, 16), 8192);
+ int spill_repartition_max_depth() const {
+ if (_query_options.__isset.spill_repartition_max_depth) {
+ // Clamp to a reasonable range: [1, 128]
+ return std::min(_query_options.spill_repartition_max_depth, 128);
Review Comment:
**Bug (Major):** The comment says "Clamp to a reasonable range: [1, 128]" but only the upper bound is enforced. If a user sets `spill_repartition_max_depth = 0`, the function returns 0.
With depth=0, the repartition check `if (new_level >= _repartition_max_depth)` (in both `partitioned_hash_join_probe_operator.cpp:424` and `partitioned_aggregation_source_operator.cpp:450`) always triggers, because `new_level` (which is `partition.level + 1`, so at least 1) is always `>= 0`. This causes spill repartitioning to immediately return `InternalError("exceeded max depth")`, making queries fail under memory pressure.
Fix:
```cpp
return std::max(1, std::min(_query_options.spill_repartition_max_depth, 128));
```
##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -3170,7 +3175,38 @@ public void setDetailShapePlanNodes(String detailShapePlanNodes) {
public long spillStreamingAggMemLimit = 268435456; //256MB
@VariableMgr.VarAttr(name = SPILL_HASH_JOIN_PARTITION_COUNT, fuzzy = true)
- public int spillHashJoinPartitionCount = 32;
+ public int spillHashJoinPartitionCount = 4;
+
+ @VariableMgr.VarAttr(name = SPILL_REPARTITION_MAX_DEPTH, fuzzy = true, needForward = true,
+ description = {"重分区的最大递归深度,超过该深度不再继续重分区,\n默认值为 8",
+ "Maximum depth for repartition recursion. When exceeded, repartitioning will stop. Default is 8."})
+ public int spillRepartitionMaxDepth = 8;
+
+ @VariableMgr.VarAttr(name = SPILL_JOIN_BUILD_SINK_MEM_LIMIT_BYTES, fuzzy = true, needForward = true,
+ description = {"一旦触发 spill 后,join build sink 的 revocable memory 超过该阈值就主动落盘(字节)。默认 64MB。",
+ "After spill is triggered, join build sink will proactively spill when revocable memory "
+ + "exceeds this threshold (in bytes). Default is 64MB."})
+ public long spillJoinBuildSinkMemLimitBytes = 64L * 1024L * 1024L;
+
+ @VariableMgr.VarAttr(name = SPILL_AGGREGATION_SINK_MEM_LIMIT_BYTES, fuzzy = true, needForward = true,
+ description = {"一旦触发 spill 后,aggregation sink 的 revocable memory 超过该阈值就主动落盘(字节)。默认 64MB。",
+ "After spill is triggered, aggregation sink will proactively spill when revocable memory "
+ + "exceeds this threshold (in bytes). Default is 64GB."})
+ public long spillAggregationSinkMemLimitBytes = 64L * 1024L * 1024L * 1024L;
Review Comment:
**Bug (Critical):** Default value is `64L * 1024L * 1024L * 1024L` = **64GB**, not 64MB.
All sibling variables use 64MB:
- `spillJoinBuildSinkMemLimitBytes = 64L * 1024L * 1024L` (64MB)
- `spillSortSinkMemLimitBytes = 64L * 1024L * 1024L` (64MB)
- `spillSortMergeMemLimitBytes = 64L * 1024L * 1024L` (64MB)
The thrift default is also 64MB (`67108864`). The Chinese description says "默认 64MB" ("Default is 64MB") but the English says "Default is 64GB".
The BE accessor clamps to [1MB, 4GB], so the effective value becomes 4GB, **64x** the intended 64MB. This effectively disables proactive spilling for aggregation sinks, causing excessive memory usage or OOM in aggregation spill scenarios.
Fix:
```java
public long spillAggregationSinkMemLimitBytes = 64L * 1024L * 1024L;
```
Also fix the English description to say "Default is 64MB."
##########
be/src/exec/operator/partitioned_hash_join_probe_operator.cpp:
##########
@@ -743,120 +829,174 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block
bool PartitionedHashJoinProbeOperatorX::need_more_input_data(RuntimeState* state) const {
auto& local_state = get_local_state(state);
- if (local_state._shared_state->is_spilled) {
+ if (local_state._shared_state->_is_spilled) {
return !local_state._child_eos;
- } else if (local_state._shared_state->inner_runtime_state) {
+ } else if (local_state._shared_state->_inner_runtime_state) {
return _inner_probe_operator->need_more_input_data(
- local_state._shared_state->inner_runtime_state.get());
+ local_state._shared_state->_inner_runtime_state.get());
} else {
return true;
}
}
+// Report only this operator's own revocable memory. The pipeline task
+// iterates all operators to sum revocable sizes and revoke each individually.
+// Sum up memory used by in-memory probe blocks and any partially-recovered build block for the current partition.
+// This is the memory that can be freed if we choose to revoke and repartition the current partition.
size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state) const {
auto& local_state = get_local_state(state);
- if (local_state._child_eos) {
+ if (!local_state._shared_state->_is_spilled) {
return 0;
}
- auto revocable_size = _revocable_mem_size(state, true);
- if (_child) {
- revocable_size += _child->revocable_mem_size(state);
- }
- return revocable_size;
-}
-
-size_t PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* state,
- bool force) const {
- const auto spill_size_threshold =
- force ? SpillStream::MIN_SPILL_WRITE_BATCH_MEM : SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
- auto& local_state = get_local_state(state);
size_t mem_size = 0;
- auto& probe_blocks = local_state._probe_blocks;
- for (uint32_t i = 0; i < _partition_count; ++i) {
- for (auto& block : probe_blocks[i]) {
- mem_size += block.allocated_bytes();
- }
-
- auto& partitioned_block = local_state._partitioned_blocks[i];
- if (partitioned_block) {
- auto block_bytes = partitioned_block->allocated_bytes();
- if (block_bytes >= spill_size_threshold) {
- mem_size += block_bytes;
+ if (!local_state._child_eos) {
+ for (uint32_t i = 0; i < _partition_count; ++i) {
+ auto& partitioned_block = local_state._partitioned_blocks[i];
+ if (!partitioned_block) {
+ continue;
+ }
+ const auto bytes = partitioned_block->allocated_bytes();
+ if (bytes >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
+ mem_size += bytes;
}
}
+ return mem_size > state->spill_min_revocable_mem() ? mem_size : 0;
+ }
+ if (!local_state._current_partition.is_valid() ||
+ local_state._current_partition.build_finished) {
+ // No active partition — no revocable memory.
+ // Or if current partition has finished build hash table.
+ return 0;
+ }
+
+ // Include build-side memory that has been recovered but not yet consumed by the hash table.
+ // This data is revocable because we can repartition instead of building the hash table.
+ if (local_state._recovered_build_block) {
+ mem_size += local_state._recovered_build_block->allocated_bytes();
}
- return mem_size;
+
+ return mem_size > state->spill_min_revocable_mem() ? mem_size : 0;
}
size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* state) {
auto& local_state = get_local_state(state);
- const auto is_spilled = local_state._shared_state->is_spilled;
- if (!is_spilled || local_state._child_eos) {
+ const bool is_spilled = local_state._shared_state->_is_spilled;
+
+ // Non-spill path: delegate to the inner probe operator / base class.
+ if (!is_spilled) {
return Base::get_reserve_mem_size(state);
}
- size_t size_to_reserve = SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
+ // Spill path, probe data still flowing in (child not yet EOS):
+ // We only need room for incoming probe blocks being partitioned. Reserve
+ // one batch worth of spill-write memory; no hash table will be built yet.
+ if (!local_state._child_eos) {
+ return state->minimum_operator_memory_required_bytes();
+ }
+
+ // Spill path, child EOS — we are in the recovery / build / probe phase.
+ // Baseline reservation is one block of spill I/O.
+ size_t size_to_reserve = state->minimum_operator_memory_required_bytes();
- if (local_state._need_to_setup_internal_operators) {
- const size_t rows =
- (local_state._recovered_build_block ? local_state._recovered_build_block->rows()
- : 0) +
- state->batch_size();
- size_t bucket_size = hash_join_table_calc_bucket_size(rows);
+ const bool about_to_build = local_state._current_partition.is_valid() &&
+ !local_state._current_partition.build_finished;
+ if (about_to_build) {
+ // Estimate rows that will land in the hash table so we can reserve
+ // enough for JoinHashTable::first[] + JoinHashTable::next[].
+ size_t rows = std::max(static_cast<size_t>(state->batch_size()),
+ static_cast<size_t>(local_state._recovered_build_block->rows()));
Review Comment:
**Bug (Edge case):** `_recovered_build_block` can be `nullptr` when
`about_to_build` is true, causing a null dereference here.
Scenario:
1. At line 749, `_recovered_build_block` is reset when a new partition is
popped
2. `recover_build_blocks_from_partition` is called (line 763) and reads the
build file
3. If ALL blocks from the file happen to be empty (line 359 `continue`), the
function reaches EOS without ever creating `_recovered_build_block` (lines
366-371 never execute)
4. `build_file` is reset (line 379), function returns OK
5. Back in `_pull_from_spill_queue`, the function returns OK to the pipeline
task
6. In the NEXT pipeline iteration, `get_reserve_mem_size` is called. At this
point: `is_valid()=true`, `build_finished=false` → `about_to_build=true`, but
`_recovered_build_block` is null
7. Line 909: `_recovered_build_block->rows()` → **null dereference**
While an all-empty-blocks spill file is an edge case, the code should be
defensively correct. The simplest fix:
```cpp
if (about_to_build && local_state._recovered_build_block) {
```
Or add a null check:
```cpp
size_t rows = local_state._recovered_build_block
                      ? std::max(static_cast<size_t>(state->batch_size()),
                                 static_cast<size_t>(local_state._recovered_build_block->rows()))
                      : static_cast<size_t>(state->batch_size());
```
##########
be/src/exec/operator/partitioned_aggregation_source_operator.cpp:
##########
@@ -135,79 +186,212 @@ bool PartitionedAggSourceOperatorX::is_shuffled_operator() const {
return _agg_source_operator->is_shuffled_operator();
}
+size_t PartitionedAggSourceOperatorX::revocable_mem_size(RuntimeState* state) const {
+ auto& local_state = get_local_state(state);
+ if (!local_state._shared_state->_is_spilled || !local_state._current_partition.spill_file) {
+ return 0;
+ }
+
+ size_t bytes = 0;
+ for (const auto& block : local_state._blocks) {
+ bytes += block.allocated_bytes();
+ }
+ if (local_state._shared_state->_in_mem_shared_state != nullptr &&
+ local_state._shared_state->_in_mem_shared_state->agg_data != nullptr) {
+ auto* agg_data = local_state._shared_state->_in_mem_shared_state->agg_data.get();
+ bytes += std::visit(Overload {[&](std::monostate& arg) -> size_t { return 0; },
+ [&](auto& agg_method) -> size_t {
+ return agg_method.hash_table->get_buffer_size_in_bytes();
+ }},
+ agg_data->method_variant);
+
+ if (auto& aggregate_data_container =
+ local_state._shared_state->_in_mem_shared_state->aggregate_data_container;
+ aggregate_data_container) {
+ bytes += aggregate_data_container->memory_usage();
+ }
+ }
+ return bytes > state->spill_min_revocable_mem() ? bytes : 0;
+}
+
+Status PartitionedAggSourceOperatorX::revoke_memory(RuntimeState* state) {
+ auto& local_state = get_local_state(state);
+ if (!local_state._shared_state->_is_spilled) {
+ return Status::OK();
+ }
+ VLOG_DEBUG << fmt::format("Query:{}, agg source:{}, task:{}, revoke_memory, hash table size:{}",
+ print_id(state->query_id()), node_id(), state->task_id(),
+ PrettyPrinter::print_bytes(local_state._estimate_memory_usage));
+
+ // Flush hash table + repartition remaining spill files of the current partition.
+ RETURN_IF_ERROR(local_state._flush_and_repartition(state));
+ local_state._current_partition = AggSpillPartitionInfo {};
+ local_state._need_to_setup_partition = true;
+ return Status::OK();
+}
+
Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, Block* block, bool* eos) {
auto& local_state = get_local_state(state);
- local_state.copy_shared_spill_profile();
Status status;
- Defer defer {[&]() {
- if (!status.ok() || *eos) {
- local_state._shared_state->close();
- }
- }};
SCOPED_TIMER(local_state.exec_time_counter());
- if (local_state._shared_state->is_spilled &&
- local_state._need_to_merge_data_for_current_partition) {
- if (local_state._blocks.empty() && !local_state._current_partition_eos) {
- bool has_recovering_data = false;
- status = local_state.recover_blocks_from_disk(state, has_recovering_data);
- RETURN_IF_ERROR(status);
- *eos = !has_recovering_data;
+ // ── Fast path: not spilled ─────────────────────────────────────────
+ if (!local_state._shared_state->_is_spilled) {
+ auto* runtime_state = local_state._runtime_state.get();
+ local_state._shared_state->_in_mem_shared_state->aggregate_data_container->init_once();
+ status = _agg_source_operator->get_block(runtime_state, block, eos);
+ RETURN_IF_ERROR(status);
+ if (*eos) {
+ auto* source_local_state =
+ runtime_state->get_local_state(_agg_source_operator->operator_id());
+ local_state._update_profile<false>(source_local_state->custom_profile());
+ }
+ local_state.reached_limit(block, eos);
+ return Status::OK();
+ }
+
+ // ── Spilled path ───────────────────────────────────────────────────
+ // One-time: move original spill_partitions from shared state into unified queue.
+ if (local_state._partition_queue.empty() && local_state._need_to_setup_partition &&
+ !local_state._shared_state->_spill_partitions.empty()) {
+ local_state._init_partition_queue();
+ }
+
+ // Phase 1: Pop next partition from queue if needed.
+ if (local_state._need_to_setup_partition) {
+ if (local_state._partition_queue.empty()) {
+ *eos = true;
return Status::OK();
- } else if (!local_state._blocks.empty()) {
- size_t merged_rows = 0;
- while (!local_state._blocks.empty()) {
- auto block_ = std::move(local_state._blocks.front());
- merged_rows += block_.rows();
- local_state._blocks.erase(local_state._blocks.begin());
- status = _agg_source_operator->merge_with_serialized_key_helper(
- local_state._runtime_state.get(), &block_);
- RETURN_IF_ERROR(status);
- }
- local_state._estimate_memory_usage +=
- _agg_source_operator->get_estimated_memory_size_for_merging(
- local_state._runtime_state.get(), merged_rows);
+ }
- if (!local_state._current_partition_eos) {
- return Status::OK();
- }
+ local_state._current_partition = std::move(local_state._partition_queue.front());
+ local_state._partition_queue.pop_front();
+ local_state._blocks.clear();
+ local_state._estimate_memory_usage = 0;
+
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, agg source:{}, task:{}, setup partition level:{}, "
+ "queue remaining:{}",
+ print_id(state->query_id()), node_id(), state->task_id(),
+ local_state._current_partition.level, local_state._partition_queue.size());
+ local_state._need_to_setup_partition = false;
+ }
+
+ // Phase 2: Recover blocks from disk into _blocks (batch of ~8MB).
+ if (local_state._blocks.empty() && local_state._current_partition.spill_file) {
+ RETURN_IF_ERROR(
+ local_state._recover_blocks_from_partition(state, local_state._current_partition));
+ // Return empty block to yield to pipeline scheduler.
+ // Pipeline task will check memory and call revoke_memory if needed.
+ *eos = false;
+ return Status::OK();
+ }
+
+ // Phase 3: Merge recovered blocks into hash table.
+ if (!local_state._blocks.empty()) {
+ size_t merged_rows = 0;
+ while (!local_state._blocks.empty()) {
+ auto blk = std::move(local_state._blocks.front());
+ merged_rows += blk.rows();
+ local_state._blocks.erase(local_state._blocks.begin());
Review Comment:
**Minor (Performance):** `_blocks.erase(_blocks.begin())` on a `std::vector`
is O(n) per call (shifts all elements), making this loop O(n²) overall.
With typical block sizes and 8MB buffer limits, n is usually small (single
digits), so the practical impact is minimal. Still, a cleaner pattern would be
index-based iteration:
```cpp
for (size_t i = 0; i < local_state._blocks.size(); ++i) {
auto blk = std::move(local_state._blocks[i]);
merged_rows += blk.rows();
status = _agg_source_operator->merge_with_serialized_key_helper(...);
RETURN_IF_ERROR(status);
}
local_state._blocks.clear();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]