This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d56371da6be [Feature](compatibility) make some agg function restrict 
(#41459)
d56371da6be is described below

commit d56371da6be952e37d8371da8ec683a56b9865fe
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Oct 10 14:52:19 2024 +0800

    [Feature](compatibility) make some agg function restrict (#41459)
    
    ## Proposed changes
    make some agg function restrict
---
 be/src/agent/be_exec_version_manager.cpp           |  26 +-
 be/src/agent/be_exec_version_manager.h             |   9 +-
 .../aggregate_function_covar.cpp                   |  16 +-
 .../aggregate_functions/aggregate_function_covar.h |  34 ---
 .../aggregate_function_percentile.cpp              |  62 +----
 .../aggregate_function_percentile.h                | 270 ---------------------
 .../aggregate_function_percentile_approx.cpp       |  39 ---
 .../aggregate_function_percentile_approx.h         | 258 --------------------
 .../aggregate_function_stddev.cpp                  |  32 +--
 .../aggregate_function_stddev.h                    |  34 ---
 .../aggregate_function_window_funnel.cpp           |  28 +--
 .../aggregate_function_window_funnel.h             | 261 --------------------
 .../aggregate_functions/vec_window_funnel_test.cpp |   3 +-
 .../main/java/org/apache/doris/common/Config.java  |   2 +-
 14 files changed, 40 insertions(+), 1034 deletions(-)

diff --git a/be/src/agent/be_exec_version_manager.cpp 
b/be/src/agent/be_exec_version_manager.cpp
index 0bdb55f7bc3..bfd0745e316 100644
--- a/be/src/agent/be_exec_version_manager.cpp
+++ b/be/src/agent/be_exec_version_manager.cpp
@@ -34,6 +34,13 @@ Status BeExecVersionManager::check_be_exec_version(int 
be_exec_version) {
 
 int BeExecVersionManager::get_function_compatibility(int be_exec_version,
                                                      std::string 
function_name) {
+    if (_function_restrict_map.contains(function_name) && be_exec_version != 
get_newest_version()) {
+        throw Exception(Status::InternalError(
+                "function {} do not support old be exec version, maybe it's 
because doris are "
+                "doing a rolling upgrade. newest_version={}, 
input_be_exec_version={}",
+                function_name, get_newest_version(), be_exec_version));
+    }
+
     auto it = _function_change_map.find(function_name);
     if (it == _function_change_map.end()) {
         // 0 means no compatibility issues need to be dealt with
@@ -82,7 +89,7 @@ void BeExecVersionManager::check_function_compatibility(int 
current_be_exec_vers
  * 3: start from doris 2.0.0 (by some mistakes)
  *    a. aggregation function do not serialize bitmap to string.
  *    b. support window funnel mode.
- * 4/5: start from doris 2.1.0
+ * 4: start from doris 2.1.0
  *    a. ignore this line, window funnel mode should be enabled from 2.0.
  *    b. array contains/position/countequal function return nullable in less 
situations.
  *    c. cleared old version of Version 2.
@@ -92,15 +99,22 @@ void BeExecVersionManager::check_function_compatibility(int 
current_be_exec_vers
  *    g. do local merge of remote runtime filter
  *    h. "now": ALWAYS_NOT_NULLABLE -> DEPEND_ON_ARGUMENTS
  *
- * 7: start from doris 3.0.0
+ * 5: start from doris 3.0.0
+ *    a. change some agg function nullable property: PR #37215
+ *
+ * 6: start from doris 3.0.1 and 2.1.6
  *    a. change the impl of percentile (need fix)
  *    b. clear old version of version 3->4
  *    c. change FunctionIsIPAddressInRange from AlwaysNotNullable to 
DependOnArguments
- *    d. change some agg function nullable property: PR #37215
- *    e. change variant serde to fix PR #38413
- *    f. support const column in serialize/deserialize function: PR #41175
+ *    d. change variant serde to fix PR #38413
+ *
+ * 7: start from doris 3.0.2
+ *    a. window funnel logic change
+*     b. support const column in serialize/deserialize function: PR #41175
  */
-const int BeExecVersionManager::max_be_exec_version = 7;
+
+const int BeExecVersionManager::max_be_exec_version = 8;
 const int BeExecVersionManager::min_be_exec_version = 0;
 std::map<std::string, std::set<int>> 
BeExecVersionManager::_function_change_map {};
+std::set<std::string> BeExecVersionManager::_function_restrict_map;
 } // namespace doris
diff --git a/be/src/agent/be_exec_version_manager.h 
b/be/src/agent/be_exec_version_manager.h
index a51fb8e36b4..f4158a40152 100644
--- a/be/src/agent/be_exec_version_manager.h
+++ b/be/src/agent/be_exec_version_manager.h
@@ -25,7 +25,6 @@
 
 namespace doris {
 
-constexpr static int AGG_FUNCTION_NEW_WINDOW_FUNNEL = 6;
 constexpr inline int BITMAP_SERDE = 3;
 constexpr inline int USE_NEW_SERDE = 4;         // release on DORIS version 2.1
 constexpr inline int OLD_WAL_SERDE = 3;         // use to solve compatibility 
issues, see pr #32299
@@ -34,7 +33,7 @@ constexpr inline int VARIANT_SERDE = 6;         // change 
variant serde to fix P
 constexpr inline int AGGREGATION_2_1_VERSION =
         6; // some aggregation changed the data format after this version
 constexpr inline int USE_CONST_SERDE =
-        7; // support const column in serialize/deserialize function: PR #41175
+        8; // support const column in serialize/deserialize function: PR #41175
 
 class BeExecVersionManager {
 public:
@@ -59,11 +58,17 @@ public:
         _function_change_map[function_name].insert(breaking_old_version);
     }
 
+    static void registe_restrict_function_compatibility(std::string 
function_name) {
+        _function_restrict_map.insert(function_name);
+    }
+
 private:
     static const int max_be_exec_version;
     static const int min_be_exec_version;
     // [function name] -> [breaking change start version]
     static std::map<std::string, std::set<int>> _function_change_map;
+    // those function must has input newest be exec version
+    static std::set<std::string> _function_restrict_map;
 };
 
 } // namespace doris
diff --git a/be/src/vec/aggregate_functions/aggregate_function_covar.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_covar.cpp
index 76a2881dd78..b02d6ae0e12 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_covar.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_covar.cpp
@@ -51,15 +51,6 @@ AggregateFunctionPtr create_function_single_value(const 
String& name,
     return nullptr;
 }
 
-template <bool is_nullable>
-AggregateFunctionPtr create_aggregate_function_covariance_samp_old(const 
std::string& name,
-                                                                   const 
DataTypes& argument_types,
-                                                                   const bool 
result_is_nullable) {
-    return create_function_single_value<AggregateFunctionSamp_OLDER, 
CovarSampName, SampData_OLDER,
-                                        is_nullable>(name, argument_types, 
result_is_nullable,
-                                                     NULLABLE);
-}
-
 AggregateFunctionPtr create_aggregate_function_covariance_samp(const 
std::string& name,
                                                                const 
DataTypes& argument_types,
                                                                const bool 
result_is_nullable) {
@@ -80,12 +71,7 @@ void 
register_aggregate_function_covar_pop(AggregateFunctionSimpleFactory& facto
 }
 
 void 
register_aggregate_function_covar_samp_old(AggregateFunctionSimpleFactory& 
factory) {
-    factory.register_alternative_function(
-            "covar_samp", 
create_aggregate_function_covariance_samp_old<NOTNULLABLE>, false,
-            AGG_FUNCTION_NULLABLE);
-    factory.register_alternative_function("covar_samp",
-                                          
create_aggregate_function_covariance_samp_old<NULLABLE>,
-                                          true, AGG_FUNCTION_NULLABLE);
+    
BeExecVersionManager::registe_restrict_function_compatibility("covar_samp");
 }
 
 void register_aggregate_function_covar_samp(AggregateFunctionSimpleFactory& 
factory) {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_covar.h 
b/be/src/vec/aggregate_functions/aggregate_function_covar.h
index 78a3eae5bcb..e6ebec70285 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_covar.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_covar.h
@@ -137,32 +137,6 @@ struct PopData : Data {
     static DataTypePtr get_return_type() { return 
std::make_shared<DataTypeNumber<Float64>>(); }
 };
 
-template <typename T, typename Data>
-struct SampData_OLDER : Data {
-    void insert_result_into(IColumn& to) const {
-        if (to.is_nullable()) {
-            ColumnNullable& nullable_column = assert_cast<ColumnNullable&>(to);
-            if (this->count == 1 || this->count == 0) {
-                nullable_column.insert_default();
-            } else {
-                auto& col = 
assert_cast<ColumnFloat64&>(nullable_column.get_nested_column());
-                col.get_data().push_back(this->get_samp_result());
-                nullable_column.get_null_map_data().push_back(0);
-            }
-        } else {
-            auto& col = assert_cast<ColumnFloat64&>(to);
-            if (this->count == 1 || this->count == 0) {
-                col.insert_default();
-            } else {
-                col.get_data().push_back(this->get_samp_result());
-            }
-        }
-    }
-    static DataTypePtr get_return_type() {
-        return make_nullable(std::make_shared<DataTypeNumber<Float64>>());
-    }
-};
-
 template <typename T, typename Data>
 struct SampData : Data {
     void insert_result_into(IColumn& to) const {
@@ -258,14 +232,6 @@ public:
     }
 };
 
-template <typename Data, bool is_nullable>
-class AggregateFunctionSamp_OLDER final
-        : public AggregateFunctionSampCovariance<NOTPOP, Data, is_nullable> {
-public:
-    AggregateFunctionSamp_OLDER(const DataTypes& argument_types_)
-            : AggregateFunctionSampCovariance<NOTPOP, Data, 
is_nullable>(argument_types_) {}
-};
-
 template <typename Data, bool is_nullable>
 class AggregateFunctionSamp final
         : public AggregateFunctionSampCovariance<NOTPOP, Data, is_nullable> {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp
index ac8e40d0312..248d808f2cc 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp
@@ -23,27 +23,6 @@
 
 namespace doris::vectorized {
 
-template <bool is_nullable>
-AggregateFunctionPtr create_aggregate_function_percentile_approx_older(
-        const std::string& name, const DataTypes& argument_types, const bool 
result_is_nullable) {
-    const DataTypePtr& argument_type = remove_nullable(argument_types[0]);
-    WhichDataType which(argument_type);
-    if (which.idx != TypeIndex::Float64) {
-        return nullptr;
-    }
-    if (argument_types.size() == 2) {
-        return creator_without_type::create<
-                AggregateFunctionPercentileApproxTwoParams_OLDER<is_nullable>>(
-                remove_nullable(argument_types), result_is_nullable);
-    }
-    if (argument_types.size() == 3) {
-        return creator_without_type::create<
-                
AggregateFunctionPercentileApproxThreeParams_OLDER<is_nullable>>(
-                remove_nullable(argument_types), result_is_nullable);
-    }
-    return nullptr;
-}
-
 AggregateFunctionPtr create_aggregate_function_percentile_approx(const 
std::string& name,
                                                                  const 
DataTypes& argument_types,
                                                                  const bool 
result_is_nullable) {
@@ -63,27 +42,6 @@ AggregateFunctionPtr 
create_aggregate_function_percentile_approx(const std::stri
     return nullptr;
 }
 
-template <bool is_nullable>
-AggregateFunctionPtr 
create_aggregate_function_percentile_approx_weighted_older(
-        const std::string& name, const DataTypes& argument_types, const bool 
result_is_nullable) {
-    const DataTypePtr& argument_type = remove_nullable(argument_types[0]);
-    WhichDataType which(argument_type);
-    if (which.idx != TypeIndex::Float64) {
-        return nullptr;
-    }
-    if (argument_types.size() == 3) {
-        return creator_without_type::create<
-                
AggregateFunctionPercentileApproxWeightedThreeParams_OLDER<is_nullable>>(
-                remove_nullable(argument_types), result_is_nullable);
-    }
-    if (argument_types.size() == 4) {
-        return creator_without_type::create<
-                
AggregateFunctionPercentileApproxWeightedFourParams_OLDER<is_nullable>>(
-                remove_nullable(argument_types), result_is_nullable);
-    }
-    return nullptr;
-}
-
 AggregateFunctionPtr create_aggregate_function_percentile_approx_weighted(
         const std::string& name, const DataTypes& argument_types, const bool 
result_is_nullable) {
     const DataTypePtr& argument_type = remove_nullable(argument_types[0]);
@@ -111,20 +69,12 @@ void 
register_aggregate_function_percentile(AggregateFunctionSimpleFactory& fact
 }
 
 void register_percentile_approx_old_function(AggregateFunctionSimpleFactory& 
factory) {
-    factory.register_alternative_function("percentile_approx",
-                                          
create_aggregate_function_percentile_approx_older<false>,
-                                          false, AGG_FUNCTION_NULLABLE);
-    factory.register_alternative_function("percentile_approx",
-                                          
create_aggregate_function_percentile_approx_older<true>,
-                                          true, AGG_FUNCTION_NULLABLE);
-    factory.register_alternative_function(
-            "percentile_approx_weighted",
-            create_aggregate_function_percentile_approx_weighted_older<false>, 
false,
-            AGG_FUNCTION_NULLABLE);
-    factory.register_alternative_function(
-            "percentile_approx_weighted",
-            create_aggregate_function_percentile_approx_weighted_older<true>, 
true,
-            AGG_FUNCTION_NULLABLE);
+    
BeExecVersionManager::registe_restrict_function_compatibility("percentile_approx");
+    
BeExecVersionManager::registe_restrict_function_compatibility("percentile_approx_weighted");
+}
+
+void 
register_aggregate_function_percentile_old(AggregateFunctionSimpleFactory& 
factory) {
+    
BeExecVersionManager::registe_restrict_function_compatibility("percentile");
 }
 
 void 
register_aggregate_function_percentile_approx(AggregateFunctionSimpleFactory& 
factory) {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile.h 
b/be/src/vec/aggregate_functions/aggregate_function_percentile.h
index 0cec238846e..a1e739d8758 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_percentile.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_percentile.h
@@ -182,68 +182,6 @@ public:
     }
 };
 
-template <bool is_nullable>
-class AggregateFunctionPercentileApproxTwoParams_OLDER : public 
AggregateFunctionPercentileApprox {
-public:
-    AggregateFunctionPercentileApproxTwoParams_OLDER(const DataTypes& 
argument_types_)
-            : AggregateFunctionPercentileApprox(argument_types_) {}
-    void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
-             Arena*) const override {
-        if constexpr (is_nullable) {
-            double column_data[2] = {0, 0};
-
-            for (int i = 0; i < 2; ++i) {
-                const auto* nullable_column = 
check_and_get_column<ColumnNullable>(columns[i]);
-                if (nullable_column == nullptr) { //Not Nullable column
-                    const auto& column =
-                            assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(
-                                    *columns[i]);
-                    column_data[i] = column.get_element(row_num);
-                } else if (!nullable_column->is_null_at(
-                                   row_num)) { // Nullable column && Not null 
data
-                    const auto& column =
-                            assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(
-                                    nullable_column->get_nested_column());
-                    column_data[i] = column.get_element(row_num);
-                } else { // Nullable column && null data
-                    if (i == 0) {
-                        return;
-                    }
-                }
-            }
-
-            this->data(place).init();
-            this->data(place).add(column_data[0], column_data[1]);
-
-        } else {
-            const auto& sources =
-                    assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[0]);
-            const auto& quantile =
-                    assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[1]);
-
-            this->data(place).init();
-            this->data(place).add(sources.get_element(row_num), 
quantile.get_element(row_num));
-        }
-    }
-
-    DataTypePtr get_return_type() const override {
-        return make_nullable(std::make_shared<DataTypeFloat64>());
-    }
-
-    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
-        auto& nullable_column = assert_cast<ColumnNullable&>(to);
-        double result = AggregateFunctionPercentileApprox::data(place).get();
-
-        if (std::isnan(result)) {
-            nullable_column.insert_default();
-        } else {
-            auto& col = 
assert_cast<ColumnFloat64&>(nullable_column.get_nested_column());
-            col.get_data().push_back(result);
-            nullable_column.get_null_map_data().push_back(0);
-        }
-    }
-};
-
 class AggregateFunctionPercentileApproxTwoParams : public 
AggregateFunctionPercentileApprox {
 public:
     AggregateFunctionPercentileApproxTwoParams(const DataTypes& 
argument_types_)
@@ -272,71 +210,6 @@ public:
     }
 };
 
-template <bool is_nullable>
-class AggregateFunctionPercentileApproxThreeParams_OLDER
-        : public AggregateFunctionPercentileApprox {
-public:
-    AggregateFunctionPercentileApproxThreeParams_OLDER(const DataTypes& 
argument_types_)
-            : AggregateFunctionPercentileApprox(argument_types_) {}
-    void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
-             Arena*) const override {
-        if constexpr (is_nullable) {
-            double column_data[3] = {0, 0, 0};
-
-            for (int i = 0; i < 3; ++i) {
-                const auto* nullable_column = 
check_and_get_column<ColumnNullable>(columns[i]);
-                if (nullable_column == nullptr) { //Not Nullable column
-                    const auto& column =
-                            assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(
-                                    *columns[i]);
-                    column_data[i] = column.get_element(row_num);
-                } else if (!nullable_column->is_null_at(
-                                   row_num)) { // Nullable column && Not null 
data
-                    const auto& column =
-                            assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(
-                                    nullable_column->get_nested_column());
-                    column_data[i] = column.get_element(row_num);
-                } else { // Nullable column && null data
-                    if (i == 0) {
-                        return;
-                    }
-                }
-            }
-
-            this->data(place).init(column_data[2]);
-            this->data(place).add(column_data[0], column_data[1]);
-
-        } else {
-            const auto& sources =
-                    assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[0]);
-            const auto& quantile =
-                    assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[1]);
-            const auto& compression =
-                    assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[2]);
-
-            this->data(place).init(compression.get_element(row_num));
-            this->data(place).add(sources.get_element(row_num), 
quantile.get_element(row_num));
-        }
-    }
-
-    DataTypePtr get_return_type() const override {
-        return make_nullable(std::make_shared<DataTypeFloat64>());
-    }
-
-    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
-        auto& nullable_column = assert_cast<ColumnNullable&>(to);
-        double result = AggregateFunctionPercentileApprox::data(place).get();
-
-        if (std::isnan(result)) {
-            nullable_column.insert_default();
-        } else {
-            auto& col = 
assert_cast<ColumnFloat64&>(nullable_column.get_nested_column());
-            col.get_data().push_back(result);
-            nullable_column.get_null_map_data().push_back(0);
-        }
-    }
-};
-
 class AggregateFunctionPercentileApproxThreeParams : public 
