jacktengg commented on code in PR #61212:
URL: https://github.com/apache/doris/pull/61212#discussion_r2937759800


##########
be/src/exec/operator/partitioned_aggregation_sink_operator.cpp:
##########
@@ -325,149 +352,93 @@ Status 
PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
                                                        HashTableType& 
hash_table,
                                                        const size_t 
size_to_revoke, bool eos) {
     Status status;
-    Defer defer {[&]() {
-        if (!status.ok()) {
-            Base::_shared_state->close();
-        }
-    }};
 
     context.init_iterator();
+    auto& parent = _parent->template cast<PartitionedAggSinkOperatorX>();
 
-    
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
+    
Base::_shared_state->_in_mem_shared_state->aggregate_data_container->init_once();
 
-    const auto total_rows =
-            
Base::_shared_state->in_mem_shared_state->aggregate_data_container->total_count();
+    const auto total_rows = 
parent._agg_sink_operator->get_hash_table_size(_runtime_state.get());
+
+    if (total_rows == 0) {
+        return Status::OK();
+    }
 
     const size_t size_to_revoke_ = std::max<size_t>(size_to_revoke, 1);
 
     // `spill_batch_rows` will be between 4k and 1M
     // and each block to spill will not be larger than 
32MB(`MAX_SPILL_WRITE_BATCH_MEM`)
+    // TODO: yiguolei, should review this logic
     const auto spill_batch_rows = std::min<size_t>(
-            1024 * 1024, std::max<size_t>(4096, 
SpillStream::MAX_SPILL_WRITE_BATCH_MEM *
-                                                        total_rows / 
size_to_revoke_));
+
+            1024 * 1024, std::max<size_t>(4096, 
SpillFile::MAX_SPILL_WRITE_BATCH_MEM * total_rows /
+                                                        size_to_revoke_));
 
     VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << 
_parent->node_id()
-               << ", spill_batch_rows: " << spill_batch_rows << ", total rows: 
" << total_rows;
+               << ", spill_batch_rows: " << spill_batch_rows << ", total rows: 
" << total_rows
+               << ", size_to_revoke: " << size_to_revoke;
     size_t row_count = 0;
 
     std::vector<TmpSpillInfo<typename HashTableType::key_type>> spill_infos(
-            Base::_shared_state->partition_count);
-    auto& iter = 
Base::_shared_state->in_mem_shared_state->aggregate_data_container->iterator;
-    while (iter != 
Base::_shared_state->in_mem_shared_state->aggregate_data_container->end() &&
+            parent._partition_count);
+    auto& iter = 
Base::_shared_state->_in_mem_shared_state->aggregate_data_container->iterator;
+    while (iter != 
Base::_shared_state->_in_mem_shared_state->aggregate_data_container->end() &&
            !state->is_cancelled()) {
         const auto& key = iter.template get_key<typename 
HashTableType::key_type>();
-        auto partition_index = 
Base::_shared_state->get_partition_index(hash_table.hash(key));
+        auto partition_index = hash_table.hash(key) % parent._partition_count;

Review Comment:
   What if `parent._partition_count == 0`? Then `hash_table.hash(key) % parent._partition_count`
   on the line above is a modulo by zero.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to