This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 95f9b32c7fb [Bug](fix) Fix topn agg limit may get wrong result when 
refreshing heap (#47844) (#47903)
95f9b32c7fb is described below

commit 95f9b32c7fba16dcafb413e180f4661ad721218e
Author: HappenLee <happen...@selectdb.com>
AuthorDate: Fri Feb 14 15:21:44 2025 +0800

    [Bug](fix) Fix topn agg limit may get wrong result when refreshing heap 
(#47844) (#47903)
    
    cherry pick #47844
---
 be/src/pipeline/dependency.cpp                     | 12 +++
 be/src/pipeline/dependency.h                       |  3 +
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 24 +-----
 .../pipeline/operator/agg_shared_state_test.cpp    | 95 ++++++++++++++++++++++
 4 files changed, 114 insertions(+), 20 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 93117fa71a0..ffba01b05b2 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -399,6 +399,18 @@ Status 
AggSharedState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
     return Status::OK();
 }
 
+void AggSharedState::refresh_top_limit(size_t row_id, // row of key_columns to add to the limit heap
+                                       const vectorized::ColumnRawPtrs& 
key_columns) {
+    for (int j = 0; j < key_columns.size(); ++j) {
+        limit_columns[j]->insert_from(*key_columns[j], row_id); // copy row `row_id` of key column j into limit column j
+    }
+    limit_heap.emplace(limit_columns[0]->size() - 1, limit_columns, 
order_directions,
+                       null_directions); // new entry refers to the last row of limit_columns
+
+    limit_heap.pop(); // pop only after the emplace, so the new row is compared against the existing entries first
+    limit_columns_min = limit_heap.top()._row_id; // remember the row id of the current heap top
+}
+
 LocalExchangeSharedState::~LocalExchangeSharedState() = default;
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index fea6d9cb7bb..dd953292396 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -386,6 +386,9 @@ public:
 
     std::priority_queue<HeapLimitCursor> limit_heap;
 
+    // Refresh the top limit heap with a new row
+    void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs& 
key_columns);
+
 private:
     vectorized::MutableColumns _get_keys_hash_table();
 
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index d253685c4ae..39dfb4e7f3c 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -597,23 +597,7 @@ bool 
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
                             agg_method.init_serialized_keys(key_columns, 
num_rows);
                             size_t i = 0;
 
-                            auto refresh_top_limit = [&, this]() {
-                                _shared_state->limit_heap.pop();
-                                for (int j = 0; j < key_columns.size(); ++j) {
-                                    
_shared_state->limit_columns[j]->insert_from(*key_columns[j],
-                                                                               
  i);
-                                }
-                                _shared_state->limit_heap.emplace(
-                                        
_shared_state->limit_columns[0]->size() - 1,
-                                        _shared_state->limit_columns,
-                                        _shared_state->order_directions,
-                                        _shared_state->null_directions);
-                                _shared_state->limit_columns_min =
-                                        
_shared_state->limit_heap.top()._row_id;
-                            };
-
-                            auto creator = [this, refresh_top_limit](const 
auto& ctor, auto& key,
-                                                                     auto& 
origin) {
+                            auto creator = [&](const auto& ctor, auto& key, 
auto& origin) {
                                 try {
                                     
HashMethodType::try_presis_key_and_origin(key, origin,
                                                                               
*_agg_arena_pool);
@@ -625,7 +609,7 @@ bool 
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
                                         throw Exception(st.code(), 
st.to_string());
                                     }
                                     ctor(key, mapped);
-                                    refresh_top_limit();
+                                    _shared_state->refresh_top_limit(i, 
key_columns);
                                 } catch (...) {
                                     // Exception-safety - if it can not 
allocate memory or create status,
                                     // the destructors will not be called.
@@ -634,7 +618,7 @@ bool 
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
                                 }
                             };
 