AggregateFunctionPercentileApprox {
 public:
     AggregateFunctionPercentileApproxThreeParams(const DataTypes& 
argument_types_)
@@ -368,76 +241,6 @@ public:
     }
 };
 
-template <bool is_nullable>
-class AggregateFunctionPercentileApproxWeightedThreeParams_OLDER
-        : public AggregateFunctionPercentileApprox {
-public:
-    AggregateFunctionPercentileApproxWeightedThreeParams_OLDER(const 
DataTypes& argument_types_)
-            : AggregateFunctionPercentileApprox(argument_types_) {}
-
-    void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
-             Arena*) const override {
-        if constexpr (is_nullable) {
-            // sources quantile weight
-            double column_data[3] = {0, 0, 0};
-            for (int i = 0; i < 3; ++i) {
-                const auto* nullable_column = 
check_and_get_column<ColumnNullable>(columns[i]);
-                if (nullable_column == nullptr) { //Not Nullable column
-                    const auto& column =
-                            assert_cast<const ColumnVector<Float64>&, 
TypeCheckOnRelease::DISABLE>(
-                                    *columns[i]);
-                    column_data[i] = column.get_element(row_num);
-                } else if (!nullable_column->is_null_at(
-                                   row_num)) { // Nullable column && Not null 
data
-                    const auto& column =
-                            assert_cast<const ColumnVector<Float64>&, 
TypeCheckOnRelease::DISABLE>(
-                                    nullable_column->get_nested_column());
-                    column_data[i] = column.get_element(row_num);
-                } else { // Nullable column && null data
-                    if (i == 0) {
-                        return;
-                    }
-                }
-            }
-            this->data(place).init();
-            this->data(place).add_with_weight(column_data[0], column_data[1], 
column_data[2]);
-
-        } else {
-            const auto& sources =
-                    assert_cast<const ColumnVector<Float64>&, 
TypeCheckOnRelease::DISABLE>(
-                            *columns[0]);
-            const auto& weight =
-                    assert_cast<const ColumnVector<Float64>&, 
TypeCheckOnRelease::DISABLE>(
-                            *columns[1]);
-            const auto& quantile =
-                    assert_cast<const ColumnVector<Float64>&, 
TypeCheckOnRelease::DISABLE>(
-                            *columns[2]);
-
-            this->data(place).init();
-            this->data(place).add_with_weight(sources.get_element(row_num),
-                                              weight.get_element(row_num),
-                                              quantile.get_element(row_num));
-        }
-    }
-
-    DataTypePtr get_return_type() const override {
-        return make_nullable(std::make_shared<DataTypeFloat64>());
-    }
-
-    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
-        auto& nullable_column = assert_cast<ColumnNullable&>(to);
-        double result = AggregateFunctionPercentileApprox::data(place).get();
-
-        if (std::isnan(result)) {
-            nullable_column.insert_default();
-        } else {
-            auto& col = 
assert_cast<ColumnFloat64&>(nullable_column.get_nested_column());
-            col.get_data().push_back(result);
-            nullable_column.get_null_map_data().push_back(0);
-        }
-    }
-};
-
 class AggregateFunctionPercentileApproxWeightedThreeParams
         : public AggregateFunctionPercentileApprox {
 public:
@@ -472,79 +275,6 @@ public:
     }
 };
 
