github-actions[bot] commented on code in PR #40001:
URL: https://github.com/apache/doris/pull/40001#discussion_r1732970519


##########
be/src/vec/aggregate_functions/aggregate_function_window_funnel.h:
##########
@@ -21,25 +21,29 @@
 
 #pragma once
 
-#include <stddef.h>
-#include <stdint.h>
+#include <gen_cpp/data.pb.h>

Review Comment:
   warning: 'gen_cpp/data.pb.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/data.pb.h>
            ^
   ```
   



##########
be/src/vec/aggregate_functions/aggregate_function_window_funnel.h:
##########
@@ -71,16 +75,367 @@
     }
 }
 
-template <typename DateValueType, typename NativeType>
+template <TypeIndex TYPE_INDEX, typename NativeType>
 struct WindowFunnelState {
+    using DateValueType = std::conditional_t<TYPE_INDEX == 
TypeIndex::DateTimeV2,
+                                             DateV2Value<DateTimeV2ValueType>, 
VecDateTimeValue>;
+    int event_count = 0;
+    int64_t window;
+    bool enable_mode;
+    WindowFunnelMode window_funnel_mode;
+    mutable vectorized::MutableBlock mutable_block;
+    ColumnVector<NativeType>::Container* timestamp_column_data;
+    std::vector<ColumnVector<UInt8>::Container*> event_columns_datas;
+    SortDescription sort_description {1};
+    bool sorted;
+
+    WindowFunnelState() {
+        event_count = 0;
+        window = 0;
+        window_funnel_mode = WindowFunnelMode::INVALID;
+
+        sort_description[0].column_number = 0;
+        sort_description[0].direction = 1;
+        sort_description[0].nulls_direction = -1;
+        sorted = false;
+    }
+    WindowFunnelState(int arg_event_count) : WindowFunnelState() {
+        event_count = arg_event_count;
+        auto timestamp_column = ColumnVector<NativeType>::create();
+        timestamp_column_data =
+                
&assert_cast<ColumnVector<NativeType>&>(*timestamp_column).get_data();
+
+        MutableColumns event_columns;
+        for (int i = 0; i < event_count; i++) {
+            auto event_column = ColumnVector<UInt8>::create();
+            event_columns_datas.emplace_back(
+                    
&assert_cast<ColumnVector<UInt8>&>(*event_column).get_data());
+            event_columns.emplace_back(std::move(event_column));
+        }
+        Block tmp_block;
+        tmp_block.insert({std::move(timestamp_column),
+                          
DataTypeFactory::instance().create_data_type(TYPE_INDEX), "timestamp"});
+        for (int i = 0; i < event_count; i++) {
+            tmp_block.insert({std::move(event_columns[i]),
+                              
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
+                              "event_" + std::to_string(i)});
+        }
+
+        mutable_block = MutableBlock(std::move(tmp_block));
+    }
+
+    void reset() {
+        window = 0;
+        mutable_block.clear();
+        timestamp_column_data = nullptr;
+        event_columns_datas.clear();
+        sorted = false;
+    }
+
+    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, 
WindowFunnelMode mode) {
+        window = win;
+        window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT;
+
+        timestamp_column_data->push_back(
+                assert_cast<const 
ColumnVector<NativeType>&>(*arg_columns[2]).get_data()[row_num]);
+        for (int i = 0; i < event_count; i++) {
+            event_columns_datas[i]->push_back(
+                    assert_cast<const ColumnVector<UInt8>&>(*arg_columns[3 + 
i])
+                            .get_data()[row_num]);
+        }
+    }
+
+    void sort() {
+        if (sorted) {
+            return;
+        }
+
+        Block tmp_block = mutable_block.to_block();
+        auto block = tmp_block.clone_without_columns();
+        sort_block(tmp_block, block, sort_description, 0);
+        mutable_block = MutableBlock(std::move(block));
+        sorted = true;
+    }
+
+    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
+    int _match_event_list(size_t& start_row, size_t row_count,
+                          const NativeType* timestamp_data) const {
+        int matched_count = 0;
+        DateValueType start_timestamp;
+        DateValueType end_timestamp;
+        TimeInterval interval(SECOND, window, false);
+
+        int column_idx = 1;
+        const auto& first_event_column = 
mutable_block.get_column_by_position(column_idx);
+        const auto& first_event_data =
+                assert_cast<const 
ColumnVector<UInt8>&>(*first_event_column).get_data();
+        auto match_row = simd::find_one(first_event_data.data(), start_row, 
row_count);
+        start_row = match_row + 1;
+        if (match_row < row_count) {
+            auto prev_timestamp = binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+            end_timestamp = prev_timestamp;
+            end_timestamp.template date_add_interval<SECOND>(interval);
+
+            matched_count++;
+
+            column_idx++;
+            auto last_match_row = match_row;
+            for (; column_idx < event_count + 1; column_idx++) {
+                const auto& event_column = 
mutable_block.get_column_by_position(column_idx);
+                const auto& event_data =
+                        assert_cast<const 
ColumnVector<UInt8>&>(*event_column).get_data();
+                if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
+                    ++match_row;
+                    if (event_data[match_row] == 1) {
+                        auto current_timestamp =
+                                binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                        if (current_timestamp <= end_timestamp) {
+                            matched_count++;
+                            continue;
+                        }
+                    }
+                    break;
+                }
+                match_row = simd::find_one(event_data.data(), match_row + 1, 
row_count);
+                if (match_row < row_count) {
+                    auto current_timestamp =
+                            binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                    bool is_matched = current_timestamp <= end_timestamp;
+                    if (is_matched) {
+                        if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::INCREASE) {
+                            is_matched = current_timestamp > prev_timestamp;
+                        }
+                    }
+                    if (!is_matched) {
+                        break;
+                    }
+                    if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::INCREASE) {
+                        prev_timestamp =
+                                binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                    }
+                    if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::DEDUPLICATION) {
+                        bool is_dup = false;
+                        if (match_row != last_match_row + 1) {
+                            for (int tmp_column_idx = 1; tmp_column_idx < 
column_idx;
+                                 tmp_column_idx++) {
+                                const auto& tmp_event_column =
+                                        
mutable_block.get_column_by_position(tmp_column_idx);
+                                const auto& tmp_event_data =
+                                        assert_cast<const 
ColumnVector<UInt8>&>(*tmp_event_column)
+                                                .get_data();
+                                auto dup_match_row = 
simd::find_one(tmp_event_data.data(),
+                                                                    
last_match_row + 1, match_row);
+                                if (dup_match_row < match_row) {
+                                    is_dup = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if (is_dup) {
+                            break;
+                        }
+                        last_match_row = match_row;
+                    }
+                    matched_count++;
+                } else {
+                    break;
+                }
+            }
+        }
+        return matched_count;
+    }
+
+    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
+    int _get_internal() const {

Review Comment:
   warning: function '_get_internal' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] int _get_internal() const {
   ```
   