-                            auto creator_for_null_key = [this, 
refresh_top_limit](auto& mapped) {
+                            auto creator_for_null_key = [&](auto& mapped) {
                                 mapped = _agg_arena_pool->aligned_alloc(
                                         Base::_parent->template 
cast<AggSinkOperatorX>()
                                                 
._total_size_of_aggregate_states,
@@ -644,7 +628,7 @@ bool 
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
                                 if (!st) {
                                     throw Exception(st.code(), st.to_string());
                                 }
-                                refresh_top_limit();
+                                _shared_state->refresh_top_limit(i, 
key_columns);
                             };
 
                             SCOPED_TIMER(_hash_table_emplace_timer);
diff --git a/be/test/pipeline/operator/agg_shared_state_test.cpp 
b/be/test/pipeline/operator/agg_shared_state_test.cpp
new file mode 100644
index 00000000000..e4ce200ed1e
--- /dev/null
+++ b/be/test/pipeline/operator/agg_shared_state_test.cpp
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "pipeline/dependency.h"
+#include "vec/columns/column_vector.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris::pipeline {
+
+class AggSharedStateTest : public testing::Test {
+protected:
+    void SetUp() override {
+        _shared_state = std::make_shared<AggSharedState>();
+
+        // Setup test data: a single Int32 limit column
+        auto int_type = std::make_shared<vectorized::DataTypeInt32>();
+        _shared_state->limit_columns.push_back(int_type->create_column());
+
+        // Setup order directions (ascending)
+        _shared_state->order_directions = {1};
+        _shared_state->null_directions = {1};
+
+        // Create test column
+        _test_column = int_type->create_column();
+        auto* col_data = 
reinterpret_cast<vectorized::ColumnVector<int>*>(_test_column.get());
+
+        // Insert test values (row ids 0..5): 5, 3, 1, -1, 0, 2
+        col_data->insert(5);
+        col_data->insert(3);
+        col_data->insert(1);
+        col_data->insert(-1);
+        col_data->insert(0);
+        col_data->insert(2);
+
+        _key_columns.push_back(_test_column.get());
+        // prepare the heap data first with rows 0..3: [5, 3, 1, -1]
+        for (int i = 0; i < 4; ++i) {
+            for (int j = 0; j < _key_columns.size(); ++j) {
+                _shared_state->limit_columns[j]->insert_from(*_key_columns[j], 
i);
+            }
+            // build agg limit heap
+            _shared_state->limit_heap.emplace(
+                    _shared_state->limit_columns[0]->size() - 1, 
_shared_state->limit_columns,
+                    _shared_state->order_directions, 
_shared_state->null_directions);
+        }
+        // keep the top limit values: one pop leaves only 3 values in the heap [-1, 3, 1]
+        _shared_state->limit_heap.pop();
+        _shared_state->limit_columns_min = 
_shared_state->limit_heap.top()._row_id;
+    }
+
+    std::shared_ptr<AggSharedState> _shared_state;
+    vectorized::MutableColumnPtr _test_column;
+    vectorized::ColumnRawPtrs _key_columns;
+};
+
+TEST_F(AggSharedStateTest, TestRefreshTopLimit) {
+    // Test with limit = 3 (keep top 3 values)
+    _shared_state->limit = 3;
+
+    // Add rows one by one and verify limit_columns_min tracks the row id of the heap top
+    EXPECT_EQ(_shared_state->limit_columns_min, 1); // row 1 holds value 3
+
+    _shared_state->refresh_top_limit(4, _key_columns);
+    EXPECT_EQ(_shared_state->limit_columns_min, 2); // row 2 holds value 1
+
+    _shared_state->refresh_top_limit(5, _key_columns);
+    EXPECT_EQ(_shared_state->limit_columns_min, 2); // 1 should still be max
+
+    auto heap_size = _shared_state->limit_heap.size();
+    EXPECT_EQ(heap_size, 3);
+
+    EXPECT_EQ(_shared_state->limit_heap.top()._row_id, 2); // 1 should be the 
top value
+    _shared_state->limit_heap.pop();
+    EXPECT_EQ(_shared_state->limit_heap.top()._row_id, 4); // 0 should be the 
top value
+    _shared_state->limit_heap.pop();
+    EXPECT_EQ(_shared_state->limit_heap.top()._row_id, 3); // -1 should be the 
top value
+}
+
+} // namespace doris::pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to