-template <bool is_nullable>
-class AggregateFunctionPercentileApproxWeightedFourParams_OLDER
-        : public AggregateFunctionPercentileApprox {
-public:
-    AggregateFunctionPercentileApproxWeightedFourParams_OLDER(const DataTypes& 
argument_types_)
-            : AggregateFunctionPercentileApprox(argument_types_) {}
-    void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
-             Arena*) const override {
-        if constexpr (is_nullable) {
-            double column_data[4] = {0, 0, 0, 0};
-
-            for (int i = 0; i < 4; ++i) {
-                const auto* nullable_column = 
check_and_get_column<ColumnNullable>(columns[i]);
-                if (nullable_column == nullptr) { //Not Nullable column
-                    const auto& column =
-                            assert_cast<const ColumnVector<Float64>&, 
TypeCheckOnRelease::DISABLE>(
-                                    *columns[i]);
-                    column_data[i] = column.get_element(row_num);
-                } else if (!nullable_column->is_null_at(
-                                   row_num)) { // Nullable column && Not null 
data
-                    const auto& column =
-                            assert_cast<const ColumnVector<Float64>&, 
TypeCheckOnRelease::DISABLE>(
-                                    nullable_column->get_nested_column());
-                    column_data[i] = column.get_element(row_num);
-                } else { // Nullable column && null data
-                    if (i == 0) {
-                        return;
-                    }
-                }
-            }
-
-            this->data(place).init(column_data[3]);
-            this->data(place).add_with_weight(column_data[0], column_data[1], 
column_data[2]);
-
-        } else {
-            const auto& sources =
-                    assert_cast<const ColumnVector<Float64>&, 
TypeCheckOnRelease::DISABLE>(
-                            *columns[0]);
-            const auto& weight =
-                    assert_cast<const ColumnVector<Float64>&, 
TypeCheckOnRelease::DISABLE>(
-                            *columns[1]);
-            const auto& quantile =
-                    assert_cast<const ColumnVector<Float64>&, 
TypeCheckOnRelease::DISABLE>(
-                            *columns[2]);
-            const auto& compression =
-                    assert_cast<const ColumnVector<Float64>&, 
TypeCheckOnRelease::DISABLE>(
-                            *columns[3]);
-
-            this->data(place).init(compression.get_element(row_num));
-            this->data(place).add_with_weight(sources.get_element(row_num),
-                                              weight.get_element(row_num),
-                                              quantile.get_element(row_num));
-        }
-    }
-
-    DataTypePtr get_return_type() const override {
-        return make_nullable(std::make_shared<DataTypeFloat64>());
-    }
-
-    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
-        auto& nullable_column = assert_cast<ColumnNullable&>(to);
-        double result = AggregateFunctionPercentileApprox::data(place).get();
-
-        if (std::isnan(result)) {
-            nullable_column.insert_default();
-        } else {
-            auto& col = 
assert_cast<ColumnFloat64&>(nullable_column.get_nested_column());
-            col.get_data().push_back(result);
-            nullable_column.get_null_map_data().push_back(0);
-        }
-    }
-};
-
 class AggregateFunctionPercentileApproxWeightedFourParams
         : public AggregateFunctionPercentileApprox {
 public:
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp
deleted file mode 100644
index 5ad1ea8f2d3..00000000000
--- a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp
+++ /dev/null
@@ -1,39 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/aggregate_functions/aggregate_function_percentile_approx.h"
-
-#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
-#include "vec/aggregate_functions/helpers.h"
-
-namespace doris::vectorized {
-
-void 
register_aggregate_function_percentile_old(AggregateFunctionSimpleFactory& 
factory) {
-    factory.register_alternative_function(
-            "percentile", 
creator_without_type::creator<AggregateFunctionPercentileOld>, false,
-            AGG_FUNCTION_NULLABLE);
-    factory.register_alternative_function(
-            "percentile", 
creator_without_type::creator<AggregateFunctionPercentileOld>, true,
-            AGG_FUNCTION_NULLABLE);
-    factory.register_alternative_function(
-            "percentile_array", 
creator_without_type::creator<AggregateFunctionPercentileArrayOld>,
-            false, AGG_FUNCTION_NULLABLE);
-    factory.register_alternative_function(
-            "percentile_array", 
creator_without_type::creator<AggregateFunctionPercentileArrayOld>,
-            true, AGG_FUNCTION_NULLABLE);
-}
-} // namespace doris::vectorized
\ No newline at end of file
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h 
b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h
deleted file mode 100644
index 8698355897d..00000000000
--- a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h
+++ /dev/null
@@ -1,258 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <glog/logging.h>
-#include <stddef.h>
-#include <stdint.h>
-
-#include <algorithm>
-#include <boost/iterator/iterator_facade.hpp>
-#include <cmath>
-#include <memory>
-#include <ostream>
-#include <string>
-#include <vector>
-
-#include "util/counts.h"
-#include "util/tdigest.h"
-#include "vec/aggregate_functions/aggregate_function.h"
-#include "vec/columns/column.h"
-#include "vec/columns/column_array.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/columns/column_vector.h"
-#include "vec/common/assert_cast.h"
-#include "vec/common/pod_array_fwd.h"
-#include "vec/common/string_ref.h"
-#include "vec/core/types.h"
-#include "vec/data_types/data_type_array.h"
-#include "vec/data_types/data_type_nullable.h"
-#include "vec/data_types/data_type_number.h"
-#include "vec/io/io_helper.h"
-
-namespace doris {
-namespace vectorized {
-class Arena;
-class BufferReadable;
-class BufferWritable;
-} // namespace vectorized
-} // namespace doris
-
-namespace doris::vectorized {
-
-struct OldPercentileState {
-    std::vector<OldCounts> vec_counts;
-    std::vector<double> vec_quantile {-1};
-    bool inited_flag = false;
-
-    void write(BufferWritable& buf) const {
-        write_binary(inited_flag, buf);
-        int size_num = vec_quantile.size();
-        write_binary(size_num, buf);
-        for (const auto& quantile : vec_quantile) {
-            write_binary(quantile, buf);
-        }
-        std::string serialize_str;
-        for (const auto& counts : vec_counts) {
-            serialize_str.resize(counts.serialized_size(), '0');
-            counts.serialize((uint8_t*)serialize_str.c_str());
-            write_binary(serialize_str, buf);
-        }
-    }
-
-    void read(BufferReadable& buf) {
-        read_binary(inited_flag, buf);
-        int size_num = 0;
-        read_binary(size_num, buf);
-        double data = 0.0;
-        vec_quantile.clear();
-        for (int i = 0; i < size_num; ++i) {
-            read_binary(data, buf);
-            vec_quantile.emplace_back(data);
-        }
-        StringRef ref;
-        vec_counts.clear();
-        vec_counts.resize(size_num);
-        for (int i = 0; i < size_num; ++i) {
-            read_binary(ref, buf);
-            vec_counts[i].unserialize((uint8_t*)ref.data);
-        }
-    }
-
-    void add(int64_t source, const PaddedPODArray<Float64>& quantiles, int 
arg_size) {
-        if (!inited_flag) {
-            vec_counts.resize(arg_size);
-            vec_quantile.resize(arg_size, -1);
-            inited_flag = true;
-            for (int i = 0; i < arg_size; ++i) {
-                vec_quantile[i] = quantiles[i];
-            }
-        }
-        for (int i = 0; i < arg_size; ++i) {
-            vec_counts[i].increment(source, 1);
-        }
-    }
-
-    void merge(const OldPercentileState& rhs) {
-        if (!rhs.inited_flag) {
-            return;
-        }
-        int size_num = rhs.vec_quantile.size();
-        if (!inited_flag) {
-            vec_counts.resize(size_num);
-            vec_quantile.resize(size_num, -1);
-            inited_flag = true;
-        }
-
-        for (int i = 0; i < size_num; ++i) {
-            if (vec_quantile[i] == -1.0) {
-                vec_quantile[i] = rhs.vec_quantile[i];
-            }
-            vec_counts[i].merge(&(rhs.vec_counts[i]));
-        }
-    }
-
-    void reset() {
-        vec_counts.clear();
-        vec_quantile.clear();
-        inited_flag = false;
-    }
-
-    double get() const { return vec_counts.empty() ? 0 : 
vec_counts[0].terminate(vec_quantile[0]); }
-
-    void insert_result_into(IColumn& to) const {
-        auto& column_data = assert_cast<ColumnFloat64&>(to).get_data();
-        for (int i = 0; i < vec_counts.size(); ++i) {
-            column_data.push_back(vec_counts[i].terminate(vec_quantile[i]));
-        }
-    }
-};
-
-class AggregateFunctionPercentileOld final
-        : public IAggregateFunctionDataHelper<OldPercentileState, 
AggregateFunctionPercentileOld> {
-public:
-    AggregateFunctionPercentileOld(const DataTypes& argument_types_)
-            : IAggregateFunctionDataHelper<OldPercentileState, 
AggregateFunctionPercentileOld>(
-                      argument_types_) {}
-
-    String get_name() const override { return "percentile"; }
-
-    DataTypePtr get_return_type() const override { return 
std::make_shared<DataTypeFloat64>(); }
-
-    void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
-             Arena*) const override {
-        const auto& sources =
-                assert_cast<const ColumnVector<Int64>&, 
TypeCheckOnRelease::DISABLE>(*columns[0]);
-        const auto& quantile =
-                assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(*columns[1]);
-        
AggregateFunctionPercentileOld::data(place).add(sources.get_int(row_num),
-                                                        quantile.get_data(), 
1);
-    }
-
-    void reset(AggregateDataPtr __restrict place) const override {
-        AggregateFunctionPercentileOld::data(place).reset();
-    }
-
-    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
-               Arena*) const override {
-        AggregateFunctionPercentileOld::data(place).merge(
-                AggregateFunctionPercentileOld::data(rhs));
-    }
-
-    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
-        AggregateFunctionPercentileOld::data(place).write(buf);
-    }
-
-    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
-                     Arena*) const override {
-        AggregateFunctionPercentileOld::data(place).read(buf);
-    }
-
-    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
-        auto& col = assert_cast<ColumnFloat64&>(to);
-        col.insert_value(AggregateFunctionPercentileOld::data(place).get());
-    }
-};
-
-class AggregateFunctionPercentileArrayOld final
-        : public IAggregateFunctionDataHelper<OldPercentileState,
-                                              
AggregateFunctionPercentileArrayOld> {
-public:
-    AggregateFunctionPercentileArrayOld(const DataTypes& argument_types_)
-            : IAggregateFunctionDataHelper<OldPercentileState, 
AggregateFunctionPercentileArrayOld>(
-                      argument_types_) {}
-
-    String get_name() const override { return "percentile_array"; }
-
-    DataTypePtr get_return_type() const override {
-        return 
std::make_shared<DataTypeArray>(make_nullable(std::make_shared<DataTypeFloat64>()));
-    }
-
-    void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
-             Arena*) const override {
-        const auto& sources =
-                assert_cast<const ColumnVector<Int64>&, 
TypeCheckOnRelease::DISABLE>(*columns[0]);
-        const auto& quantile_array =
-                assert_cast<const ColumnArray&, 
TypeCheckOnRelease::DISABLE>(*columns[1]);
-        const auto& offset_column_data = quantile_array.get_offsets();
-        const auto& nested_column = assert_cast<const ColumnNullable&, 
TypeCheckOnRelease::DISABLE>(
-                                            quantile_array.get_data())
-                                            .get_nested_column();
-        const auto& nested_column_data =
-                assert_cast<const ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(nested_column);
-
-        AggregateFunctionPercentileArrayOld::data(place).add(
-                sources.get_int(row_num), nested_column_data.get_data(),
-                offset_column_data.data()[row_num] - 
offset_column_data[(ssize_t)row_num - 1]);
-    }
-
-    void reset(AggregateDataPtr __restrict place) const override {
-        AggregateFunctionPercentileArrayOld::data(place).reset();
-    }
-
-    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
-               Arena*) const override {
-        AggregateFunctionPercentileArrayOld::data(place).merge(
-                AggregateFunctionPercentileArrayOld::data(rhs));
-    }
-
-    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
-        AggregateFunctionPercentileArrayOld::data(place).write(buf);
-    }
-
-    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
-                     Arena*) const override {
-        AggregateFunctionPercentileArrayOld::data(place).read(buf);
-    }
-
-    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
-        auto& to_arr = assert_cast<ColumnArray&>(to);
-        auto& to_nested_col = to_arr.get_data();
-        if (to_nested_col.is_nullable()) {
-            auto col_null = reinterpret_cast<ColumnNullable*>(&to_nested_col);
-            
AggregateFunctionPercentileArrayOld::data(place).insert_result_into(
-                    col_null->get_nested_column());
-            
col_null->get_null_map_data().resize_fill(col_null->get_nested_column().size(), 
0);
-        } else {
-            
AggregateFunctionPercentileArrayOld::data(place).insert_result_into(to_nested_col);
-        }
-        to_arr.get_offsets().push_back(to_nested_col.size());
-    }
-};
-
-} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp
index b9e39552395..72448a419e9 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp
@@ -51,15 +51,6 @@ AggregateFunctionPtr create_function_single_value(const 
String& name,
     return nullptr;
 }
 