##########
be/src/vec/aggregate_functions/aggregate_function_window_funnel.h:
##########
@@ -71,16 +75,367 @@
     }
 }
 
-template <typename DateValueType, typename NativeType>
+template <TypeIndex TYPE_INDEX, typename NativeType>
 struct WindowFunnelState {
+    using DateValueType = std::conditional_t<TYPE_INDEX == 
TypeIndex::DateTimeV2,
+                                             DateV2Value<DateTimeV2ValueType>, 
VecDateTimeValue>;
+    int event_count = 0;
+    int64_t window;
+    bool enable_mode;
+    WindowFunnelMode window_funnel_mode;
+    mutable vectorized::MutableBlock mutable_block;
+    ColumnVector<NativeType>::Container* timestamp_column_data;
+    std::vector<ColumnVector<UInt8>::Container*> event_columns_datas;
+    SortDescription sort_description {1};
+    bool sorted;
+
+    WindowFunnelState() {
+        event_count = 0;
+        window = 0;
+        window_funnel_mode = WindowFunnelMode::INVALID;
+
+        sort_description[0].column_number = 0;
+        sort_description[0].direction = 1;
+        sort_description[0].nulls_direction = -1;
+        sorted = false;
+    }
+    WindowFunnelState(int arg_event_count) : WindowFunnelState() {
+        event_count = arg_event_count;
+        auto timestamp_column = ColumnVector<NativeType>::create();
+        timestamp_column_data =
+                
&assert_cast<ColumnVector<NativeType>&>(*timestamp_column).get_data();
+
+        MutableColumns event_columns;
+        for (int i = 0; i < event_count; i++) {
+            auto event_column = ColumnVector<UInt8>::create();
+            event_columns_datas.emplace_back(
+                    
&assert_cast<ColumnVector<UInt8>&>(*event_column).get_data());
+            event_columns.emplace_back(std::move(event_column));
+        }
+        Block tmp_block;
+        tmp_block.insert({std::move(timestamp_column),
+                          
DataTypeFactory::instance().create_data_type(TYPE_INDEX), "timestamp"});
+        for (int i = 0; i < event_count; i++) {
+            tmp_block.insert({std::move(event_columns[i]),
+                              
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
+                              "event_" + std::to_string(i)});
+        }
+
+        mutable_block = MutableBlock(std::move(tmp_block));
+    }
+
+    void reset() {
+        window = 0;
+        mutable_block.clear();
+        timestamp_column_data = nullptr;
+        event_columns_datas.clear();
+        sorted = false;
+    }
+
+    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, 
WindowFunnelMode mode) {
+        window = win;
+        window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT;
+
+        timestamp_column_data->push_back(
+                assert_cast<const 
ColumnVector<NativeType>&>(*arg_columns[2]).get_data()[row_num]);
+        for (int i = 0; i < event_count; i++) {
+            event_columns_datas[i]->push_back(
+                    assert_cast<const ColumnVector<UInt8>&>(*arg_columns[3 + 
i])
+                            .get_data()[row_num]);
+        }
+    }
+
+    void sort() {
+        if (sorted) {
+            return;
+        }
+
+        Block tmp_block = mutable_block.to_block();
+        auto block = tmp_block.clone_without_columns();
+        sort_block(tmp_block, block, sort_description, 0);
+        mutable_block = MutableBlock(std::move(block));
+        sorted = true;
+    }
+
+    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
+    int _match_event_list(size_t& start_row, size_t row_count,
+                          const NativeType* timestamp_data) const {
+        int matched_count = 0;
+        DateValueType start_timestamp;
+        DateValueType end_timestamp;
+        TimeInterval interval(SECOND, window, false);
+
+        int column_idx = 1;
+        const auto& first_event_column = 
mutable_block.get_column_by_position(column_idx);
+        const auto& first_event_data =
+                assert_cast<const 
ColumnVector<UInt8>&>(*first_event_column).get_data();
+        auto match_row = simd::find_one(first_event_data.data(), start_row, 
row_count);
+        start_row = match_row + 1;
+        if (match_row < row_count) {
+            auto prev_timestamp = binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+            end_timestamp = prev_timestamp;
+            end_timestamp.template date_add_interval<SECOND>(interval);
+
+            matched_count++;
+
+            column_idx++;
+            auto last_match_row = match_row;
+            for (; column_idx < event_count + 1; column_idx++) {
+                const auto& event_column = 
mutable_block.get_column_by_position(column_idx);
+                const auto& event_data =
+                        assert_cast<const 
ColumnVector<UInt8>&>(*event_column).get_data();
+                if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
+                    ++match_row;
+                    if (event_data[match_row] == 1) {
+                        auto current_timestamp =
+                                binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                        if (current_timestamp <= end_timestamp) {
+                            matched_count++;
+                            continue;
+                        }
+                    }
+                    break;
+                }
+                match_row = simd::find_one(event_data.data(), match_row + 1, 
row_count);
+                if (match_row < row_count) {
+                    auto current_timestamp =
+                            binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                    bool is_matched = current_timestamp <= end_timestamp;
+                    if (is_matched) {
+                        if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::INCREASE) {
+                            is_matched = current_timestamp > prev_timestamp;
+                        }
+                    }
+                    if (!is_matched) {
+                        break;
+                    }
+                    if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::INCREASE) {
+                        prev_timestamp =
+                                binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                    }
+                    if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::DEDUPLICATION) {
+                        bool is_dup = false;
+                        if (match_row != last_match_row + 1) {
+                            for (int tmp_column_idx = 1; tmp_column_idx < 
column_idx;
+                                 tmp_column_idx++) {
+                                const auto& tmp_event_column =
+                                        
mutable_block.get_column_by_position(tmp_column_idx);
+                                const auto& tmp_event_data =
+                                        assert_cast<const 
ColumnVector<UInt8>&>(*tmp_event_column)
+                                                .get_data();
+                                auto dup_match_row = 
simd::find_one(tmp_event_data.data(),
+                                                                    
last_match_row + 1, match_row);
+                                if (dup_match_row < match_row) {
+                                    is_dup = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if (is_dup) {
+                            break;
+                        }
+                        last_match_row = match_row;
+                    }
+                    matched_count++;
+                } else {
+                    break;
+                }
+            }
+        }
+        return matched_count;
+    }
+
+    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
+    int _get_internal() const {
+        size_t start_row = 0;
+        int max_found_event_count = 0;
+        const auto& ts_column = 
mutable_block.get_column_by_position(0)->get_ptr();
+        const auto& timestamp_data =
+                assert_cast<const 
ColumnVector<NativeType>&>(*ts_column).get_data().data();
+
+        auto row_count = mutable_block.rows();
+        while (start_row < row_count) {
+            auto found_event_count =
+                    _match_event_list<WINDOW_FUNNEL_MODE>(start_row, 
row_count, timestamp_data);
+            if (found_event_count == event_count) {
+                return found_event_count;
+            }
+            max_found_event_count = std::max(max_found_event_count, 
found_event_count);
+        }
+        return max_found_event_count;
+    }
+    int get() const {

Review Comment:
   warning: function 'get' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] int get() const {
   ```
   



##########
be/src/vec/aggregate_functions/aggregate_function_window_funnel.h:
##########
@@ -71,16 +75,367 @@
     }
 }
 
-template <typename DateValueType, typename NativeType>
+template <TypeIndex TYPE_INDEX, typename NativeType>
 struct WindowFunnelState {
+    using DateValueType = std::conditional_t<TYPE_INDEX == 
TypeIndex::DateTimeV2,
+                                             DateV2Value<DateTimeV2ValueType>, 
VecDateTimeValue>;
+    int event_count = 0;
+    int64_t window;
+    bool enable_mode;
+    WindowFunnelMode window_funnel_mode;
+    mutable vectorized::MutableBlock mutable_block;
+    ColumnVector<NativeType>::Container* timestamp_column_data;
+    std::vector<ColumnVector<UInt8>::Container*> event_columns_datas;
+    SortDescription sort_description {1};
+    bool sorted;
+
+    WindowFunnelState() {
+        event_count = 0;
+        window = 0;
+        window_funnel_mode = WindowFunnelMode::INVALID;
+
+        sort_description[0].column_number = 0;
+        sort_description[0].direction = 1;
+        sort_description[0].nulls_direction = -1;
+        sorted = false;
+    }
+    WindowFunnelState(int arg_event_count) : WindowFunnelState() {
+        event_count = arg_event_count;
+        auto timestamp_column = ColumnVector<NativeType>::create();
+        timestamp_column_data =
+                
&assert_cast<ColumnVector<NativeType>&>(*timestamp_column).get_data();
+
+        MutableColumns event_columns;
+        for (int i = 0; i < event_count; i++) {
+            auto event_column = ColumnVector<UInt8>::create();
+            event_columns_datas.emplace_back(
+                    
&assert_cast<ColumnVector<UInt8>&>(*event_column).get_data());
+            event_columns.emplace_back(std::move(event_column));
+        }
+        Block tmp_block;
+        tmp_block.insert({std::move(timestamp_column),
+                          
DataTypeFactory::instance().create_data_type(TYPE_INDEX), "timestamp"});
+        for (int i = 0; i < event_count; i++) {
+            tmp_block.insert({std::move(event_columns[i]),
+                              
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
+                              "event_" + std::to_string(i)});
+        }
+
+        mutable_block = MutableBlock(std::move(tmp_block));
+    }
+
+    void reset() {
+        window = 0;
+        mutable_block.clear();
+        timestamp_column_data = nullptr;
+        event_columns_datas.clear();
+        sorted = false;
+    }
+
+    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, 
WindowFunnelMode mode) {
+        window = win;
+        window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT;
+
+        timestamp_column_data->push_back(
+                assert_cast<const 
ColumnVector<NativeType>&>(*arg_columns[2]).get_data()[row_num]);
+        for (int i = 0; i < event_count; i++) {
+            event_columns_datas[i]->push_back(
+                    assert_cast<const ColumnVector<UInt8>&>(*arg_columns[3 + 
i])
+                            .get_data()[row_num]);
+        }
+    }
+
+    void sort() {
+        if (sorted) {
+            return;
+        }
+
+        Block tmp_block = mutable_block.to_block();
+        auto block = tmp_block.clone_without_columns();
+        sort_block(tmp_block, block, sort_description, 0);
+        mutable_block = MutableBlock(std::move(block));
+        sorted = true;
+    }
+
+    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
+    int _match_event_list(size_t& start_row, size_t row_count,
+                          const NativeType* timestamp_data) const {
+        int matched_count = 0;
+        DateValueType start_timestamp;
+        DateValueType end_timestamp;
+        TimeInterval interval(SECOND, window, false);
+
+        int column_idx = 1;
+        const auto& first_event_column = 
mutable_block.get_column_by_position(column_idx);
+        const auto& first_event_data =
+                assert_cast<const 
ColumnVector<UInt8>&>(*first_event_column).get_data();
+        auto match_row = simd::find_one(first_event_data.data(), start_row, 
row_count);
+        start_row = match_row + 1;
+        if (match_row < row_count) {
+            auto prev_timestamp = binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+            end_timestamp = prev_timestamp;
+            end_timestamp.template date_add_interval<SECOND>(interval);
+
+            matched_count++;
+
+            column_idx++;
+            auto last_match_row = match_row;
+            for (; column_idx < event_count + 1; column_idx++) {
+                const auto& event_column = 
mutable_block.get_column_by_position(column_idx);
+                const auto& event_data =
+                        assert_cast<const 
ColumnVector<UInt8>&>(*event_column).get_data();
+                if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
+                    ++match_row;
+                    if (event_data[match_row] == 1) {
+                        auto current_timestamp =
+                                binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                        if (current_timestamp <= end_timestamp) {
+                            matched_count++;
+                            continue;
+                        }
+                    }
+                    break;
+                }
+                match_row = simd::find_one(event_data.data(), match_row + 1, 
row_count);
+                if (match_row < row_count) {
+                    auto current_timestamp =
+                            binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                    bool is_matched = current_timestamp <= end_timestamp;
+                    if (is_matched) {
+                        if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::INCREASE) {
+                            is_matched = current_timestamp > prev_timestamp;
+                        }
+                    }
+                    if (!is_matched) {
+                        break;
+                    }
+                    if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::INCREASE) {
+                        prev_timestamp =
+                                binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                    }
+                    if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::DEDUPLICATION) {
+                        bool is_dup = false;
+                        if (match_row != last_match_row + 1) {
+                            for (int tmp_column_idx = 1; tmp_column_idx < 
column_idx;
+                                 tmp_column_idx++) {
+                                const auto& tmp_event_column =
+                                        
mutable_block.get_column_by_position(tmp_column_idx);
+                                const auto& tmp_event_data =
+                                        assert_cast<const 
ColumnVector<UInt8>&>(*tmp_event_column)
+                                                .get_data();
+                                auto dup_match_row = 
simd::find_one(tmp_event_data.data(),
+                                                                    
last_match_row + 1, match_row);
+                                if (dup_match_row < match_row) {
+                                    is_dup = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if (is_dup) {
+                            break;
+                        }
+                        last_match_row = match_row;
+                    }
+                    matched_count++;
+                } else {
+                    break;
+                }
+            }
+        }
+        return matched_count;
+    }
+
+    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
+    int _get_internal() const {
+        size_t start_row = 0;
+        int max_found_event_count = 0;
+        const auto& ts_column = 
mutable_block.get_column_by_position(0)->get_ptr();
+        const auto& timestamp_data =
+                assert_cast<const 
ColumnVector<NativeType>&>(*ts_column).get_data().data();
+
+        auto row_count = mutable_block.rows();
+        while (start_row < row_count) {
+            auto found_event_count =
+                    _match_event_list<WINDOW_FUNNEL_MODE>(start_row, 
row_count, timestamp_data);
+            if (found_event_count == event_count) {
+                return found_event_count;
+            }
+            max_found_event_count = std::max(max_found_event_count, 
found_event_count);
+        }
+        return max_found_event_count;
+    }
+    int get() const {
+        auto row_count = mutable_block.rows();
+        if (event_count == 0 || row_count == 0) {
+            return 0;
+        }
+        switch (window_funnel_mode) {
+        case WindowFunnelMode::DEFAULT:
+            return _get_internal<WindowFunnelMode::DEFAULT>();
+        case WindowFunnelMode::DEDUPLICATION:
+            return _get_internal<WindowFunnelMode::DEDUPLICATION>();
+        case WindowFunnelMode::FIXED:
+            return _get_internal<WindowFunnelMode::FIXED>();
+        case WindowFunnelMode::INCREASE:
+            return _get_internal<WindowFunnelMode::INCREASE>();
+        default:
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid 
window_funnel mode");
+            return 0;
+        }
+    }
+
+    void merge(const WindowFunnelState& other) {
+        if (!other.mutable_block.empty()) {
+            auto st = mutable_block.merge(other.mutable_block.to_block());
+            if (!st.ok()) {
+                throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
st.to_string());
+                return;
+            }
+        }
+
+        event_count = event_count > 0 ? event_count : other.event_count;
+        window = window > 0 ? window : other.window;
+        if (enable_mode) {
+            window_funnel_mode = window_funnel_mode == 
WindowFunnelMode::INVALID
+                                         ? other.window_funnel_mode
+                                         : window_funnel_mode;
+        } else {
+            window_funnel_mode = WindowFunnelMode::DEFAULT;
+        }
+    }
+
+    void write(BufferWritable& out) const {
+        write_var_int(event_count, out);
+        write_var_int(window, out);
+        if (enable_mode) {
+            
write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
+                          out);
+        }
+        PBlock pblock;
+        size_t uncompressed_bytes = 0;
+        size_t compressed_bytes = 0;
+        Status status;
+        std::string buff;
+        Block block = mutable_block.to_block();
+        status = block.serialize(
+                BeExecVersionManager::get_newest_version(), &pblock, 
&uncompressed_bytes,
+                &compressed_bytes,
+                segment_v2::CompressionTypePB::ZSTD); // ZSTD for better 
compression ratio
+        if (!status.ok()) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
status.to_string());
+            return;
+        }
+        if (!pblock.SerializeToString(&buff)) {
+            throw doris::Exception(ErrorCode::SERIALIZE_PROTOBUF_ERROR,
+                                   "Serialize window_funnel data");
+            return;
+        }
+        auto data_bytes = buff.size();
+        write_var_uint(data_bytes, out);
+        out.write(buff.data(), data_bytes);
+    }
+
+    void read(BufferReadable& in) {
+        int64_t event_level;
+        read_var_int(event_level, in);
+        event_count = (int)event_level;
+        read_var_int(window, in);
+        window_funnel_mode = WindowFunnelMode::DEFAULT;
+        if (enable_mode) {
+            int64_t mode;
+            read_var_int(mode, in);
+            window_funnel_mode = static_cast<WindowFunnelMode>(mode);
+        }
+        size_t data_bytes = 0;
+        read_var_uint(data_bytes, in);
+        std::string buff;
+        buff.resize(data_bytes);
+        in.read(buff.data(), data_bytes);
+
+        PBlock pblock;
+        if (!pblock.ParseFromArray(buff.data(), data_bytes)) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                   "Failed to parse window_funnel data to 
block");
+        }
+        Block block;
+        auto status = block.deserialize(pblock);
+        if (!status.ok()) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
status.to_string());
+        }
+        mutable_block = MutableBlock(std::move(block));
+    }
+};
+
+template <TypeIndex TYPE_INDEX, typename NativeType>
+class AggregateFunctionWindowFunnel
+        : public IAggregateFunctionDataHelper<
+                  WindowFunnelState<TYPE_INDEX, NativeType>,
+                  AggregateFunctionWindowFunnel<TYPE_INDEX, NativeType>> {
+public:
+    AggregateFunctionWindowFunnel(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper<WindowFunnelState<TYPE_INDEX, 
NativeType>,
+                                           
AggregateFunctionWindowFunnel<TYPE_INDEX, NativeType>>(
+                      argument_types_) {}
+
+    void create(AggregateDataPtr __restrict place) const override {
+        auto data = new (place) WindowFunnelState<TYPE_INDEX, NativeType>(
+                IAggregateFunction::get_argument_types().size() - 3);
+        /// support window funnel mode from 2.0. See 
`BeExecVersionManager::max_be_exec_version`
+        data->enable_mode = version >= 3;
+    }
+
+    String get_name() const override { return "window_funnel"; }
+
+    DataTypePtr get_return_type() const override { return 
std::make_shared<DataTypeInt32>(); }

Review Comment:
   warning: function 'get_return_type' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt32>(); }
   ```
   



##########
be/src/vec/aggregate_functions/aggregate_function_window_funnel.h:
##########
@@ -71,16 +75,367 @@
     }
 }
 
