This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new d56371da6be [Feature](compatibility) make some agg function restrict (#41459) d56371da6be is described below commit d56371da6be952e37d8371da8ec683a56b9865fe Author: Pxl <pxl...@qq.com> AuthorDate: Thu Oct 10 14:52:19 2024 +0800 [Feature](compatibility) make some agg function restrict (#41459) ## Proposed changes make some agg function restrict --- be/src/agent/be_exec_version_manager.cpp | 26 +- be/src/agent/be_exec_version_manager.h | 9 +- .../aggregate_function_covar.cpp | 16 +- .../aggregate_functions/aggregate_function_covar.h | 34 --- .../aggregate_function_percentile.cpp | 62 +---- .../aggregate_function_percentile.h | 270 --------------------- .../aggregate_function_percentile_approx.cpp | 39 --- .../aggregate_function_percentile_approx.h | 258 -------------------- .../aggregate_function_stddev.cpp | 32 +-- .../aggregate_function_stddev.h | 34 --- .../aggregate_function_window_funnel.cpp | 28 +-- .../aggregate_function_window_funnel.h | 261 -------------------- .../aggregate_functions/vec_window_funnel_test.cpp | 3 +- .../main/java/org/apache/doris/common/Config.java | 2 +- 14 files changed, 40 insertions(+), 1034 deletions(-) diff --git a/be/src/agent/be_exec_version_manager.cpp b/be/src/agent/be_exec_version_manager.cpp index 0bdb55f7bc3..bfd0745e316 100644 --- a/be/src/agent/be_exec_version_manager.cpp +++ b/be/src/agent/be_exec_version_manager.cpp @@ -34,6 +34,13 @@ Status BeExecVersionManager::check_be_exec_version(int be_exec_version) { int BeExecVersionManager::get_function_compatibility(int be_exec_version, std::string function_name) { + if (_function_restrict_map.contains(function_name) && be_exec_version != get_newest_version()) { + throw Exception(Status::InternalError( + "function {} do not support old be exec version, maybe it's because doris are " + "doing a rolling upgrade. newest_version={}, input_be_exec_version={}", + function_name, get_newest_version(), be_exec_version)); + } + auto it = _function_change_map.find(function_name); if (it == _function_change_map.end()) { // 0 means no compatibility issues need to be dealt with @@ -82,7 +89,7 @@ void BeExecVersionManager::check_function_compatibility(int current_be_exec_vers * 3: start from doris 2.0.0 (by some mistakes) * a. aggregation function do not serialize bitmap to string. * b. support window funnel mode. - * 4/5: start from doris 2.1.0 + * 4: start from doris 2.1.0 * a. ignore this line, window funnel mode should be enabled from 2.0. * b. array contains/position/countequal function return nullable in less situations. * c. cleared old version of Version 2. @@ -92,15 +99,22 @@ void BeExecVersionManager::check_function_compatibility(int current_be_exec_vers * g. do local merge of remote runtime filter * h. "now": ALWAYS_NOT_NULLABLE -> DEPEND_ON_ARGUMENTS * - * 7: start from doris 3.0.0 + * 5: start from doris 3.0.0 + * a. change some agg function nullable property: PR #37215 + * + * 6: start from doris 3.0.1 and 2.1.6 * a. change the impl of percentile (need fix) * b. clear old version of version 3->4 * c. change FunctionIsIPAddressInRange from AlwaysNotNullable to DependOnArguments - * d. change some agg function nullable property: PR #37215 - * e. change variant serde to fix PR #38413 - * f. support const column in serialize/deserialize function: PR #41175 + * d. change variant serde to fix PR #38413 + * + * 7: start from doris 3.0.2 + * a. window funnel logic change +* b. support const column in serialize/deserialize function: PR #41175 */ -const int BeExecVersionManager::max_be_exec_version = 7; + +const int BeExecVersionManager::max_be_exec_version = 8; const int BeExecVersionManager::min_be_exec_version = 0; std::map<std::string, std::set<int>> BeExecVersionManager::_function_change_map {}; +std::set<std::string> BeExecVersionManager::_function_restrict_map; } // namespace doris diff --git a/be/src/agent/be_exec_version_manager.h b/be/src/agent/be_exec_version_manager.h index a51fb8e36b4..f4158a40152 100644 --- a/be/src/agent/be_exec_version_manager.h +++ b/be/src/agent/be_exec_version_manager.h @@ -25,7 +25,6 @@ namespace doris { -constexpr static int AGG_FUNCTION_NEW_WINDOW_FUNNEL = 6; constexpr inline int BITMAP_SERDE = 3; constexpr inline int USE_NEW_SERDE = 4; // release on DORIS version 2.1 constexpr inline int OLD_WAL_SERDE = 3; // use to solve compatibility issues, see pr #32299 @@ -34,7 +33,7 @@ constexpr inline int VARIANT_SERDE = 6; // change variant serde to fix P constexpr inline int AGGREGATION_2_1_VERSION = 6; // some aggregation changed the data format after this version constexpr inline int USE_CONST_SERDE = - 7; // support const column in serialize/deserialize function: PR #41175 + 8; // support const column in serialize/deserialize function: PR #41175 class BeExecVersionManager { public: @@ -59,11 +58,17 @@ public: _function_change_map[function_name].insert(breaking_old_version); } + static void registe_restrict_function_compatibility(std::string function_name) { + _function_restrict_map.insert(function_name); + } + private: static const int max_be_exec_version; static const int min_be_exec_version; // [function name] -> [breaking change start version] static std::map<std::string, std::set<int>> _function_change_map; + // those function must has input newest be exec version + static std::set<std::string> _function_restrict_map; }; } // namespace doris diff --git a/be/src/vec/aggregate_functions/aggregate_function_covar.cpp b/be/src/vec/aggregate_functions/aggregate_function_covar.cpp index 76a2881dd78..b02d6ae0e12 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_covar.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_covar.cpp @@ -51,15 +51,6 @@ AggregateFunctionPtr create_function_single_value(const String& name, return nullptr; } -template <bool is_nullable> -AggregateFunctionPtr create_aggregate_function_covariance_samp_old(const std::string& name, - const DataTypes& argument_types, - const bool result_is_nullable) { - return create_function_single_value<AggregateFunctionSamp_OLDER, CovarSampName, SampData_OLDER, - is_nullable>(name, argument_types, result_is_nullable, - NULLABLE); -} - AggregateFunctionPtr create_aggregate_function_covariance_samp(const std::string& name, const DataTypes& argument_types, const bool result_is_nullable) { @@ -80,12 +71,7 @@ void register_aggregate_function_covar_pop(AggregateFunctionSimpleFactory& facto } void register_aggregate_function_covar_samp_old(AggregateFunctionSimpleFactory& factory) { - factory.register_alternative_function( - "covar_samp", create_aggregate_function_covariance_samp_old<NOTNULLABLE>, false, - AGG_FUNCTION_NULLABLE); - factory.register_alternative_function("covar_samp", - create_aggregate_function_covariance_samp_old<NULLABLE>, - true, AGG_FUNCTION_NULLABLE); + BeExecVersionManager::registe_restrict_function_compatibility("covar_samp"); } void register_aggregate_function_covar_samp(AggregateFunctionSimpleFactory& factory) { diff --git a/be/src/vec/aggregate_functions/aggregate_function_covar.h b/be/src/vec/aggregate_functions/aggregate_function_covar.h index 78a3eae5bcb..e6ebec70285 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_covar.h +++ b/be/src/vec/aggregate_functions/aggregate_function_covar.h @@ -137,32 +137,6 @@ struct PopData : Data { static DataTypePtr get_return_type() { return std::make_shared<DataTypeNumber<Float64>>(); } }; -template <typename T, typename Data> -struct SampData_OLDER : Data { - void insert_result_into(IColumn& to) const { - if (to.is_nullable()) { - ColumnNullable& nullable_column = assert_cast<ColumnNullable&>(to); - if (this->count == 1 || this->count == 0) { - nullable_column.insert_default(); - } else { - auto& col = assert_cast<ColumnFloat64&>(nullable_column.get_nested_column()); - col.get_data().push_back(this->get_samp_result()); - nullable_column.get_null_map_data().push_back(0); - } - } else { - auto& col = assert_cast<ColumnFloat64&>(to); - if (this->count == 1 || this->count == 0) { - col.insert_default(); - } else { - col.get_data().push_back(this->get_samp_result()); - } - } - } - static DataTypePtr get_return_type() { - return make_nullable(std::make_shared<DataTypeNumber<Float64>>()); - } -}; - template <typename T, typename Data> struct SampData : Data { void insert_result_into(IColumn& to) const { @@ -258,14 +232,6 @@ public: } }; -template <typename Data, bool is_nullable> -class AggregateFunctionSamp_OLDER final - : public AggregateFunctionSampCovariance<NOTPOP, Data, is_nullable> { -public: - AggregateFunctionSamp_OLDER(const DataTypes& argument_types_) - : AggregateFunctionSampCovariance<NOTPOP, Data, is_nullable>(argument_types_) {} -}; - template <typename Data, bool is_nullable> class AggregateFunctionSamp final : public AggregateFunctionSampCovariance<NOTPOP, Data, is_nullable> { diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp b/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp index ac8e40d0312..248d808f2cc 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp @@ -23,27 +23,6 @@ namespace doris::vectorized { -template <bool is_nullable> -AggregateFunctionPtr create_aggregate_function_percentile_approx_older( - const std::string& name, const DataTypes& argument_types, const bool result_is_nullable) { - const DataTypePtr& argument_type = remove_nullable(argument_types[0]); - WhichDataType which(argument_type); - if (which.idx != TypeIndex::Float64) { - return nullptr; - } - if (argument_types.size() == 2) { - return creator_without_type::create< - AggregateFunctionPercentileApproxTwoParams_OLDER<is_nullable>>( - remove_nullable(argument_types), result_is_nullable); - } - if (argument_types.size() == 3) { - return creator_without_type::create< - AggregateFunctionPercentileApproxThreeParams_OLDER<is_nullable>>( - remove_nullable(argument_types), result_is_nullable); - } - return nullptr; -} - AggregateFunctionPtr create_aggregate_function_percentile_approx(const std::string& name, const DataTypes& argument_types, const bool result_is_nullable) { @@ -63,27 +42,6 @@ AggregateFunctionPtr create_aggregate_function_percentile_approx(const std::stri return nullptr; } -template <bool is_nullable> -AggregateFunctionPtr create_aggregate_function_percentile_approx_weighted_older( - const std::string& name, const DataTypes& argument_types, const bool result_is_nullable) { - const DataTypePtr& argument_type = remove_nullable(argument_types[0]); - WhichDataType which(argument_type); - if (which.idx != TypeIndex::Float64) { - return nullptr; - } - if (argument_types.size() == 3) { - return creator_without_type::create< - AggregateFunctionPercentileApproxWeightedThreeParams_OLDER<is_nullable>>( - remove_nullable(argument_types), result_is_nullable); - } - if (argument_types.size() == 4) { - return creator_without_type::create< - AggregateFunctionPercentileApproxWeightedFourParams_OLDER<is_nullable>>( - remove_nullable(argument_types), result_is_nullable); - } - return nullptr; -} - AggregateFunctionPtr create_aggregate_function_percentile_approx_weighted( const std::string& name, const DataTypes& argument_types, const bool result_is_nullable) { const DataTypePtr& argument_type = remove_nullable(argument_types[0]); @@ -111,20 +69,12 @@ void register_aggregate_function_percentile(AggregateFunctionSimpleFactory& fact } void register_percentile_approx_old_function(AggregateFunctionSimpleFactory& factory) { - factory.register_alternative_function("percentile_approx", - create_aggregate_function_percentile_approx_older<false>, - false, AGG_FUNCTION_NULLABLE); - factory.register_alternative_function("percentile_approx", - create_aggregate_function_percentile_approx_older<true>, - true, AGG_FUNCTION_NULLABLE); - factory.register_alternative_function( - "percentile_approx_weighted", - create_aggregate_function_percentile_approx_weighted_older<false>, false, - AGG_FUNCTION_NULLABLE); - factory.register_alternative_function( - "percentile_approx_weighted", - create_aggregate_function_percentile_approx_weighted_older<true>, true, - AGG_FUNCTION_NULLABLE); + BeExecVersionManager::registe_restrict_function_compatibility("percentile_approx"); + BeExecVersionManager::registe_restrict_function_compatibility("percentile_approx_weighted"); +} + +void register_aggregate_function_percentile_old(AggregateFunctionSimpleFactory& factory) { + BeExecVersionManager::registe_restrict_function_compatibility("percentile"); } void register_aggregate_function_percentile_approx(AggregateFunctionSimpleFactory& factory) { diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile.h b/be/src/vec/aggregate_functions/aggregate_function_percentile.h index 0cec238846e..a1e739d8758 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_percentile.h +++ b/be/src/vec/aggregate_functions/aggregate_function_percentile.h @@ -182,68 +182,6 @@ public: } }; -template <bool is_nullable> -class AggregateFunctionPercentileApproxTwoParams_OLDER : public AggregateFunctionPercentileApprox { -public: - AggregateFunctionPercentileApproxTwoParams_OLDER(const DataTypes& argument_types_) - : AggregateFunctionPercentileApprox(argument_types_) {} - void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, - Arena*) const override { - if constexpr (is_nullable) { - double column_data[2] = {0, 0}; - - for (int i = 0; i < 2; ++i) { - const auto* nullable_column = check_and_get_column<ColumnNullable>(columns[i]); - if (nullable_column == nullptr) { //Not Nullable column - const auto& column = - assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>( - *columns[i]); - column_data[i] = column.get_element(row_num); - } else if (!nullable_column->is_null_at( - row_num)) { // Nullable column && Not null data - const auto& column = - assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>( - nullable_column->get_nested_column()); - column_data[i] = column.get_element(row_num); - } else { // Nullable column && null data - if (i == 0) { - return; - } - } - } - - this->data(place).init(); - this->data(place).add(column_data[0], column_data[1]); - - } else { - const auto& sources = - assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>(*columns[0]); - const auto& quantile = - assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>(*columns[1]); - - this->data(place).init(); - this->data(place).add(sources.get_element(row_num), quantile.get_element(row_num)); - } - } - - DataTypePtr get_return_type() const override { - return make_nullable(std::make_shared<DataTypeFloat64>()); - } - - void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { - auto& nullable_column = assert_cast<ColumnNullable&>(to); - double result = AggregateFunctionPercentileApprox::data(place).get(); - - if (std::isnan(result)) { - nullable_column.insert_default(); - } else { - auto& col = assert_cast<ColumnFloat64&>(nullable_column.get_nested_column()); - col.get_data().push_back(result); - nullable_column.get_null_map_data().push_back(0); - } - } -}; - class AggregateFunctionPercentileApproxTwoParams : public AggregateFunctionPercentileApprox { public: AggregateFunctionPercentileApproxTwoParams(const DataTypes& argument_types_) @@ -272,71 +210,6 @@ public: } }; -template <bool is_nullable> -class AggregateFunctionPercentileApproxThreeParams_OLDER - : public AggregateFunctionPercentileApprox { -public: - AggregateFunctionPercentileApproxThreeParams_OLDER(const DataTypes& argument_types_) - : AggregateFunctionPercentileApprox(argument_types_) {} - void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, - Arena*) const override { - if constexpr (is_nullable) { - double column_data[3] = {0, 0, 0}; - - for (int i = 0; i < 3; ++i) { - const auto* nullable_column = check_and_get_column<ColumnNullable>(columns[i]); - if (nullable_column == nullptr) { //Not Nullable column - const auto& column = - assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>( - *columns[i]); - column_data[i] = column.get_element(row_num); - } else if (!nullable_column->is_null_at( - row_num)) { // Nullable column && Not null data - const auto& column = - assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>( - nullable_column->get_nested_column()); - column_data[i] = column.get_element(row_num); - } else { // Nullable column && null data - if (i == 0) { - return; - } - } - } - - this->data(place).init(column_data[2]); - this->data(place).add(column_data[0], column_data[1]); - - } else { - const auto& sources = - assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>(*columns[0]); - const auto& quantile = - assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>(*columns[1]); - const auto& compression = - assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>(*columns[2]); - - this->data(place).init(compression.get_element(row_num)); - this->data(place).add(sources.get_element(row_num), quantile.get_element(row_num)); - } - } - - DataTypePtr get_return_type() const override { - return make_nullable(std::make_shared<DataTypeFloat64>()); - } - - void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { - auto& nullable_column = assert_cast<ColumnNullable&>(to); - double result = AggregateFunctionPercentileApprox::data(place).get(); - - if (std::isnan(result)) { - nullable_column.insert_default(); - } else { - auto& col = assert_cast<ColumnFloat64&>(nullable_column.get_nested_column()); - col.get_data().push_back(result); - nullable_column.get_null_map_data().push_back(0); - } - } -}; - class AggregateFunctionPercentileApproxThreeParams : public AggregateFunctionPercentileApprox { public: AggregateFunctionPercentileApproxThreeParams(const DataTypes& argument_types_) @@ -368,76 +241,6 @@ public: } }; -template <bool is_nullable> -class AggregateFunctionPercentileApproxWeightedThreeParams_OLDER - : public AggregateFunctionPercentileApprox { -public: - AggregateFunctionPercentileApproxWeightedThreeParams_OLDER(const DataTypes& argument_types_) - : AggregateFunctionPercentileApprox(argument_types_) {} - - void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, - Arena*) const override { - if constexpr (is_nullable) { - // sources quantile weight - double column_data[3] = {0, 0, 0}; - for (int i = 0; i < 3; ++i) { - const auto* nullable_column = check_and_get_column<ColumnNullable>(columns[i]); - if (nullable_column == nullptr) { //Not Nullable column - const auto& column = - assert_cast<const ColumnVector<Float64>&, TypeCheckOnRelease::DISABLE>( - *columns[i]); - column_data[i] = column.get_element(row_num); - } else if (!nullable_column->is_null_at( - row_num)) { // Nullable column && Not null data - const auto& column = - assert_cast<const ColumnVector<Float64>&, TypeCheckOnRelease::DISABLE>( - nullable_column->get_nested_column()); - column_data[i] = column.get_element(row_num); - } else { // Nullable column && null data - if (i == 0) { - return; - } - } - } - this->data(place).init(); - this->data(place).add_with_weight(column_data[0], column_data[1], column_data[2]); - - } else { - const auto& sources = - assert_cast<const ColumnVector<Float64>&, TypeCheckOnRelease::DISABLE>( - *columns[0]); - const auto& weight = - assert_cast<const ColumnVector<Float64>&, TypeCheckOnRelease::DISABLE>( - *columns[1]); - const auto& quantile = - assert_cast<const ColumnVector<Float64>&, TypeCheckOnRelease::DISABLE>( - *columns[2]); - - this->data(place).init(); - this->data(place).add_with_weight(sources.get_element(row_num), - weight.get_element(row_num), - quantile.get_element(row_num)); - } - } - - DataTypePtr get_return_type() const override { - return make_nullable(std::make_shared<DataTypeFloat64>()); - } - - void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { - auto& nullable_column = assert_cast<ColumnNullable&>(to); - double result = AggregateFunctionPercentileApprox::data(place).get(); - - if (std::isnan(result)) { - nullable_column.insert_default(); - } else { - auto& col = assert_cast<ColumnFloat64&>(nullable_column.get_nested_column()); - col.get_data().push_back(result); - nullable_column.get_null_map_data().push_back(0); - } - } -}; - class AggregateFunctionPercentileApproxWeightedThreeParams : public AggregateFunctionPercentileApprox { public: @@ -472,79 +275,6 @@ public: } }; -template <bool is_nullable> -class AggregateFunctionPercentileApproxWeightedFourParams_OLDER - : public AggregateFunctionPercentileApprox { -public: - AggregateFunctionPercentileApproxWeightedFourParams_OLDER(const DataTypes& argument_types_) - : AggregateFunctionPercentileApprox(argument_types_) {} - void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, - Arena*) const override { - if constexpr (is_nullable) { - double column_data[4] = {0, 0, 0, 0}; - - for (int i = 0; i < 4; ++i) { - const auto* nullable_column = check_and_get_column<ColumnNullable>(columns[i]); - if (nullable_column == nullptr) { //Not Nullable column - const auto& column = - assert_cast<const ColumnVector<Float64>&, TypeCheckOnRelease::DISABLE>( - *columns[i]); - column_data[i] = column.get_element(row_num); - } else if (!nullable_column->is_null_at( - row_num)) { // Nullable column && Not null data - const auto& column = - assert_cast<const ColumnVector<Float64>&, TypeCheckOnRelease::DISABLE>( - nullable_column->get_nested_column()); - column_data[i] = column.get_element(row_num); - } else { // Nullable column && null data - if (i == 0) { - return; - } - } - } - - this->data(place).init(column_data[3]); - this->data(place).add_with_weight(column_data[0], column_data[1], column_data[2]); - - } else { - const auto& sources = - assert_cast<const ColumnVector<Float64>&, TypeCheckOnRelease::DISABLE>( - *columns[0]); - const auto& weight = - assert_cast<const ColumnVector<Float64>&, TypeCheckOnRelease::DISABLE>( - *columns[1]); - const auto& quantile = - assert_cast<const ColumnVector<Float64>&, TypeCheckOnRelease::DISABLE>( - *columns[2]); - const auto& compression = - assert_cast<const ColumnVector<Float64>&, TypeCheckOnRelease::DISABLE>( - *columns[3]); - - this->data(place).init(compression.get_element(row_num)); - this->data(place).add_with_weight(sources.get_element(row_num), - weight.get_element(row_num), - quantile.get_element(row_num)); - } - } - - DataTypePtr get_return_type() const override { - return make_nullable(std::make_shared<DataTypeFloat64>()); - } - - void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { - auto& nullable_column = assert_cast<ColumnNullable&>(to); - double result = AggregateFunctionPercentileApprox::data(place).get(); - - if (std::isnan(result)) { - nullable_column.insert_default(); - } else { - auto& col = assert_cast<ColumnFloat64&>(nullable_column.get_nested_column()); - col.get_data().push_back(result); - nullable_column.get_null_map_data().push_back(0); - } - } -}; - class AggregateFunctionPercentileApproxWeightedFourParams : public AggregateFunctionPercentileApprox { public: diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp deleted file mode 100644 index 5ad1ea8f2d3..00000000000 --- a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp +++ /dev/null @@ -1,39 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/aggregate_functions/aggregate_function_percentile_approx.h" - -#include "vec/aggregate_functions/aggregate_function_simple_factory.h" -#include "vec/aggregate_functions/helpers.h" - -namespace doris::vectorized { - -void register_aggregate_function_percentile_old(AggregateFunctionSimpleFactory& factory) { - factory.register_alternative_function( - "percentile", creator_without_type::creator<AggregateFunctionPercentileOld>, false, - AGG_FUNCTION_NULLABLE); - factory.register_alternative_function( - "percentile", creator_without_type::creator<AggregateFunctionPercentileOld>, true, - AGG_FUNCTION_NULLABLE); - factory.register_alternative_function( - "percentile_array", creator_without_type::creator<AggregateFunctionPercentileArrayOld>, - false, AGG_FUNCTION_NULLABLE); - factory.register_alternative_function( - "percentile_array", creator_without_type::creator<AggregateFunctionPercentileArrayOld>, - true, AGG_FUNCTION_NULLABLE); -} -} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h deleted file mode 100644 index 8698355897d..00000000000 --- a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h +++ /dev/null @@ -1,258 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <glog/logging.h> -#include <stddef.h> -#include <stdint.h> - -#include <algorithm> -#include <boost/iterator/iterator_facade.hpp> -#include <cmath> -#include <memory> -#include <ostream> -#include <string> -#include <vector> - -#include "util/counts.h" -#include "util/tdigest.h" -#include "vec/aggregate_functions/aggregate_function.h" -#include "vec/columns/column.h" -#include "vec/columns/column_array.h" -#include "vec/columns/column_nullable.h" -#include "vec/columns/column_vector.h" -#include "vec/common/assert_cast.h" -#include "vec/common/pod_array_fwd.h" -#include "vec/common/string_ref.h" -#include "vec/core/types.h" -#include "vec/data_types/data_type_array.h" -#include "vec/data_types/data_type_nullable.h" -#include "vec/data_types/data_type_number.h" -#include "vec/io/io_helper.h" - -namespace doris { -namespace vectorized { -class Arena; -class BufferReadable; -class BufferWritable; -} // namespace vectorized -} // namespace doris - -namespace doris::vectorized { - -struct OldPercentileState { - std::vector<OldCounts> vec_counts; - std::vector<double> vec_quantile {-1}; - bool inited_flag = false; - - void write(BufferWritable& buf) const { - write_binary(inited_flag, buf); - int size_num = vec_quantile.size(); - write_binary(size_num, buf); - for (const auto& quantile : vec_quantile) { - write_binary(quantile, buf); - } - std::string serialize_str; - for (const auto& counts : vec_counts) { - serialize_str.resize(counts.serialized_size(), '0'); - counts.serialize((uint8_t*)serialize_str.c_str()); - write_binary(serialize_str, buf); - } - } - - void read(BufferReadable& buf) { - read_binary(inited_flag, buf); - int size_num = 0; - read_binary(size_num, buf); - double data = 0.0; - vec_quantile.clear(); - for (int i = 0; i < size_num; ++i) { - read_binary(data, buf); - vec_quantile.emplace_back(data); - } - StringRef ref; - vec_counts.clear(); - vec_counts.resize(size_num); - for (int i = 0; i < size_num; ++i) { - read_binary(ref, buf); - vec_counts[i].unserialize((uint8_t*)ref.data); - } - } - - void add(int64_t source, const PaddedPODArray<Float64>& quantiles, int arg_size) { - if (!inited_flag) { - vec_counts.resize(arg_size); - vec_quantile.resize(arg_size, -1); - inited_flag = true; - for (int i = 0; i < arg_size; ++i) { - vec_quantile[i] = quantiles[i]; - } - } - for (int i = 0; i < arg_size; ++i) { - vec_counts[i].increment(source, 1); - } - } - - void merge(const OldPercentileState& rhs) { - if (!rhs.inited_flag) { - return; - } - int size_num = rhs.vec_quantile.size(); - if (!inited_flag) { - vec_counts.resize(size_num); - vec_quantile.resize(size_num, -1); - inited_flag = true; - } - - for (int i = 0; i < size_num; ++i) { - if (vec_quantile[i] == -1.0) { - vec_quantile[i] = rhs.vec_quantile[i]; - } - vec_counts[i].merge(&(rhs.vec_counts[i])); - } - } - - void reset() { - vec_counts.clear(); - vec_quantile.clear(); - inited_flag = false; - } - - double get() const { return vec_counts.empty() ? 0 : vec_counts[0].terminate(vec_quantile[0]); } - - void insert_result_into(IColumn& to) const { - auto& column_data = assert_cast<ColumnFloat64&>(to).get_data(); - for (int i = 0; i < vec_counts.size(); ++i) { - column_data.push_back(vec_counts[i].terminate(vec_quantile[i])); - } - } -}; - -class AggregateFunctionPercentileOld final - : public IAggregateFunctionDataHelper<OldPercentileState, AggregateFunctionPercentileOld> { -public: - AggregateFunctionPercentileOld(const DataTypes& argument_types_) - : IAggregateFunctionDataHelper<OldPercentileState, AggregateFunctionPercentileOld>( - argument_types_) {} - - String get_name() const override { return "percentile"; } - - DataTypePtr get_return_type() const override { return std::make_shared<DataTypeFloat64>(); } - - void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, - Arena*) const override { - const auto& sources = - assert_cast<const ColumnVector<Int64>&, TypeCheckOnRelease::DISABLE>(*columns[0]); - const auto& quantile = - assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>(*columns[1]); - AggregateFunctionPercentileOld::data(place).add(sources.get_int(row_num), - quantile.get_data(), 1); - } - - void reset(AggregateDataPtr __restrict place) const override { - AggregateFunctionPercentileOld::data(place).reset(); - } - - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, - Arena*) const override { - AggregateFunctionPercentileOld::data(place).merge( - AggregateFunctionPercentileOld::data(rhs)); - } - - void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - AggregateFunctionPercentileOld::data(place).write(buf); - } - - void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena*) const override { - AggregateFunctionPercentileOld::data(place).read(buf); - } - - void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { - auto& col = assert_cast<ColumnFloat64&>(to); - col.insert_value(AggregateFunctionPercentileOld::data(place).get()); - } -}; - -class AggregateFunctionPercentileArrayOld final - : public IAggregateFunctionDataHelper<OldPercentileState, - AggregateFunctionPercentileArrayOld> { -public: - AggregateFunctionPercentileArrayOld(const DataTypes& argument_types_) - : IAggregateFunctionDataHelper<OldPercentileState, AggregateFunctionPercentileArrayOld>( - argument_types_) {} - - String get_name() const override { return "percentile_array"; } - - DataTypePtr get_return_type() const override { - return std::make_shared<DataTypeArray>(make_nullable(std::make_shared<DataTypeFloat64>())); - } - - void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, - Arena*) const override { - const auto& sources = - assert_cast<const ColumnVector<Int64>&, TypeCheckOnRelease::DISABLE>(*columns[0]); - const auto& quantile_array = - assert_cast<const ColumnArray&, TypeCheckOnRelease::DISABLE>(*columns[1]); - const auto& offset_column_data = quantile_array.get_offsets(); - const auto& nested_column = assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>( - quantile_array.get_data()) - .get_nested_column(); - const auto& nested_column_data = - assert_cast<const ColumnFloat64&, TypeCheckOnRelease::DISABLE>(nested_column); - - AggregateFunctionPercentileArrayOld::data(place).add( - sources.get_int(row_num), nested_column_data.get_data(), - offset_column_data.data()[row_num] - offset_column_data[(ssize_t)row_num - 1]); - } - - void reset(AggregateDataPtr __restrict place) const override { - AggregateFunctionPercentileArrayOld::data(place).reset(); - } - - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, - Arena*) const override { - AggregateFunctionPercentileArrayOld::data(place).merge( - AggregateFunctionPercentileArrayOld::data(rhs)); - } - - void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - AggregateFunctionPercentileArrayOld::data(place).write(buf); - } - - void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena*) const override { - AggregateFunctionPercentileArrayOld::data(place).read(buf); - } - - void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { - auto& to_arr = assert_cast<ColumnArray&>(to); - auto& to_nested_col = to_arr.get_data(); - if (to_nested_col.is_nullable()) { - auto col_null = reinterpret_cast<ColumnNullable*>(&to_nested_col); - AggregateFunctionPercentileArrayOld::data(place).insert_result_into( - col_null->get_nested_column()); - col_null->get_null_map_data().resize_fill(col_null->get_nested_column().size(), 0); - } else { - AggregateFunctionPercentileArrayOld::data(place).insert_result_into(to_nested_col); - } - to_arr.get_offsets().push_back(to_nested_col.size()); - } -}; - -} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp b/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp index b9e39552395..72448a419e9 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp @@ -51,15 +51,6 @@ AggregateFunctionPtr create_function_single_value(const String& name, return nullptr; } -template <bool is_stddev, bool is_nullable> -AggregateFunctionPtr create_aggregate_function_variance_samp_older(const std::string& name, - const DataTypes& argument_types, - const bool result_is_nullable) { - return create_function_single_value<AggregateFunctionSamp_OLDER, VarianceSampName, - SampData_OLDER, is_stddev, is_nullable>( - name, argument_types, result_is_nullable, true); -} - AggregateFunctionPtr create_aggregate_function_variance_samp(const std::string& name, const DataTypes& argument_types, const bool result_is_nullable) { @@ -67,15 +58,6 @@ AggregateFunctionPtr create_aggregate_function_variance_samp(const std::string& name, argument_types, result_is_nullable, false); } -template <bool is_stddev, bool is_nullable> -AggregateFunctionPtr create_aggregate_function_stddev_samp_older(const std::string& name, - const DataTypes& argument_types, - const bool result_is_nullable) { - return create_function_single_value<AggregateFunctionSamp_OLDER, StddevSampName, SampData_OLDER, - is_stddev, is_nullable>(name, argument_types, - result_is_nullable, true); -} - template <bool is_stddev> AggregateFunctionPtr create_aggregate_function_variance_pop(const std::string& name, const DataTypes& argument_types, @@ -108,18 +90,8 @@ void register_aggregate_function_stddev_variance_pop(AggregateFunctionSimpleFact } void register_aggregate_function_stddev_variance_samp_old(AggregateFunctionSimpleFactory& factory) { - factory.register_alternative_function( - "variance_samp", create_aggregate_function_variance_samp_older<false, false>, false, - AGG_FUNCTION_NULLABLE); - factory.register_alternative_function( - "variance_samp", create_aggregate_function_variance_samp_older<false, true>, true, - AGG_FUNCTION_NULLABLE); - factory.register_alternative_function("stddev_samp", - create_aggregate_function_stddev_samp_older<true, false>, - false, AGG_FUNCTION_NULLABLE); - factory.register_alternative_function("stddev_samp", - create_aggregate_function_stddev_samp_older<true, true>, - true, AGG_FUNCTION_NULLABLE); + BeExecVersionManager::registe_restrict_function_compatibility("variance_samp"); + BeExecVersionManager::registe_restrict_function_compatibility("stddev_samp"); } void register_aggregate_function_stddev_variance_samp(AggregateFunctionSimpleFactory& factory) { diff --git a/be/src/vec/aggregate_functions/aggregate_function_stddev.h b/be/src/vec/aggregate_functions/aggregate_function_stddev.h index 5a8c896e1c3..bcd35b13149 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_stddev.h +++ b/be/src/vec/aggregate_functions/aggregate_function_stddev.h @@ -171,30 +171,6 @@ struct StddevSampName : Data { static const char* name() { return "stddev_samp"; } }; -template <typename T, typename Data> -struct SampData_OLDER : Data { - using ColVecResult = - std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<Decimal128V2>, ColumnFloat64>; - void insert_result_into(IColumn& to) const { - ColumnNullable& nullable_column = assert_cast<ColumnNullable&>(to); - if (this->count == 1 || this->count == 0) { - nullable_column.insert_default(); - } else { - auto& col = assert_cast<ColVecResult&>(nullable_column.get_nested_column()); - if constexpr (IsDecimalNumber<T>) { - col.get_data().push_back(this->get_samp_result().value()); - } else { - col.get_data().push_back(this->get_samp_result()); - } - nullable_column.get_null_map_data().push_back(0); - } - } - - static DataTypePtr get_return_type() { - return make_nullable(std::make_shared<DataTypeNumber<Float64>>()); - } -}; - template <typename T, typename Data> struct SampData : Data { using ColVecResult = @@ -266,16 +242,6 @@ public: } }; -//samp function it's always nullables, it's need to handle nullable column -//so return type and add function should processing null values -template <typename Data, bool is_nullable> -class AggregateFunctionSamp_OLDER final - : public AggregateFunctionSampVariance<false, Data, is_nullable> { -public: - AggregateFunctionSamp_OLDER(const DataTypes& argument_types_) - : AggregateFunctionSampVariance<false, Data, is_nullable>(argument_types_) {} -}; - template <typename Data, bool is_nullable> class AggregateFunctionSamp final : public AggregateFunctionSampVariance<false, Data, is_nullable> { public: diff --git a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp index 598c23eb147..c16121bad73 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp @@ -51,36 +51,10 @@ AggregateFunctionPtr create_aggregate_function_window_funnel(const std::string& } } -AggregateFunctionPtr create_aggregate_function_window_funnel_old(const std::string& name, - const DataTypes& argument_types, - const bool result_is_nullable) { - if (argument_types.size() < 3) { - LOG(WARNING) << "window_funnel's argument less than 3."; - return nullptr; - } - if (WhichDataType(remove_nullable(argument_types[2])).is_date_time_v2()) { - return creator_without_type::create< - AggregateFunctionWindowFunnelOld<DateV2Value<DateTimeV2ValueType>, UInt64>>( - argument_types, result_is_nullable); - } else if (WhichDataType(remove_nullable(argument_types[2])).is_date_time()) { - return creator_without_type::create< - AggregateFunctionWindowFunnelOld<VecDateTimeValue, Int64>>(argument_types, - result_is_nullable); - } else { - LOG(WARNING) << "Only support DateTime type as window argument!"; - return nullptr; - } -} - void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& factory) { factory.register_function_both("window_funnel", create_aggregate_function_window_funnel); } void register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory& factory) { - factory.register_alternative_function("window_funnel", - create_aggregate_function_window_funnel_old, true, - AGG_FUNCTION_NEW_WINDOW_FUNNEL); - factory.register_alternative_function("window_funnel", - create_aggregate_function_window_funnel_old, false, - AGG_FUNCTION_NEW_WINDOW_FUNNEL); + BeExecVersionManager::registe_restrict_function_compatibility("window_funnel"); } } // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h index c0f0a4e7e20..84222f0d01b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h +++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h @@ -423,265 +423,4 @@ protected: using IAggregateFunction::version; }; -template <typename DateValueType, typename NativeType> -struct WindowFunnelStateOld { - std::vector<std::pair<DateValueType, int>> events; - int max_event_level; - bool sorted; - int64_t window; - WindowFunnelMode window_funnel_mode; - bool enable_mode; - - WindowFunnelStateOld() { - sorted = true; - max_event_level = 0; - window = 0; - window_funnel_mode = WindowFunnelMode::INVALID; - } - - void reset() { - sorted = true; - max_event_level = 0; - window = 0; - events.shrink_to_fit(); - } - - void add(const DateValueType& timestamp, int event_idx, int event_num, int64_t win, - WindowFunnelMode mode) { - window = win; - max_event_level = event_num; - window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT; - - if (sorted && events.size() > 0) { - if (events.back().first == timestamp) { - sorted = events.back().second <= event_idx; - } else { - sorted = events.back().first < timestamp; - } - } - events.emplace_back(timestamp, event_idx); - } - - void sort() { - if (sorted) { - return; - } - std::stable_sort(events.begin(), events.end()); - } - - int get() const { - if (max_event_level == 0) { - return 0; - } - std::vector<std::optional<std::pair<DateValueType, DateValueType>>> events_timestamp( - max_event_level); - bool is_first_set = false; - for (int64_t i = 0; i < events.size(); i++) { - const int& event_idx = events[i].second; - const DateValueType& timestamp = events[i].first; - if (event_idx == 0) { - events_timestamp[0] = {timestamp, timestamp}; - is_first_set = true; - continue; - } - if (window_funnel_mode == WindowFunnelMode::DEDUPLICATION && - events_timestamp[event_idx].has_value()) { - break; - } - if (events_timestamp[event_idx - 1].has_value()) { - const DateValueType& first_timestamp = - events_timestamp[event_idx - 1].value().first; - DateValueType last_timestamp = first_timestamp; - TimeInterval interval(SECOND, window, false); - last_timestamp.template date_add_interval<SECOND>(interval); - - if (window_funnel_mode != WindowFunnelMode::INCREASE) { - if (timestamp <= last_timestamp) { - events_timestamp[event_idx] = {first_timestamp, timestamp}; - if (event_idx + 1 == max_event_level) { - // Usually, max event level is small. - return max_event_level; - } - } - } else { - if (timestamp <= last_timestamp && - events_timestamp[event_idx - 1].value().second < timestamp) { - if (!events_timestamp[event_idx].has_value() || - events_timestamp[event_idx].value().second > timestamp) { - events_timestamp[event_idx] = {first_timestamp, timestamp}; - } - if (event_idx + 1 == max_event_level) { - // Usually, max event level is small. - return max_event_level; - } - } - } - } else { - if (is_first_set && window_funnel_mode == WindowFunnelMode::FIXED) { - for (size_t i = 0; i < events_timestamp.size(); i++) { - if (!events_timestamp[i].has_value()) { - return i; - } - } - } - } - } - - for (int64_t i = events_timestamp.size() - 1; i >= 0; i--) { - if (events_timestamp[i].has_value()) { - return i + 1; - } - } - - return 0; - } - - void merge(const WindowFunnelStateOld& other) { - if (other.events.empty()) { - return; - } - - int64_t orig_size = events.size(); - events.insert(std::end(events), std::begin(other.events), std::end(other.events)); - const auto begin = std::begin(events); - const auto middle = std::next(events.begin(), orig_size); - const auto end = std::end(events); - if (!other.sorted) { - std::stable_sort(middle, end); - } - - if (!sorted) { - std::stable_sort(begin, middle); - } - std::inplace_merge(begin, middle, end); - max_event_level = max_event_level > 0 ? max_event_level : other.max_event_level; - window = window > 0 ? window : other.window; - if (enable_mode) { - window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID - ? other.window_funnel_mode - : window_funnel_mode; - } else { - window_funnel_mode = WindowFunnelMode::DEFAULT; - } - sorted = true; - } - - void write(BufferWritable& out) const { - write_var_int(max_event_level, out); - write_var_int(window, out); - if (enable_mode) { - write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode), - out); - } - write_var_int(events.size(), out); - - for (int64_t i = 0; i < events.size(); i++) { - int64_t timestamp = binary_cast<DateValueType, NativeType>(events[i].first); - int event_idx = events[i].second; - write_var_int(timestamp, out); - write_var_int(event_idx, out); - } - } - - void read(BufferReadable& in) { - int64_t event_level; - read_var_int(event_level, in); - max_event_level = (int)event_level; - read_var_int(window, in); - window_funnel_mode = WindowFunnelMode::DEFAULT; - if (enable_mode) { - int64_t mode; - read_var_int(mode, in); - window_funnel_mode = static_cast<WindowFunnelMode>(mode); - } - int64_t size = 0; - read_var_int(size, in); - for (int64_t i = 0; i < size; i++) { - int64_t timestamp; - int64_t event_idx; - - read_var_int(timestamp, in); - read_var_int(event_idx, in); - DateValueType time_value = binary_cast<NativeType, DateValueType>(timestamp); - add(time_value, (int)event_idx, max_event_level, window, window_funnel_mode); - } - } -}; - -template <typename DateValueType, typename NativeType> -class AggregateFunctionWindowFunnelOld - : public IAggregateFunctionDataHelper< - WindowFunnelStateOld<DateValueType, NativeType>, - AggregateFunctionWindowFunnelOld<DateValueType, NativeType>> { -public: - AggregateFunctionWindowFunnelOld(const DataTypes& argument_types_) - : IAggregateFunctionDataHelper< - WindowFunnelStateOld<DateValueType, NativeType>, - AggregateFunctionWindowFunnelOld<DateValueType, NativeType>>( - argument_types_) {} - - void create(AggregateDataPtr __restrict place) const override { - auto data = new (place) WindowFunnelStateOld<DateValueType, NativeType>(); - /// support window funnel mode from 2.0. See `BeExecVersionManager::max_be_exec_version` - data->enable_mode = version >= 3; - } - - String get_name() const override { return "window_funnel"; } - - DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt32>(); } - - void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); } - - void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, - Arena*) const override { - const auto& window = - assert_cast<const ColumnVector<Int64>&, TypeCheckOnRelease::DISABLE>(*columns[0]) - .get_data()[row_num]; - StringRef mode = columns[1]->get_data_at(row_num); - const auto& timestamp = - assert_cast<const ColumnVector<NativeType>&, TypeCheckOnRelease::DISABLE>( - *columns[2]) - .get_data()[row_num]; - const int NON_EVENT_NUM = 3; - for (int i = NON_EVENT_NUM; i < IAggregateFunction::get_argument_types().size(); i++) { - const auto& is_set = - assert_cast<const ColumnVector<UInt8>&, TypeCheckOnRelease::DISABLE>( - *columns[i]) - .get_data()[row_num]; - if (is_set) { - this->data(place).add( - binary_cast<NativeType, DateValueType>(timestamp), i - NON_EVENT_NUM, - IAggregateFunction::get_argument_types().size() - NON_EVENT_NUM, window, - string_to_window_funnel_mode(mode.to_string())); - } - } - } - - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, - Arena*) const override { - this->data(place).merge(this->data(rhs)); - } - - void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - this->data(place).write(buf); - } - - void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, - Arena*) const override { - this->data(place).read(buf); - } - - void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { - this->data(const_cast<AggregateDataPtr>(place)).sort(); - assert_cast<ColumnInt32&>(to).get_data().push_back( - IAggregateFunctionDataHelper< - WindowFunnelStateOld<DateValueType, NativeType>, - AggregateFunctionWindowFunnelOld<DateValueType, NativeType>>::data(place) - .get()); - } - -protected: - using IAggregateFunction::version; -}; - } // namespace doris::vectorized diff --git a/be/test/vec/aggregate_functions/vec_window_funnel_test.cpp b/be/test/vec/aggregate_functions/vec_window_funnel_test.cpp index ed019ef699b..f0c93c4beab 100644 --- a/be/test/vec/aggregate_functions/vec_window_funnel_test.cpp +++ b/be/test/vec/aggregate_functions/vec_window_funnel_test.cpp @@ -58,7 +58,8 @@ public: std::make_shared<DataTypeDateTime>(), std::make_shared<DataTypeUInt8>(), std::make_shared<DataTypeUInt8>(), std::make_shared<DataTypeUInt8>(), std::make_shared<DataTypeUInt8>()}; - agg_function = factory.get("window_funnel", data_types, false, -1); + agg_function = factory.get("window_funnel", data_types, false, + BeExecVersionManager::get_newest_version()); EXPECT_NE(agg_function, nullptr); } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index dd017b3c023..b28baaecba5 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1877,7 +1877,7 @@ public class Config extends ConfigBase { * Max data version of backends serialize block. */ @ConfField(mutable = false) - public static int max_be_exec_version = 7; + public static int max_be_exec_version = 8; /** * Min data version of backends serialize block. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org