-template <bool is_stddev, bool is_nullable>
-AggregateFunctionPtr create_aggregate_function_variance_samp_older(const 
std::string& name,
-                                                                   const 
DataTypes& argument_types,
-                                                                   const bool 
result_is_nullable) {
-    return create_function_single_value<AggregateFunctionSamp_OLDER, 
VarianceSampName,
-                                        SampData_OLDER, is_stddev, 
is_nullable>(
-            name, argument_types, result_is_nullable, true);
-}
-
 AggregateFunctionPtr create_aggregate_function_variance_samp(const 
std::string& name,
                                                              const DataTypes& 
argument_types,
                                                              const bool 
result_is_nullable) {
@@ -67,15 +58,6 @@ AggregateFunctionPtr 
create_aggregate_function_variance_samp(const std::string&
             name, argument_types, result_is_nullable, false);
 }
 
-template <bool is_stddev, bool is_nullable>
-AggregateFunctionPtr create_aggregate_function_stddev_samp_older(const 
std::string& name,
-                                                                 const 
DataTypes& argument_types,
-                                                                 const bool 
result_is_nullable) {
-    return create_function_single_value<AggregateFunctionSamp_OLDER, 
StddevSampName, SampData_OLDER,
-                                        is_stddev, is_nullable>(name, 
argument_types,
-                                                                
result_is_nullable, true);
-}
-
 template <bool is_stddev>
 AggregateFunctionPtr create_aggregate_function_variance_pop(const std::string& 
name,
                                                             const DataTypes& 
argument_types,
@@ -108,18 +90,8 @@ void 
register_aggregate_function_stddev_variance_pop(AggregateFunctionSimpleFact
 }
 
 void 
register_aggregate_function_stddev_variance_samp_old(AggregateFunctionSimpleFactory&
 factory) {
-    factory.register_alternative_function(
-            "variance_samp", 
create_aggregate_function_variance_samp_older<false, false>, false,
-            AGG_FUNCTION_NULLABLE);
-    factory.register_alternative_function(
-            "variance_samp", 
create_aggregate_function_variance_samp_older<false, true>, true,
-            AGG_FUNCTION_NULLABLE);
-    factory.register_alternative_function("stddev_samp",
-                                          
create_aggregate_function_stddev_samp_older<true, false>,
-                                          false, AGG_FUNCTION_NULLABLE);
-    factory.register_alternative_function("stddev_samp",
-                                          
create_aggregate_function_stddev_samp_older<true, true>,
-                                          true, AGG_FUNCTION_NULLABLE);
+    
BeExecVersionManager::registe_restrict_function_compatibility("variance_samp");
+    
BeExecVersionManager::registe_restrict_function_compatibility("stddev_samp");
 }
 
 void 
register_aggregate_function_stddev_variance_samp(AggregateFunctionSimpleFactory&
 factory) {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_stddev.h 
b/be/src/vec/aggregate_functions/aggregate_function_stddev.h
index 5a8c896e1c3..bcd35b13149 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_stddev.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_stddev.h
@@ -171,30 +171,6 @@ struct StddevSampName : Data {
     static const char* name() { return "stddev_samp"; }
 };
 
-template <typename T, typename Data>
-struct SampData_OLDER : Data {
-    using ColVecResult =
-            std::conditional_t<IsDecimalNumber<T>, 
ColumnDecimal<Decimal128V2>, ColumnFloat64>;
-    void insert_result_into(IColumn& to) const {
-        ColumnNullable& nullable_column = assert_cast<ColumnNullable&>(to);
-        if (this->count == 1 || this->count == 0) {
-            nullable_column.insert_default();
-        } else {
-            auto& col = 
assert_cast<ColVecResult&>(nullable_column.get_nested_column());
-            if constexpr (IsDecimalNumber<T>) {
-                col.get_data().push_back(this->get_samp_result().value());
-            } else {
-                col.get_data().push_back(this->get_samp_result());
-            }
-            nullable_column.get_null_map_data().push_back(0);
-        }
-    }
-
-    static DataTypePtr get_return_type() {
-        return make_nullable(std::make_shared<DataTypeNumber<Float64>>());
-    }
-};
-
 template <typename T, typename Data>
 struct SampData : Data {
     using ColVecResult =
@@ -266,16 +242,6 @@ public:
     }
 };
 
-//samp function it's always nullables, it's need to handle nullable column
-//so return type and add function should processing null values
-template <typename Data, bool is_nullable>
-class AggregateFunctionSamp_OLDER final
-        : public AggregateFunctionSampVariance<false, Data, is_nullable> {
-public:
-    AggregateFunctionSamp_OLDER(const DataTypes& argument_types_)
-            : AggregateFunctionSampVariance<false, Data, 
is_nullable>(argument_types_) {}
-};
-
 template <typename Data, bool is_nullable>
 class AggregateFunctionSamp final : public 
AggregateFunctionSampVariance<false, Data, is_nullable> {
 public:
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp
index 598c23eb147..c16121bad73 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp
@@ -51,36 +51,10 @@ AggregateFunctionPtr 
create_aggregate_function_window_funnel(const std::string&
     }
 }
 
-AggregateFunctionPtr create_aggregate_function_window_funnel_old(const 
std::string& name,
-                                                                 const 
DataTypes& argument_types,
-                                                                 const bool 
result_is_nullable) {
-    if (argument_types.size() < 3) {
-        LOG(WARNING) << "window_funnel's argument less than 3.";
-        return nullptr;
-    }
-    if (WhichDataType(remove_nullable(argument_types[2])).is_date_time_v2()) {
-        return creator_without_type::create<
-                
AggregateFunctionWindowFunnelOld<DateV2Value<DateTimeV2ValueType>, UInt64>>(
-                argument_types, result_is_nullable);
-    } else if 
(WhichDataType(remove_nullable(argument_types[2])).is_date_time()) {
-        return creator_without_type::create<
-                AggregateFunctionWindowFunnelOld<VecDateTimeValue, 
Int64>>(argument_types,
-                                                                           
result_is_nullable);
-    } else {
-        LOG(WARNING) << "Only support DateTime type as window argument!";
-        return nullptr;
-    }
-}
-
 void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& 
factory) {
     factory.register_function_both("window_funnel", 
create_aggregate_function_window_funnel);
 }
 void 