-template <typename DateValueType, typename NativeType>
+template <TypeIndex TYPE_INDEX, typename NativeType>
 struct WindowFunnelState {
+    using DateValueType = std::conditional_t<TYPE_INDEX == 
TypeIndex::DateTimeV2,
+                                             DateV2Value<DateTimeV2ValueType>, 
VecDateTimeValue>;
+    int event_count = 0;
+    int64_t window;
+    bool enable_mode;
+    WindowFunnelMode window_funnel_mode;
+    mutable vectorized::MutableBlock mutable_block;
+    ColumnVector<NativeType>::Container* timestamp_column_data;
+    std::vector<ColumnVector<UInt8>::Container*> event_columns_datas;
+    SortDescription sort_description {1};
+    bool sorted;
+
+    WindowFunnelState() {
+        event_count = 0;
+        window = 0;
+        window_funnel_mode = WindowFunnelMode::INVALID;
+
+        sort_description[0].column_number = 0;
+        sort_description[0].direction = 1;
+        sort_description[0].nulls_direction = -1;
+        sorted = false;
+    }
+    WindowFunnelState(int arg_event_count) : WindowFunnelState() {
+        event_count = arg_event_count;
+        auto timestamp_column = ColumnVector<NativeType>::create();
+        timestamp_column_data =
+                
&assert_cast<ColumnVector<NativeType>&>(*timestamp_column).get_data();
+
+        MutableColumns event_columns;
+        for (int i = 0; i < event_count; i++) {
+            auto event_column = ColumnVector<UInt8>::create();
+            event_columns_datas.emplace_back(
+                    
&assert_cast<ColumnVector<UInt8>&>(*event_column).get_data());
+            event_columns.emplace_back(std::move(event_column));
+        }
+        Block tmp_block;
+        tmp_block.insert({std::move(timestamp_column),
+                          
DataTypeFactory::instance().create_data_type(TYPE_INDEX), "timestamp"});
+        for (int i = 0; i < event_count; i++) {
+            tmp_block.insert({std::move(event_columns[i]),
+                              
DataTypeFactory::instance().create_data_type(TypeIndex::UInt8),
+                              "event_" + std::to_string(i)});
+        }
+
+        mutable_block = MutableBlock(std::move(tmp_block));
+    }
+
+    void reset() {
+        window = 0;
+        mutable_block.clear();
+        timestamp_column_data = nullptr;
+        event_columns_datas.clear();
+        sorted = false;
+    }
+
+    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, 
WindowFunnelMode mode) {
+        window = win;
+        window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT;
+
+        timestamp_column_data->push_back(
+                assert_cast<const 
ColumnVector<NativeType>&>(*arg_columns[2]).get_data()[row_num]);
+        for (int i = 0; i < event_count; i++) {
+            event_columns_datas[i]->push_back(
+                    assert_cast<const ColumnVector<UInt8>&>(*arg_columns[3 + 
i])
+                            .get_data()[row_num]);
+        }
+    }
+
+    void sort() {
+        if (sorted) {
+            return;
+        }
+
+        Block tmp_block = mutable_block.to_block();
+        auto block = tmp_block.clone_without_columns();
+        sort_block(tmp_block, block, sort_description, 0);
+        mutable_block = MutableBlock(std::move(block));
+        sorted = true;
+    }
+
+    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
+    int _match_event_list(size_t& start_row, size_t row_count,
+                          const NativeType* timestamp_data) const {
+        int matched_count = 0;
+        DateValueType start_timestamp;
+        DateValueType end_timestamp;
+        TimeInterval interval(SECOND, window, false);
+
+        int column_idx = 1;
+        const auto& first_event_column = 
mutable_block.get_column_by_position(column_idx);
+        const auto& first_event_data =
+                assert_cast<const 
ColumnVector<UInt8>&>(*first_event_column).get_data();
+        auto match_row = simd::find_one(first_event_data.data(), start_row, 
row_count);
+        start_row = match_row + 1;
+        if (match_row < row_count) {
+            auto prev_timestamp = binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+            end_timestamp = prev_timestamp;
+            end_timestamp.template date_add_interval<SECOND>(interval);
+
+            matched_count++;
+
+            column_idx++;
+            auto last_match_row = match_row;
+            for (; column_idx < event_count + 1; column_idx++) {
+                const auto& event_column = 
mutable_block.get_column_by_position(column_idx);
+                const auto& event_data =
+                        assert_cast<const 
ColumnVector<UInt8>&>(*event_column).get_data();
+                if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
+                    ++match_row;
+                    if (event_data[match_row] == 1) {
+                        auto current_timestamp =
+                                binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                        if (current_timestamp <= end_timestamp) {
+                            matched_count++;
+                            continue;
+                        }
+                    }
+                    break;
+                }
+                match_row = simd::find_one(event_data.data(), match_row + 1, 
row_count);
+                if (match_row < row_count) {
+                    auto current_timestamp =
+                            binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                    bool is_matched = current_timestamp <= end_timestamp;
+                    if (is_matched) {
+                        if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::INCREASE) {
+                            is_matched = current_timestamp > prev_timestamp;
+                        }
+                    }
+                    if (!is_matched) {
+                        break;
+                    }
+                    if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::INCREASE) {
+                        prev_timestamp =
+                                binary_cast<NativeType, 
DateValueType>(timestamp_data[match_row]);
+                    }
+                    if constexpr (WINDOW_FUNNEL_MODE == 
WindowFunnelMode::DEDUPLICATION) {
+                        bool is_dup = false;
+                        if (match_row != last_match_row + 1) {
+                            for (int tmp_column_idx = 1; tmp_column_idx < 
column_idx;
+                                 tmp_column_idx++) {
+                                const auto& tmp_event_column =
+                                        
mutable_block.get_column_by_position(tmp_column_idx);
+                                const auto& tmp_event_data =
+                                        assert_cast<const 
ColumnVector<UInt8>&>(*tmp_event_column)
+                                                .get_data();
+                                auto dup_match_row = 
simd::find_one(tmp_event_data.data(),
+                                                                    
last_match_row + 1, match_row);
+                                if (dup_match_row < match_row) {
+                                    is_dup = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if (is_dup) {
+                            break;
+                        }
+                        last_match_row = match_row;
+                    }
+                    matched_count++;
+                } else {
+                    break;
+                }
+            }
+        }
+        return matched_count;
+    }
+
+    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
+    int _get_internal() const {
+        size_t start_row = 0;
+        int max_found_event_count = 0;
+        const auto& ts_column = 
mutable_block.get_column_by_position(0)->get_ptr();
+        const auto& timestamp_data =
+                assert_cast<const 
ColumnVector<NativeType>&>(*ts_column).get_data().data();
+
+        auto row_count = mutable_block.rows();
+        while (start_row < row_count) {
+            auto found_event_count =
+                    _match_event_list<WINDOW_FUNNEL_MODE>(start_row, 
row_count, timestamp_data);
+            if (found_event_count == event_count) {
+                return found_event_count;
+            }
+            max_found_event_count = std::max(max_found_event_count, 
found_event_count);
+        }
+        return max_found_event_count;
+    }
+    int get() const {
+        auto row_count = mutable_block.rows();
+        if (event_count == 0 || row_count == 0) {
+            return 0;
+        }
+        switch (window_funnel_mode) {
+        case WindowFunnelMode::DEFAULT:
+            return _get_internal<WindowFunnelMode::DEFAULT>();
+        case WindowFunnelMode::DEDUPLICATION:
+            return _get_internal<WindowFunnelMode::DEDUPLICATION>();
+        case WindowFunnelMode::FIXED:
+            return _get_internal<WindowFunnelMode::FIXED>();
+        case WindowFunnelMode::INCREASE:
+            return _get_internal<WindowFunnelMode::INCREASE>();
+        default:
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid 
window_funnel mode");
+            return 0;
+        }
+    }
+
+    void merge(const WindowFunnelState& other) {
+        if (!other.mutable_block.empty()) {
+            auto st = mutable_block.merge(other.mutable_block.to_block());
+            if (!st.ok()) {
+                throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
st.to_string());
+                return;
+            }
+        }
+
+        event_count = event_count > 0 ? event_count : other.event_count;
+        window = window > 0 ? window : other.window;
+        if (enable_mode) {
+            window_funnel_mode = window_funnel_mode == 
WindowFunnelMode::INVALID
+                                         ? other.window_funnel_mode
+                                         : window_funnel_mode;
+        } else {
+            window_funnel_mode = WindowFunnelMode::DEFAULT;
+        }
+    }
+
+    void write(BufferWritable& out) const {
+        write_var_int(event_count, out);
+        write_var_int(window, out);
+        if (enable_mode) {
+            
write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
+                          out);
+        }
+        PBlock pblock;
+        size_t uncompressed_bytes = 0;
+        size_t compressed_bytes = 0;
+        Status status;
+        std::string buff;
+        Block block = mutable_block.to_block();
+        status = block.serialize(
+                BeExecVersionManager::get_newest_version(), &pblock, 
&uncompressed_bytes,
+                &compressed_bytes,
+                segment_v2::CompressionTypePB::ZSTD); // ZSTD for better 
compression ratio
+        if (!status.ok()) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
status.to_string());
+            return;
+        }
+        if (!pblock.SerializeToString(&buff)) {
+            throw doris::Exception(ErrorCode::SERIALIZE_PROTOBUF_ERROR,
+                                   "Serialize window_funnel data");
+            return;
+        }
+        auto data_bytes = buff.size();
+        write_var_uint(data_bytes, out);
+        out.write(buff.data(), data_bytes);
+    }
+
+    void read(BufferReadable& in) {
+        int64_t event_level;
+        read_var_int(event_level, in);
+        event_count = (int)event_level;
+        read_var_int(window, in);
+        window_funnel_mode = WindowFunnelMode::DEFAULT;
+        if (enable_mode) {
+            int64_t mode;
+            read_var_int(mode, in);
+            window_funnel_mode = static_cast<WindowFunnelMode>(mode);
+        }
+        size_t data_bytes = 0;
+        read_var_uint(data_bytes, in);
+        std::string buff;
+        buff.resize(data_bytes);
+        in.read(buff.data(), data_bytes);
+
+        PBlock pblock;
+        if (!pblock.ParseFromArray(buff.data(), data_bytes)) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+                                   "Failed to parse window_funnel data to 
block");
+        }
+        Block block;
+        auto status = block.deserialize(pblock);
+        if (!status.ok()) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
status.to_string());
+        }
+        mutable_block = MutableBlock(std::move(block));
+    }
+};
+
+template <TypeIndex TYPE_INDEX, typename NativeType>
+class AggregateFunctionWindowFunnel
+        : public IAggregateFunctionDataHelper<
+                  WindowFunnelState<TYPE_INDEX, NativeType>,
+                  AggregateFunctionWindowFunnel<TYPE_INDEX, NativeType>> {
+public:
+    AggregateFunctionWindowFunnel(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper<WindowFunnelState<TYPE_INDEX, 
NativeType>,
+                                           
AggregateFunctionWindowFunnel<TYPE_INDEX, NativeType>>(
+                      argument_types_) {}
+
+    void create(AggregateDataPtr __restrict place) const override {
+        auto data = new (place) WindowFunnelState<TYPE_INDEX, NativeType>(
+                IAggregateFunction::get_argument_types().size() - 3);
+        /// support window funnel mode from 2.0. See 
`BeExecVersionManager::max_be_exec_version`
+        data->enable_mode = version >= 3;
+    }
+
+    String get_name() const override { return "window_funnel"; }

Review Comment:
   warning: function 'get_name' should be marked [[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] String get_name() const override { return "window_funnel"; }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to