This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 95f9b32c7fb [Bug](fix) Fix topn agg limit may get error result in when refresh heap (#47844) (#47903) 95f9b32c7fb is described below commit 95f9b32c7fba16dcafb413e180f4661ad721218e Author: HappenLee <happen...@selectdb.com> AuthorDate: Fri Feb 14 15:21:44 2025 +0800 [Bug](fix) Fix topn agg limit may get error result in when refresh heap (#47844) (#47903) cherry pick #47844 --- be/src/pipeline/dependency.cpp | 12 +++ be/src/pipeline/dependency.h | 3 + be/src/pipeline/exec/aggregation_sink_operator.cpp | 24 +----- .../pipeline/operator/agg_shared_state_test.cpp | 95 ++++++++++++++++++++++ 4 files changed, 114 insertions(+), 20 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 93117fa71a0..ffba01b05b2 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -399,6 +399,18 @@ Status AggSharedState::_destroy_agg_status(vectorized::AggregateDataPtr data) { return Status::OK(); } +void AggSharedState::refresh_top_limit(size_t row_id, + const vectorized::ColumnRawPtrs& key_columns) { + for (int j = 0; j < key_columns.size(); ++j) { + limit_columns[j]->insert_from(*key_columns[j], row_id); + } + limit_heap.emplace(limit_columns[0]->size() - 1, limit_columns, order_directions, + null_directions); + + limit_heap.pop(); + limit_columns_min = limit_heap.top()._row_id; +} + LocalExchangeSharedState::~LocalExchangeSharedState() = default; } // namespace doris::pipeline diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index fea6d9cb7bb..dd953292396 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -386,6 +386,9 @@ public: std::priority_queue<HeapLimitCursor> limit_heap; + // Refresh the top limit heap with a new row + void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs& key_columns); + private: vectorized::MutableColumns _get_keys_hash_table(); diff --git 
a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index d253685c4ae..39dfb4e7f3c 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -597,23 +597,7 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData agg_method.init_serialized_keys(key_columns, num_rows); size_t i = 0; - auto refresh_top_limit = [&, this]() { - _shared_state->limit_heap.pop(); - for (int j = 0; j < key_columns.size(); ++j) { - _shared_state->limit_columns[j]->insert_from(*key_columns[j], - i); - } - _shared_state->limit_heap.emplace( - _shared_state->limit_columns[0]->size() - 1, - _shared_state->limit_columns, - _shared_state->order_directions, - _shared_state->null_directions); - _shared_state->limit_columns_min = - _shared_state->limit_heap.top()._row_id; - }; - - auto creator = [this, refresh_top_limit](const auto& ctor, auto& key, - auto& origin) { + auto creator = [&](const auto& ctor, auto& key, auto& origin) { try { HashMethodType::try_presis_key_and_origin(key, origin, *_agg_arena_pool); @@ -625,7 +609,7 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData throw Exception(st.code(), st.to_string()); } ctor(key, mapped); - refresh_top_limit(); + _shared_state->refresh_top_limit(i, key_columns); } catch (...) { // Exception-safety - if it can not allocate memory or create status, // the destructors will not be called. 
@@ -634,7 +618,7 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData } }; - auto creator_for_null_key = [this, refresh_top_limit](auto& mapped) { + auto creator_for_null_key = [&](auto& mapped) { mapped = _agg_arena_pool->aligned_alloc( Base::_parent->template cast<AggSinkOperatorX>() ._total_size_of_aggregate_states, @@ -644,7 +628,7 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData if (!st) { throw Exception(st.code(), st.to_string()); } - refresh_top_limit(); + _shared_state->refresh_top_limit(i, key_columns); }; SCOPED_TIMER(_hash_table_emplace_timer); diff --git a/be/test/pipeline/operator/agg_shared_state_test.cpp b/be/test/pipeline/operator/agg_shared_state_test.cpp new file mode 100644 index 00000000000..e4ce200ed1e --- /dev/null +++ b/be/test/pipeline/operator/agg_shared_state_test.cpp @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <gtest/gtest.h> + +#include "pipeline/dependency.h" +#include "vec/columns/column_vector.h" +#include "vec/data_types/data_type_number.h" + +namespace doris::pipeline { + +class AggSharedStateTest : public testing::Test { +protected: + void SetUp() override { + _shared_state = std::make_shared<AggSharedState>(); + + // Setup test data + auto int_type = std::make_shared<vectorized::DataTypeInt32>(); + _shared_state->limit_columns.push_back(int_type->create_column()); + + // Setup order directions (ascending) + _shared_state->order_directions = {1}; + _shared_state->null_directions = {1}; + + // Create test column + _test_column = int_type->create_column(); + auto* col_data = reinterpret_cast<vectorized::ColumnVector<int>*>(_test_column.get()); + + // Insert test values: 5, 3, 1, -1, 0, 2 + col_data->insert(5); + col_data->insert(3); + col_data->insert(1); + col_data->insert(-1); + col_data->insert(0); + col_data->insert(2); + + _key_columns.push_back(_test_column.get()); + // prepare the heap data first [5, 3, 1, -1] + for (int i = 0; i < 4; ++i) { + for (int j = 0; j < _key_columns.size(); ++j) { + _shared_state->limit_columns[j]->insert_from(*_key_columns[j], i); + } + // build agg limit heap + _shared_state->limit_heap.emplace( + _shared_state->limit_columns[0]->size() - 1, _shared_state->limit_columns, + _shared_state->order_directions, _shared_state->null_directions); + } + // keep the top limit values, only 3 values in heap [-1, 3, 1] + _shared_state->limit_heap.pop(); + _shared_state->limit_columns_min = _shared_state->limit_heap.top()._row_id; + } + + std::shared_ptr<AggSharedState> _shared_state; + vectorized::MutableColumnPtr _test_column; + vectorized::ColumnRawPtrs _key_columns; +}; + +TEST_F(AggSharedStateTest, TestRefreshTopLimit) { + // Test with limit = 3 (keep top 3 values) + _shared_state->limit = 3; + + // Add values one by one and verify the minimum value is tracked correctly + EXPECT_EQ(_shared_state->limit_columns_min, 1); + +
_shared_state->refresh_top_limit(4, _key_columns); + EXPECT_EQ(_shared_state->limit_columns_min, 2); + + _shared_state->refresh_top_limit(5, _key_columns); + EXPECT_EQ(_shared_state->limit_columns_min, 2); // 1 should still be max + + auto heap_size = _shared_state->limit_heap.size(); + EXPECT_EQ(heap_size, 3); + + EXPECT_EQ(_shared_state->limit_heap.top()._row_id, 2); // 1 should be the top value + _shared_state->limit_heap.pop(); + EXPECT_EQ(_shared_state->limit_heap.top()._row_id, 4); // 0 should be the top value + _shared_state->limit_heap.pop(); + EXPECT_EQ(_shared_state->limit_heap.top()._row_id, 3); // -1 should be the top value +} + +} // namespace doris::pipeline --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org