register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory& 
factory) {
-    factory.register_alternative_function("window_funnel",
-                                          
create_aggregate_function_window_funnel_old, true,
-                                          AGG_FUNCTION_NEW_WINDOW_FUNNEL);
-    factory.register_alternative_function("window_funnel",
-                                          
create_aggregate_function_window_funnel_old, false,
-                                          AGG_FUNCTION_NEW_WINDOW_FUNNEL);
+    
BeExecVersionManager::registe_restrict_function_compatibility("window_funnel");
 }
 } // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h 
b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
index c0f0a4e7e20..84222f0d01b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.h
@@ -423,265 +423,4 @@ protected:
     using IAggregateFunction::version;
 };
 
-template <typename DateValueType, typename NativeType>
-struct WindowFunnelStateOld {
-    std::vector<std::pair<DateValueType, int>> events;
-    int max_event_level;
-    bool sorted;
-    int64_t window;
-    WindowFunnelMode window_funnel_mode;
-    bool enable_mode;
-
-    WindowFunnelStateOld() {
-        sorted = true;
-        max_event_level = 0;
-        window = 0;
-        window_funnel_mode = WindowFunnelMode::INVALID;
-    }
-
-    void reset() {
-        sorted = true;
-        max_event_level = 0;
-        window = 0;
-        events.shrink_to_fit();
-    }
-
-    void add(const DateValueType& timestamp, int event_idx, int event_num, 
int64_t win,
-             WindowFunnelMode mode) {
-        window = win;
-        max_event_level = event_num;
-        window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT;
-
-        if (sorted && events.size() > 0) {
-            if (events.back().first == timestamp) {
-                sorted = events.back().second <= event_idx;
-            } else {
-                sorted = events.back().first < timestamp;
-            }
-        }
-        events.emplace_back(timestamp, event_idx);
-    }
-
-    void sort() {
-        if (sorted) {
-            return;
-        }
-        std::stable_sort(events.begin(), events.end());
-    }
-
-    int get() const {
-        if (max_event_level == 0) {
-            return 0;
-        }
-        std::vector<std::optional<std::pair<DateValueType, DateValueType>>> 
events_timestamp(
-                max_event_level);
-        bool is_first_set = false;
-        for (int64_t i = 0; i < events.size(); i++) {
-            const int& event_idx = events[i].second;
-            const DateValueType& timestamp = events[i].first;
-            if (event_idx == 0) {
-                events_timestamp[0] = {timestamp, timestamp};
-                is_first_set = true;
-                continue;
-            }
-            if (window_funnel_mode == WindowFunnelMode::DEDUPLICATION &&
-                events_timestamp[event_idx].has_value()) {
-                break;
-            }
-            if (events_timestamp[event_idx - 1].has_value()) {
-                const DateValueType& first_timestamp =
-                        events_timestamp[event_idx - 1].value().first;
-                DateValueType last_timestamp = first_timestamp;
-                TimeInterval interval(SECOND, window, false);
-                last_timestamp.template date_add_interval<SECOND>(interval);
-
-                if (window_funnel_mode != WindowFunnelMode::INCREASE) {
-                    if (timestamp <= last_timestamp) {
-                        events_timestamp[event_idx] = {first_timestamp, 
timestamp};
-                        if (event_idx + 1 == max_event_level) {
-                            // Usually, max event level is small.
-                            return max_event_level;
-                        }
-                    }
-                } else {
-                    if (timestamp <= last_timestamp &&
-                        events_timestamp[event_idx - 1].value().second < 
timestamp) {
-                        if (!events_timestamp[event_idx].has_value() ||
-                            events_timestamp[event_idx].value().second > 
timestamp) {
-                            events_timestamp[event_idx] = {first_timestamp, 
timestamp};
-                        }
-                        if (event_idx + 1 == max_event_level) {
-                            // Usually, max event level is small.
-                            return max_event_level;
-                        }
-                    }
-                }
-            } else {
-                if (is_first_set && window_funnel_mode == 
WindowFunnelMode::FIXED) {
-                    for (size_t i = 0; i < events_timestamp.size(); i++) {
-                        if (!events_timestamp[i].has_value()) {
-                            return i;
-                        }
-                    }
-                }
-            }
-        }
-
-        for (int64_t i = events_timestamp.size() - 1; i >= 0; i--) {
-            if (events_timestamp[i].has_value()) {
-                return i + 1;
-            }
-        }
-
-        return 0;
-    }
-
-    void merge(const WindowFunnelStateOld& other) {
-        if (other.events.empty()) {
-            return;
-        }
-
-        int64_t orig_size = events.size();
-        events.insert(std::end(events), std::begin(other.events), 
std::end(other.events));
-        const auto begin = std::begin(events);
-        const auto middle = std::next(events.begin(), orig_size);
-        const auto end = std::end(events);
-        if (!other.sorted) {
-            std::stable_sort(middle, end);
-        }
-
-        if (!sorted) {
-            std::stable_sort(begin, middle);
-        }
-        std::inplace_merge(begin, middle, end);
-        max_event_level = max_event_level > 0 ? max_event_level : 
other.max_event_level;
-        window = window > 0 ? window : other.window;
-        if (enable_mode) {
-            window_funnel_mode = window_funnel_mode == 
WindowFunnelMode::INVALID
-                                         ? other.window_funnel_mode
-                                         : window_funnel_mode;
-        } else {
-            window_funnel_mode = WindowFunnelMode::DEFAULT;
-        }
-        sorted = true;
-    }
-
-    void write(BufferWritable& out) const {
-        write_var_int(max_event_level, out);
-        write_var_int(window, out);
-        if (enable_mode) {
-            
write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
-                          out);
-        }
-        write_var_int(events.size(), out);
-
-        for (int64_t i = 0; i < events.size(); i++) {
-            int64_t timestamp = binary_cast<DateValueType, 
NativeType>(events[i].first);
-            int event_idx = events[i].second;
-            write_var_int(timestamp, out);
-            write_var_int(event_idx, out);
-        }
-    }
-
-    void read(BufferReadable& in) {
-        int64_t event_level;
-        read_var_int(event_level, in);
-        max_event_level = (int)event_level;
-        read_var_int(window, in);
-        window_funnel_mode = WindowFunnelMode::DEFAULT;
-        if (enable_mode) {
-            int64_t mode;
-            read_var_int(mode, in);
-            window_funnel_mode = static_cast<WindowFunnelMode>(mode);
-        }
-        int64_t size = 0;
-        read_var_int(size, in);
-        for (int64_t i = 0; i < size; i++) {
-            int64_t timestamp;
-            int64_t event_idx;
-
-            read_var_int(timestamp, in);
-            read_var_int(event_idx, in);
-            DateValueType time_value = binary_cast<NativeType, 
DateValueType>(timestamp);
-            add(time_value, (int)event_idx, max_event_level, window, 
window_funnel_mode);
-        }
-    }
-};
-
-template <typename DateValueType, typename NativeType>
-class AggregateFunctionWindowFunnelOld
-        : public IAggregateFunctionDataHelper<
-                  WindowFunnelStateOld<DateValueType, NativeType>,
-                  AggregateFunctionWindowFunnelOld<DateValueType, NativeType>> 
{
-public:
-    AggregateFunctionWindowFunnelOld(const DataTypes& argument_types_)
-            : IAggregateFunctionDataHelper<
-                      WindowFunnelStateOld<DateValueType, NativeType>,
-                      AggregateFunctionWindowFunnelOld<DateValueType, 
NativeType>>(
-                      argument_types_) {}
-
-    void create(AggregateDataPtr __restrict place) const override {
-        auto data = new (place) WindowFunnelStateOld<DateValueType, 
NativeType>();
-        /// support window funnel mode from 2.0. See 
`BeExecVersionManager::max_be_exec_version`
-        data->enable_mode = version >= 3;
-    }
-
-    String get_name() const override { return "window_funnel"; }
-
-    DataTypePtr get_return_type() const override { return 
std::make_shared<DataTypeInt32>(); }
-
-    void reset(AggregateDataPtr __restrict place) const override { 
this->data(place).reset(); }
-
-    void add(AggregateDataPtr __restrict place, const IColumn** columns, 
ssize_t row_num,
-             Arena*) const override {
-        const auto& window =
-                assert_cast<const ColumnVector<Int64>&, 
TypeCheckOnRelease::DISABLE>(*columns[0])
-                        .get_data()[row_num];
-        StringRef mode = columns[1]->get_data_at(row_num);
-        const auto& timestamp =
-                assert_cast<const ColumnVector<NativeType>&, 
TypeCheckOnRelease::DISABLE>(
-                        *columns[2])
-                        .get_data()[row_num];
-        const int NON_EVENT_NUM = 3;
-        for (int i = NON_EVENT_NUM; i < 
IAggregateFunction::get_argument_types().size(); i++) {
-            const auto& is_set =
-                    assert_cast<const ColumnVector<UInt8>&, 
TypeCheckOnRelease::DISABLE>(
-                            *columns[i])
-                            .get_data()[row_num];
-            if (is_set) {
-                this->data(place).add(
-                        binary_cast<NativeType, DateValueType>(timestamp), i - 
NON_EVENT_NUM,
-                        IAggregateFunction::get_argument_types().size() - 
NON_EVENT_NUM, window,
-                        string_to_window_funnel_mode(mode.to_string()));
-            }
-        }
-    }
-
-    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
-               Arena*) const override {
-        this->data(place).merge(this->data(rhs));
-    }
-
-    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
-        this->data(place).write(buf);
-    }
-
-    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
-                     Arena*) const override {
-        this->data(place).read(buf);
-    }
-
-    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
-        this->data(const_cast<AggregateDataPtr>(place)).sort();
-        assert_cast<ColumnInt32&>(to).get_data().push_back(
-                IAggregateFunctionDataHelper<
-                        WindowFunnelStateOld<DateValueType, NativeType>,
-                        AggregateFunctionWindowFunnelOld<DateValueType, 
NativeType>>::data(place)
-                        .get());
-    }
-
-protected:
-    using IAggregateFunction::version;
-};
-
 } // namespace doris::vectorized
diff --git a/be/test/vec/aggregate_functions/vec_window_funnel_test.cpp 
b/be/test/vec/aggregate_functions/vec_window_funnel_test.cpp
index ed019ef699b..f0c93c4beab 100644
--- a/be/test/vec/aggregate_functions/vec_window_funnel_test.cpp
+++ b/be/test/vec/aggregate_functions/vec_window_funnel_test.cpp
@@ -58,7 +58,8 @@ public:
                 std::make_shared<DataTypeDateTime>(), 
std::make_shared<DataTypeUInt8>(),
                 std::make_shared<DataTypeUInt8>(),    
std::make_shared<DataTypeUInt8>(),
                 std::make_shared<DataTypeUInt8>()};
-        agg_function = factory.get("window_funnel", data_types, false, -1);
+        agg_function = factory.get("window_funnel", data_types, false,
+                                   BeExecVersionManager::get_newest_version());
         EXPECT_NE(agg_function, nullptr);
     }
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index dd017b3c023..b28baaecba5 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1877,7 +1877,7 @@ public class Config extends ConfigBase {
      * Max data version of backends serialize block.
      */
     @ConfField(mutable = false)
-    public static int max_be_exec_version = 7;
+    public static int max_be_exec_version = 8;
 
     /**
      * Min data version of backends serialize block.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to