This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dc8f64b3e3 [improvement](agg) Serialize the fixed-length aggregation 
results with corresponding columns instead of ColumnString (#11801)
dc8f64b3e3 is described below

commit dc8f64b3e3563c41857e30485dbd4d68ea9da7ab
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Mon Aug 22 10:12:06 2022 +0800

    [improvement](agg) Serialize the fixed-length aggregation results with 
corresponding columns instead of ColumnString (#11801)
---
 be/src/vec/CMakeLists.txt                          |   1 +
 .../vec/aggregate_functions/aggregate_function.h   |  84 +++++++-
 .../aggregate_functions/aggregate_function_avg.h   |  72 ++++++-
 .../aggregate_functions/aggregate_function_count.h |  96 +++++++++
 .../aggregate_function_min_max.h                   | 121 ++++++++++-
 .../aggregate_function_nothing.h                   |   3 +
 .../aggregate_functions/aggregate_function_null.h  |  10 +
 .../aggregate_functions/aggregate_function_sum.h   |  53 +++++
 be/src/vec/columns/column_fixed_length_object.h    | 197 ++++++++++++++++++
 be/src/vec/core/field.h                            |  12 ++
 be/src/vec/core/types.h                            |   3 +
 be/src/vec/data_types/data_type.cpp                |   2 +
 be/src/vec/data_types/data_type_factory.cpp        |   3 +
 be/src/vec/data_types/data_type_factory.hpp        |   1 +
 .../data_types/data_type_fixed_length_object.cpp   |  67 +++++++
 .../vec/data_types/data_type_fixed_length_object.h |  59 ++++++
 be/src/vec/exec/vaggregation_node.cpp              | 223 +++++++++++++--------
 be/src/vec/exec/vaggregation_node.h                |  36 +++-
 be/src/vec/exprs/vectorized_agg_fn.cpp             |  14 ++
 be/src/vec/exprs/vectorized_agg_fn.h               |   6 +
 .../org/apache/doris/planner/AggregationNode.java  |   1 +
 gensrc/proto/types.proto                           |   1 +
 gensrc/thrift/PlanNodes.thrift                     |   3 +-
 23 files changed, 969 insertions(+), 99 deletions(-)

diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index bc37c49310..8743013590 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -66,6 +66,7 @@ set(VEC_FILES
   data_types/data_type_array.cpp
   data_types/data_type_bitmap.cpp
   data_types/data_type_factory.cpp
+  data_types/data_type_fixed_length_object.cpp
   data_types/data_type_hll.cpp
   data_types/data_type_nothing.cpp
   data_types/data_type_nothing.cpp
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h 
b/be/src/vec/aggregate_functions/aggregate_function.h
index ae9a3667e5..6309b0c994 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -28,6 +28,7 @@
 #include "vec/core/column_numbers.h"
 #include "vec/core/field.h"
 #include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
 
 namespace doris::vectorized {
 
@@ -76,6 +77,9 @@ public:
     /// Delete data for aggregation.
     virtual void destroy(AggregateDataPtr __restrict place) const noexcept = 0;
 
+    virtual void destroy_vec(AggregateDataPtr __restrict place,
+                             const size_t num_rows) const noexcept = 0;
+
     /// Reset aggregation state
     virtual void reset(AggregateDataPtr place) const = 0;
 
@@ -117,17 +121,29 @@ public:
     virtual void serialize_vec(const std::vector<AggregateDataPtr>& places, 
size_t offset,
                                BufferWritable& buf, const size_t num_rows) 
const = 0;
 
+    virtual void serialize_to_column(const std::vector<AggregateDataPtr>& 
places, size_t offset,
+                                     MutableColumnPtr& dst, const size_t 
num_rows) const = 0;
+
+    virtual void serialize_without_key_to_column(ConstAggregateDataPtr 
__restrict place,
+                                                 MutableColumnPtr& dst) const 
= 0;
+
     /// Deserializes state. This function is called only for empty (just 
created) states.
     virtual void deserialize(AggregateDataPtr __restrict place, 
BufferReadable& buf,
                              Arena* arena) const = 0;
 
-    virtual void deserialize_vec(AggregateDataPtr places, ColumnString* 
column, Arena* arena,
+    virtual void deserialize_vec(AggregateDataPtr places, const ColumnString* 
column, Arena* arena,
                                  size_t num_rows) const = 0;
 
+    virtual void deserialize_from_column(AggregateDataPtr places, const 
IColumn& column,
+                                         Arena* arena, size_t num_rows) const 
= 0;
+
     /// Deserializes state and merge it with current aggregation function.
     virtual void deserialize_and_merge(AggregateDataPtr __restrict place, 
BufferReadable& buf,
                                        Arena* arena) const = 0;
 
+    virtual void deserialize_and_merge_from_column(AggregateDataPtr __restrict 
place,
+                                                   const IColumn& column, 
Arena* arena) const = 0;
+
     /// Returns true if a function requires Arena to handle own states (see 
add(), merge(), deserialize()).
     virtual bool allocates_memory_in_arena() const { return false; }
 
@@ -169,9 +185,19 @@ public:
                                         AggregateDataPtr place, const 
IColumn** columns,
                                         Arena* arena) const = 0;
 
+    virtual void streaming_agg_serialize(const IColumn** columns, 
BufferWritable& buf,
+                                         const size_t num_rows, Arena* arena) 
const = 0;
+
+    virtual void streaming_agg_serialize_to_column(const IColumn** columns, 
MutableColumnPtr& dst,
+                                                   const size_t num_rows, 
Arena* arena) const = 0;
+
     const DataTypes& get_argument_types() const { return argument_types; }
     const Array& get_parameters() const { return parameters; }
 
+    virtual MutableColumnPtr create_serialize_column() const { return 
ColumnString::create(); }
+
+    virtual DataTypePtr get_serialized_type() const { return 
std::make_shared<DataTypeString>(); }
+
 protected:
     DataTypes argument_types;
     Array parameters;
@@ -184,6 +210,14 @@ public:
     IAggregateFunctionHelper(const DataTypes& argument_types_, const Array& 
parameters_)
             : IAggregateFunction(argument_types_, parameters_) {}
 
+    /// Destroys `num_rows` aggregate states stored back to back starting at `place`,
+    /// each one occupying size_of_data() bytes.
+    void destroy_vec(AggregateDataPtr __restrict place,
+                     const size_t num_rows) const noexcept override {
+        const auto* derived = static_cast<const Derived*>(this);
+        const size_t stride = size_of_data();
+        for (size_t row = 0; row < num_rows; ++row) {
+            // Row `row` lives `row * stride` bytes after the first state.
+            derived->destroy(place + row * stride);
+        }
+    }
+
     void add_batch(size_t batch_size, AggregateDataPtr* places, size_t 
place_offset,
                    const IColumn** columns, Arena* arena, bool agg_many) const 
override {
         if constexpr (std::is_same_v<Derived, 
AggregateFunctionBitmapCount<false, ColumnBitmap>> ||
@@ -266,7 +300,38 @@ public:
         }
     }
 
-    void deserialize_vec(AggregateDataPtr places, ColumnString* column, Arena* 
arena,
+    /// Default implementation: treats `dst` as a ColumnString and forwards to
+    /// serialize_vec(), which writes the `num_rows` states found at `places[i] + offset`
+    /// through a VectorBufferWriter bound to that column.
+    void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
+                             MutableColumnPtr& dst, const size_t num_rows) 
const override {
+        VectorBufferWriter writter(assert_cast<ColumnString&>(*dst));
+        serialize_vec(places, offset, writter, num_rows);
+    }
+
+    /// Serializes one single-row aggregate state per input row into `buf`.
+    /// For each of the `num_rows` rows a scratch state is created, fed that row through
+    /// add(), serialized, committed to the buffer and then destroyed.
+    void streaming_agg_serialize(const IColumn** columns, BufferWritable& buf,
+                                 const size_t num_rows, Arena* arena) const override {
+        const auto* derived = static_cast<const Derived*>(this);
+        // Scratch storage for the temporary state. A std::vector is used instead of
+        // `char place[size_of_data()]`: the size is only known at run time, and
+        // variable-length arrays are a compiler extension in C++.
+        std::vector<char> scratch(size_of_data());
+        char* place = scratch.data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            derived->create(place);
+            derived->add(place, columns, i, arena);
+            derived->serialize(place, buf);
+            buf.commit();
+            derived->destroy(place);
+        }
+    }
+
+    /// Default implementation: treats `dst` as a ColumnString and writes one serialized
+    /// state per input row into it via streaming_agg_serialize().
+    void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena* arena) const override {
+        // assert_cast, matching serialize_to_column(), rather than an unchecked static_cast.
+        VectorBufferWriter writter(assert_cast<ColumnString&>(*dst));
+        streaming_agg_serialize(columns, writter, num_rows, arena);
+    }
+
+    /// Default implementation: treats `dst` as a ColumnString, serializes the single state
+    /// at `place` into it and commits the write.
+    void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
+                                         MutableColumnPtr& dst) const override {
+        // assert_cast, matching serialize_to_column(), rather than an unchecked static_cast.
+        VectorBufferWriter writter(assert_cast<ColumnString&>(*dst));
+        static_cast<const Derived*>(this)->serialize(place, writter);
+        writter.commit();
+    }
+
+    void deserialize_vec(AggregateDataPtr places, const ColumnString* column, 
Arena* arena,
                          size_t num_rows) const override {
         const auto size_of_data = static_cast<const 
Derived*>(this)->size_of_data();
         for (size_t i = 0; i != num_rows; ++i) {
@@ -277,6 +342,11 @@ public:
         }
     }
 
+    void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena* arena,
+                                 size_t num_rows) const override {
+        deserialize_vec(places, assert_cast<const ColumnString*>(&column), 
arena, num_rows);
+    }
+
     void merge_vec(const AggregateDataPtr* places, size_t offset, 
ConstAggregateDataPtr rhs,
                    Arena* arena, const size_t num_rows) const override {
         const auto size_of_data = static_cast<const 
Derived*>(this)->size_of_data();
@@ -338,6 +408,16 @@ public:
         derived->merge(place, deserialized_place, arena);
         derived->destroy(deserialized_place);
     }
+
+    /// Default implementation: reads every row of `column` as a ColumnString entry and
+    /// merges it into the single state at `place` through deserialize_and_merge().
+    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column,
+                                           Arena* arena) const override {
+        for (size_t row = 0, row_count = column.size(); row < row_count; ++row) {
+            const auto& serialized = assert_cast<const ColumnString&>(column);
+            VectorBufferReader reader(serialized.get_data_at(row));
+            deserialize_and_merge(place, reader, arena);
+        }
+    }
 };
 
 using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h 
b/be/src/vec/aggregate_functions/aggregate_function_avg.h
index eab938e4c1..c80c46b8f9 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_avg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h
@@ -22,8 +22,9 @@
 
 #include "common/status.h"
 #include "vec/aggregate_functions/aggregate_function.h"
-#include "vec/columns/columns_number.h"
+#include "vec/columns/column_fixed_length_object.h"
 #include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_fixed_length_object.h"
 #include "vec/data_types/data_type_number.h"
 #include "vec/io/io_helper.h"
 
@@ -34,6 +35,12 @@ struct AggregateFunctionAvgData {
     T sum = 0;
     UInt64 count = 0;
 
+    AggregateFunctionAvgData& operator=(const AggregateFunctionAvgData<T>& 
src) {
+        sum = src.sum;
+        count = src.count;
+        return *this;
+    }
+
     template <typename ResultT>
     ResultT result() const {
         if constexpr (std::is_floating_point_v<ResultT>) {
@@ -136,6 +143,69 @@ public:
         column.get_data().push_back(this->data(place).template 
result<ResultType>());
     }
 
+    void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena* arena,
+                                 size_t num_rows) const override {
+        auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
+        DCHECK(col.size() >= num_rows) << "source column's size should greater 
than num_rows";
+        auto* data = col.get_data().data();
+        memcpy(places, data, sizeof(Data) * num_rows);
+    }
+
+    void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
+                             MutableColumnPtr& dst, const size_t num_rows) 
const override {
+        auto& col = assert_cast<ColumnFixedLengthObject&>(*dst);
+        col.set_item_size(sizeof(Data));
+        col.resize(num_rows);
+        auto* data = col.get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            *reinterpret_cast<Data*>(&data[sizeof(Data) * i]) =
+                    *reinterpret_cast<Data*>(places[i] + offset);
+        }
+    }
+
+    /// Builds one avg state per input row directly inside `dst`
+    /// (a ColumnFixedLengthObject with sizeof(Data)-byte items): the state for row i has
+    /// sum = columns[0][i] and count = 1.
+    void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena* arena) const override {
+        auto* src_data = assert_cast<const ColVecType&>(*columns[0]).get_data().data();
+        // assert_cast, as in the other avg serialization methods, instead of static_cast.
+        auto& dst_col = assert_cast<ColumnFixedLengthObject&>(*dst);
+        dst_col.set_item_size(sizeof(Data));
+        dst_col.resize(num_rows);
+        auto* data = dst_col.get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            auto& state = *reinterpret_cast<Data*>(&data[sizeof(Data) * i]);
+            state.sum = src_data[i];
+            state.count = 1;
+        }
+    }
+
+    /// Adds the sum and count of every state stored in `column`
+    /// (a ColumnFixedLengthObject whose items are read as Data) to the state at `place`.
+    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column,
+                                           Arena* arena) const override {
+        auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
+        // Every row of the column is merged; the row count comes from the column itself,
+        // so a size check against it could never fail and is not repeated here.
+        const size_t num_rows = col.size();
+        auto* data = reinterpret_cast<const Data*>(col.get_data().data());
+        auto& state = this->data(place);
+        for (size_t i = 0; i != num_rows; ++i) {
+            state.sum += data[i].sum;
+            state.count += data[i].count;
+        }
+    }
+
+    void serialize_without_key_to_column(ConstAggregateDataPtr __restrict 
place,
+                                         MutableColumnPtr& dst) const override 
{
+        auto& col = assert_cast<ColumnFixedLengthObject&>(*dst);
+        col.set_item_size(sizeof(Data));
+        col.resize(1);
+        *reinterpret_cast<Data*>(col.get_data().data()) = this->data(place);
+    }
+
+    MutableColumnPtr create_serialize_column() const override {
+        return ColumnFixedLengthObject::create(sizeof(Data));
+    }
+
+    DataTypePtr get_serialized_type() const override {
+        return std::make_shared<DataTypeFixedLengthObject>();
+    }
+
 private:
     UInt32 scale;
 };
diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h 
b/be/src/vec/aggregate_functions/aggregate_function_count.h
index e2ba3768e7..8ab6b53e56 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_count.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_count.h
@@ -71,6 +71,54 @@ public:
     void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
         assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).count);
     }
+
+    void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena* arena,
+                                 size_t num_rows) const override {
+        auto data = assert_cast<const ColumnUInt64&>(column).get_data().data();
+        auto* dst_data = reinterpret_cast<Data*>(places);
+        for (size_t i = 0; i != num_rows; ++i) {
+            dst_data[i].count = data[i];
+        }
+    }
+
+    void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
+                             MutableColumnPtr& dst, const size_t num_rows) 
const override {
+        auto& col = assert_cast<ColumnUInt64&>(*dst);
+        col.resize(num_rows);
+        auto* data = col.get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            data[i] = this->data(places[i] + offset).count;
+        }
+    }
+
+    void streaming_agg_serialize_to_column(const IColumn** columns, 
MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena* 
arena) const override {
+        auto& col = assert_cast<ColumnUInt64&>(*dst);
+        col.resize(num_rows);
+        col.get_data().assign(num_rows, 1UL);
+    }
+
+    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
+                                           Arena* arena) const override {
+        auto data = assert_cast<const ColumnUInt64&>(column).get_data().data();
+        const size_t num_rows = column.size();
+        for (size_t i = 0; i != num_rows; ++i) {
+            this->data(place).count += data[i];
+        }
+    }
+
+    /// Writes the count held by the state at `place` as the only row of `dst`
+    /// (a ColumnUInt64).
+    void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
+                                         MutableColumnPtr& dst) const override {
+        auto& col = assert_cast<ColumnUInt64&>(*dst);
+        col.resize(1);
+        // Store through the column's own element pointer instead of viewing the UInt64
+        // buffer as a Data object.
+        *col.get_data().data() = this->data(place).count;
+    }
+
+    MutableColumnPtr create_serialize_column() const override {
+        return ColumnVector<UInt64>::create();
+    }
+
+    DataTypePtr get_serialized_type() const override { return 
std::make_shared<DataTypeUInt64>(); }
 };
 
 /// Simply count number of not-NULL values.
@@ -117,6 +165,54 @@ public:
             
assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).count);
         }
     }
+
+    void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena* arena,
+                                 size_t num_rows) const override {
+        auto data = assert_cast<const ColumnUInt64&>(column).get_data().data();
+        auto* dst_data = reinterpret_cast<Data*>(places);
+        for (size_t i = 0; i != num_rows; ++i) {
+            dst_data[i].count = data[i];
+        }
+    }
+
+    void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
+                             MutableColumnPtr& dst, const size_t num_rows) 
const override {
+        auto& col = assert_cast<ColumnUInt64&>(*dst);
+        col.resize(num_rows);
+        auto* data = col.get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            data[i] = this->data(places[i] + offset).count;
+        }
+    }
+
+    /// Fills `dst` (a ColumnUInt64) with `num_rows` entries, each set to 1.
+    /// NOTE(review): `columns` is never inspected, so every input row contributes 1
+    /// whatever it contains. The comment above this class says it counts not-NULL
+    /// values - confirm whether NULL input rows should contribute 0 here instead.
+    void streaming_agg_serialize_to_column(const IColumn** columns, 
MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena* 
arena) const override {
+        auto& col = assert_cast<ColumnUInt64&>(*dst);
+        col.resize(num_rows);
+        col.get_data().assign(num_rows, 1UL);
+    }
+
+    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
+                                           Arena* arena) const override {
+        auto data = assert_cast<const ColumnUInt64&>(column).get_data().data();
+        const size_t num_rows = column.size();
+        for (size_t i = 0; i != num_rows; ++i) {
+            this->data(place).count += data[i];
+        }
+    }
+
+    /// Writes the count held by the state at `place` as the only row of `dst`
+    /// (a ColumnUInt64).
+    void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
+                                         MutableColumnPtr& dst) const override {
+        auto& col = assert_cast<ColumnUInt64&>(*dst);
+        col.resize(1);
+        // Store through the column's own element pointer instead of viewing the UInt64
+        // buffer as a Data object.
+        *col.get_data().data() = this->data(place).count;
+    }
+
+    MutableColumnPtr create_serialize_column() const override {
+        return ColumnVector<UInt64>::create();
+    }
+
+    DataTypePtr get_serialized_type() const override { return 
std::make_shared<DataTypeUInt64>(); }
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h 
b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
index ee91c0bbb9..8fb36cddee 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
@@ -23,8 +23,10 @@
 #include "common/logging.h"
 #include "vec/aggregate_functions/aggregate_function.h"
 #include "vec/columns/column_decimal.h"
+#include "vec/columns/column_fixed_length_object.h"
 #include "vec/columns/column_vector.h"
 #include "vec/common/assert_cast.h"
+#include "vec/data_types/data_type_fixed_length_object.h"
 #include "vec/io/io_helper.h"
 
 namespace doris::vectorized {
@@ -40,8 +42,20 @@ private:
     T value;
 
 public:
+    SingleValueDataFixed() = default;
+    SingleValueDataFixed(bool has_value_, T value_) : has_value(has_value_), 
value(value_) {}
     bool has() const { return has_value; }
 
+    constexpr static bool IsFixedLength = true;
+    using value_type = T;
+
+    value_type get_value() const { return value; }
+
+    void set_value(T value_) {
+        has_value = true;
+        value = value_;
+    }
+
     void insert_result_into(IColumn& to) const {
         if (has()) {
             assert_cast<ColumnVector<T>&>(to).get_data().push_back(value);
@@ -136,8 +150,20 @@ private:
     Type value;
 
 public:
+    SingleValueDataDecimal() = default;
+    SingleValueDataDecimal(bool has_value_, T value_) : has_value(has_value_), 
value(value_) {}
     bool has() const { return has_value; }
 
+    constexpr static bool IsFixedLength = true;
+    using value_type = Type;
+
+    value_type get_value() const { return value; }
+
+    void set_value(T value_) {
+        has_value = true;
+        value = value_;
+    }
+
     void insert_result_into(IColumn& to) const {
         if (has()) {
             assert_cast<ColumnDecimal<T>&>(to).insert_data((const 
char*)&value, 0);
@@ -242,6 +268,10 @@ private:
 public:
     ~SingleValueDataString() { delete[] large_data; }
 
+    constexpr static bool IsFixedLength = false;
+
+    using value_type = String;
+
     bool has() const { return size >= 0; }
 
     const char* get_data() const { return size <= MAX_SMALL_STRING_SIZE ? 
small_data : large_data; }
@@ -377,8 +407,9 @@ public:
 };
 
 template <typename Data>
-struct AggregateFunctionMaxData : Data {
+struct AggregateFunctionMaxData : public Data {
     using Self = AggregateFunctionMaxData;
+    using Data::IsFixedLength;
 
     bool change_if_better(const IColumn& column, size_t row_num, Arena* arena) 
{
         return this->change_if_greater(column, row_num, arena);
@@ -393,6 +424,7 @@ struct AggregateFunctionMaxData : Data {
 template <typename Data>
 struct AggregateFunctionMinData : Data {
     using Self = AggregateFunctionMinData;
+    using Data::IsFixedLength;
 
     bool change_if_better(const IColumn& column, size_t row_num, Arena* arena) 
{
         return this->change_if_less(column, row_num, arena);
@@ -408,6 +440,8 @@ class AggregateFunctionsSingleValue final
                   Data, AggregateFunctionsSingleValue<Data, 
AllocatesMemoryInArena>> {
 private:
     DataTypePtr& type;
+    using Base = IAggregateFunctionDataHelper<
+            Data, AggregateFunctionsSingleValue<Data, AllocatesMemoryInArena>>;
 
 public:
     AggregateFunctionsSingleValue(const DataTypePtr& type_)
@@ -456,6 +490,91 @@ public:
     void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
         this->data(place).insert_result_into(to);
     }
+
+    void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena* arena,
+                                 size_t num_rows) const override {
+        if constexpr (Data::IsFixedLength) {
+            const auto& col = static_cast<const 
ColumnFixedLengthObject&>(column);
+            auto* column_data = reinterpret_cast<const 
Data*>(col.get_data().data());
+            Data* data = reinterpret_cast<Data*>(places);
+            for (size_t i = 0; i != num_rows; ++i) {
+                data[i] = column_data[i];
+            }
+        } else {
+            Base::deserialize_from_column(places, column, arena, num_rows);
+        }
+    }
+
+    void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
+                             MutableColumnPtr& dst, const size_t num_rows) 
const override {
+        if constexpr (Data::IsFixedLength) {
+            auto& dst_column = static_cast<ColumnFixedLengthObject&>(*dst);
+            dst_column.resize(num_rows);
+            auto* dst_data = 
reinterpret_cast<Data*>(dst_column.get_data().data());
+            for (size_t i = 0; i != num_rows; ++i) {
+                dst_data[i] = this->data(places[i] + offset);
+            }
+        } else {
+            Base::serialize_to_column(places, offset, dst, num_rows);
+        }
+    }
+
+    /// Builds one serialized state per input row of `columns[0]`.
+    /// Fixed-length states are written straight into `dst` (a ColumnFixedLengthObject);
+    /// all other states fall back to the string-based implementation in Base.
+    void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena* arena) const override {
+        if constexpr (Data::IsFixedLength) {
+            auto& dst_column = static_cast<ColumnFixedLengthObject&>(*dst);
+            dst_column.resize(num_rows);
+            auto* dst_data = reinterpret_cast<Data*>(dst_column.get_data().data());
+            for (size_t i = 0; i != num_rows; ++i) {
+                // columns[0] is the raw argument column (the same input add() receives),
+                // not a column of already-serialized states, so it must not be read as a
+                // ColumnFixedLengthObject. Start from a value-initialized state and feed
+                // it row i instead.
+                dst_data[i] = Data();
+                dst_data[i].change_if_better(*columns[0], i, arena);
+            }
+        } else {
+            Base::streaming_agg_serialize_to_column(columns, dst, num_rows, arena);
+        }
+    }
+
+    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
+                                           Arena* arena) const override {
+        if constexpr (Data::IsFixedLength) {
+            const auto& col = static_cast<const 
ColumnFixedLengthObject&>(column);
+            auto* column_data = reinterpret_cast<const 
Data*>(col.get_data().data());
+            const size_t num_rows = column.size();
+            for (size_t i = 0; i != num_rows; ++i) {
+                this->data(place).change_if_better(column_data[i], arena);
+            }
+        } else {
+            Base::deserialize_and_merge_from_column(place, column, arena);
+        }
+    }
+
+    void serialize_without_key_to_column(ConstAggregateDataPtr __restrict 
place,
+                                         MutableColumnPtr& dst) const override 
{
+        if constexpr (Data::IsFixedLength) {
+            auto& col = assert_cast<ColumnFixedLengthObject&>(*dst);
+            col.resize(1);
+            *reinterpret_cast<Data*>(col.get_data().data()) = 
this->data(place);
+        } else {
+            Base::serialize_without_key_to_column(place, dst);
+        }
+    }
+
+    MutableColumnPtr create_serialize_column() const override {
+        if constexpr (Data::IsFixedLength) {
+            return ColumnFixedLengthObject::create(sizeof(Data));
+        } else {
+            return ColumnString::create();
+        }
+    }
+
+    DataTypePtr get_serialized_type() const override {
+        if constexpr (Data::IsFixedLength) {
+            return std::make_shared<DataTypeFixedLengthObject>();
+        } else {
+            return std::make_shared<DataTypeString>();
+        }
+    }
 };
 
 AggregateFunctionPtr create_aggregate_function_max(const std::string& name,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_nothing.h 
b/be/src/vec/aggregate_functions/aggregate_function_nothing.h
index 64af14a6cf..c6b310a9a5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_nothing.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_nothing.h
@@ -67,6 +67,9 @@ public:
 
     void deserialize_and_merge(AggregateDataPtr __restrict place, 
BufferReadable& buf,
                                Arena* arena) const override {}
+
+    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
+                                           Arena* arena) const override {}
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h 
b/be/src/vec/aggregate_functions/aggregate_function_null.h
index 89960bc9f0..7b8489c095 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_null.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_null.h
@@ -163,6 +163,16 @@ public:
         }
     }
 
+    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
+                                           Arena* arena) const override {
+        size_t num_rows = column.size();
+        for (size_t i = 0; i != num_rows; ++i) {
+            VectorBufferReader buffer_reader(
+                    (assert_cast<const ColumnString&>(column)).get_data_at(i));
+            deserialize_and_merge(place, buffer_reader, arena);
+        }
+    }
+
     void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
         if constexpr (result_is_nullable) {
             ColumnNullable& to_concrete = assert_cast<ColumnNullable&>(to);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h 
b/be/src/vec/aggregate_functions/aggregate_function_sum.h
index 9c5bdfc3bf..46ca85f3a9 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h
@@ -101,6 +101,59 @@ public:
         column.get_data().push_back(this->data(place).get());
     }
 
+    void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena* arena,
+                                 size_t num_rows) const override {
+        auto data = assert_cast<const ColVecResult&>(column).get_data().data();
+        auto dst_data = reinterpret_cast<Data*>(places);
+        for (size_t i = 0; i != num_rows; ++i) {
+            dst_data[i].sum = data[i];
+        }
+    }
+
+    void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
+                             MutableColumnPtr& dst, const size_t num_rows) 
const override {
+        auto& col = assert_cast<ColVecResult&>(*dst);
+        col.resize(num_rows);
+        auto* data = col.get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            data[i] = this->data(places[i] + offset).sum;
+        }
+    }
+
+    void streaming_agg_serialize_to_column(const IColumn** columns, 
MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena* 
arena) const override {
+        auto& col = assert_cast<ColVecResult&>(*dst);
+        auto& src = assert_cast<const ColVecType&>(*columns[0]);
+        col.resize(num_rows);
+        auto* src_data = src.get_data().data();
+        auto* dst_data = col.get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            dst_data[i] = src_data[i];
+        }
+    }
+
+    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, 
const IColumn& column,
+                                           Arena* arena) const override {
+        auto data = assert_cast<const ColVecResult&>(column).get_data().data();
+        const size_t num_rows = column.size();
+        for (size_t i = 0; i != num_rows; ++i) {
+            this->data(place).sum += data[i];
+        }
+    }
+
+    /// Writes the sum held by the state at `place` as the only row of `dst`
+    /// (a ColVecResult column).
+    void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
+                                         MutableColumnPtr& dst) const override {
+        auto& col = assert_cast<ColVecResult&>(*dst);
+        col.resize(1);
+        // Store through the column's own element pointer instead of viewing the column
+        // buffer as a Data object.
+        *col.get_data().data() = this->data(place).sum;
+    }
+
+    MutableColumnPtr create_serialize_column() const override {
+        return get_return_type()->create_column();
+    }
+
+    DataTypePtr get_serialized_type() const override { return 
get_return_type(); }
+
 private:
     UInt32 scale;
 };
diff --git a/be/src/vec/columns/column_fixed_length_object.h 
b/be/src/vec/columns/column_fixed_length_object.h
new file mode 100644
index 0000000000..5ecb13ec61
--- /dev/null
+++ b/be/src/vec/columns/column_fixed_length_object.h
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/columns/column.h"
+#include "vec/columns/columns_common.h"
+#include "vec/common/arena.h"
+#include "vec/common/pod_array.h"
+
+namespace doris::vectorized {
+
+// Column whose rows are fixed-size byte blobs stored back to back in a single
+// byte buffer: row `i` occupies bytes [i * _item_size, (i + 1) * _item_size)
+// of `_data`.
+//
+// Only a small part of the IColumn interface is implemented (size/resize,
+// clone_resized, insert_indices_from, clear and the byte accounting helpers).
+// Every other operation reports "... not supported" through LOG(FATAL).
+class ColumnFixedLengthObject final : public COWHelper<IColumn, ColumnFixedLengthObject> {
+private:
+    using Self = ColumnFixedLengthObject;
+    friend class COWHelper<IColumn, ColumnFixedLengthObject>;
+    friend class OlapBlockDataConvertor;
+
+public:
+    using Container = PaddedPODArray<uint8_t>;
+
+private:
+    ColumnFixedLengthObject() = delete;
+    // Creates an empty column whose rows are `_item_size_` bytes each.
+    ColumnFixedLengthObject(const size_t _item_size_) : _item_size(_item_size_), _item_count(0) {}
+    // Copies the item size, the row count and the raw bytes of `src`.
+    ColumnFixedLengthObject(const ColumnFixedLengthObject& src)
+            : _item_size(src._item_size),
+              _item_count(src._item_count),
+              _data(src._data.begin(), src._data.end()) {}
+
+public:
+    const char* get_family_name() const override { return "ColumnFixedLengthObject"; }
+
+    // Number of rows (not bytes).
+    size_t size() const override { return _item_count; }
+
+    // Raw byte buffer holding all rows.
+    const Container& get_data() const { return _data; }
+
+    Container& get_data() { return _data; }
+
+    // Sets the row count to `n` and resizes the byte buffer to n * _item_size.
+    void resize(size_t n) override {
+        DCHECK(_item_size > 0) << "_item_size should be greater than 0";
+        _data.resize(n * _item_size);
+        _item_count = n;
+    }
+
+    // Returns a new column with exactly `size` rows: the first
+    // min(this->size(), size) rows are copied from this column and any
+    // remaining rows are zero-filled.
+    MutableColumnPtr clone_resized(size_t size) const override {
+        auto res = this->create(_item_size);
+
+        if (size > 0) {
+            auto& new_col = static_cast<Self&>(*res);
+            new_col.resize(size);
+            auto* new_data = new_col._data.data();
+
+            size_t count = std::min(this->size(), size);
+            memcpy(new_data, _data.data(), count * _item_size);
+
+            if (size > count) {
+                memset(new_data + count * _item_size, 0, (size - count) * _item_size);
+            }
+        }
+
+        return res;
+    }
+
+    // Appends one row per entry of [indices_begin, indices_end): an index
+    // greater than -1 copies that row of `src`, any other index appends a
+    // zero-filled row. If this column's item size is still 0 it is taken
+    // from `src` first.
+    void insert_indices_from(const IColumn& src, const int* indices_begin,
+                             const int* indices_end) override {
+        const Self& src_vec = static_cast<const Self&>(src);
+        auto origin_size = size();
+        auto new_size = indices_end - indices_begin;
+        if (_item_size == 0) {
+            _item_size = src_vec._item_size;
+        }
+        DCHECK(_item_size == src_vec._item_size) << "dst and src should have the same _item_size";
+        resize(origin_size + new_size);
+
+        for (int i = 0; i < new_size; ++i) {
+            int offset = indices_begin[i];
+            if (offset > -1) {
+                memcpy(&_data[(origin_size + i) * _item_size], &src_vec._data[offset * _item_size],
+                       _item_size);
+            } else {
+                memset(&_data[(origin_size + i) * _item_size], 0, _item_size);
+            }
+        }
+    }
+
+    // Drops all rows; the item size is kept.
+    void clear() override {
+        _data.clear();
+        _item_count = 0;
+    }
+
+    // ---- Unsupported IColumn operations: each one fails with LOG(FATAL). ----
+
+    [[noreturn]] Field operator[](size_t n) const override {
+        LOG(FATAL) << "operator[] not supported";
+    }
+
+    void get(size_t n, Field& res) const override { LOG(FATAL) << "get not supported"; }
+
+    [[noreturn]] StringRef get_data_at(size_t n) const override {
+        LOG(FATAL) << "get_data_at not supported";
+    }
+
+    void insert(const Field& x) override { LOG(FATAL) << "insert not supported"; }
+
+    void insert_range_from(const IColumn& src, size_t start, size_t length) override {
+        LOG(FATAL) << "insert_range_from not supported";
+    }
+
+    void insert_data(const char* pos, size_t length) override {
+        LOG(FATAL) << "insert_data not supported";
+    }
+
+    void insert_default() override { LOG(FATAL) << "insert_default not supported"; }
+
+    void pop_back(size_t n) override { LOG(FATAL) << "pop_back not supported"; }
+
+    // Marked [[noreturn]] like the other value-returning stubs: the body has
+    // no return statement and relies on LOG(FATAL) never returning.
+    [[noreturn]] StringRef serialize_value_into_arena(size_t n, Arena& arena,
+                                                      char const*& begin) const override {
+        LOG(FATAL) << "serialize_value_into_arena not supported";
+    }
+
+    // Marked [[noreturn]] for the same reason as serialize_value_into_arena.
+    [[noreturn]] const char* deserialize_and_insert_from_arena(const char* pos) override {
+        LOG(FATAL) << "deserialize_and_insert_from_arena not supported";
+    }
+
+    void update_hash_with_value(size_t n, SipHash& hash) const override {
+        LOG(FATAL) << "update_hash_with_value not supported";
+    }
+
+    [[noreturn]] ColumnPtr filter(const IColumn::Filter& filt,
+                                  ssize_t result_size_hint) const override {
+        LOG(FATAL) << "filter not supported";
+    }
+
+    [[noreturn]] ColumnPtr permute(const IColumn::Permutation& perm, size_t limit) const override {
+        LOG(FATAL) << "permute not supported";
+    }
+
+    [[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs,
+                                int nan_direction_hint) const override {
+        LOG(FATAL) << "compare_at not supported";
+    }
+
+    void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
+                         IColumn::Permutation& res) const override {
+        LOG(FATAL) << "get_permutation not supported";
+    }
+
+    [[noreturn]] ColumnPtr replicate(const IColumn::Offsets& offsets) const override {
+        LOG(FATAL) << "replicate not supported";
+    }
+
+    [[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns,
+                                        const IColumn::Selector& selector) const override {
+        LOG(FATAL) << "scatter not supported";
+    }
+
+    void get_extremes(Field& min, Field& max) const override {
+        LOG(FATAL) << "get_extremes not supported";
+    }
+
+    // ---- Size accounting and item-size management. ----
+
+    // Size of the byte buffer in bytes.
+    size_t byte_size() const override { return _data.size(); }
+
+    // Size in bytes of a single row.
+    size_t item_size() const { return _item_size; }
+
+    // Sets the per-row byte size. The DCHECK requires that the column is
+    // empty or that `size` equals the current item size.
+    void set_item_size(size_t size) {
+        DCHECK(_item_count == 0 || size == _item_size)
+                << "cannot reset _item_size of ColumnFixedLengthObject";
+        _item_size = size;
+    }
+
+    size_t allocated_bytes() const override { return _data.allocated_bytes(); }
+
+    void replace_column_data(const IColumn&, size_t row, size_t self_row = 0) override {
+        LOG(FATAL) << "replace_column_data not supported";
+    }
+
+    void replace_column_data_default(size_t self_row = 0) override {
+        LOG(FATAL) << "replace_column_data_default not supported";
+    }
+
+protected:
+    size_t _item_size;  // bytes per row
+    size_t _item_count; // number of rows
+    Container _data;    // row bytes, stored contiguously
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/core/field.h b/be/src/vec/core/field.h
index cddd7037d7..4c4a275e34 100644
--- a/be/src/vec/core/field.h
+++ b/be/src/vec/core/field.h
@@ -182,6 +182,7 @@ public:
             Float64 = 3,
             UInt128 = 4,
             Int128 = 5,
+            FixedLengthObject = 6,
 
             /// Non-POD types.
 
@@ -224,6 +225,8 @@ public:
                 return "Decimal128";
             case AggregateFunctionState:
                 return "AggregateFunctionState";
+            case FixedLengthObject:
+                return "FixedLengthObject";
             }
 
             LOG(FATAL) << "Bad type of Field";
@@ -378,6 +381,8 @@ public:
             return get<DecimalField<Decimal128>>() < 
rhs.get<DecimalField<Decimal128>>();
         case Types::AggregateFunctionState:
             return get<AggregateFunctionStateData>() < 
rhs.get<AggregateFunctionStateData>();
+        case Types::FixedLengthObject:
+            break;
         }
 
         LOG(FATAL) << "Bad type of Field";
@@ -417,6 +422,8 @@ public:
             return get<DecimalField<Decimal128>>() <= 
rhs.get<DecimalField<Decimal128>>();
         case Types::AggregateFunctionState:
             return get<AggregateFunctionStateData>() <= 
rhs.get<AggregateFunctionStateData>();
+        case Types::FixedLengthObject:
+            break;
         }
         LOG(FATAL) << "Bad type of Field";
         return {};
@@ -452,6 +459,8 @@ public:
             return get<DecimalField<Decimal128>>() == 
rhs.get<DecimalField<Decimal128>>();
         case Types::AggregateFunctionState:
             return get<AggregateFunctionStateData>() == 
rhs.get<AggregateFunctionStateData>();
+        case Types::FixedLengthObject:
+            break;
         }
 
         CHECK(false) << "Bad type of Field";
@@ -544,6 +553,9 @@ private:
         case Types::AggregateFunctionState:
             f(field.template get<AggregateFunctionStateData>());
             return;
+        case Types::FixedLengthObject:
+            LOG(FATAL) << "FixedLengthObject not supported";
+            break;
         }
     }
 
diff --git a/be/src/vec/core/types.h b/be/src/vec/core/types.h
index def50d5e01..f57f41d58a 100644
--- a/be/src/vec/core/types.h
+++ b/be/src/vec/core/types.h
@@ -77,6 +77,7 @@ enum class TypeIndex {
     DateV2,
     DateTimeV2,
     TimeV2,
+    FixedLengthObject,
 };
 
 struct Consted {
@@ -448,6 +449,8 @@ inline const char* getTypeName(TypeIndex idx) {
         return TypeName<BitmapValue>::get();
     case TypeIndex::HLL:
         return TypeName<HyperLogLog>::get();
+    case TypeIndex::FixedLengthObject:
+        return "FixedLengthObject";
     }
 
     __builtin_unreachable();
diff --git a/be/src/vec/data_types/data_type.cpp 
b/be/src/vec/data_types/data_type.cpp
index f04bd0c592..f5a771131a 100644
--- a/be/src/vec/data_types/data_type.cpp
+++ b/be/src/vec/data_types/data_type.cpp
@@ -143,6 +143,8 @@ PGenericType_TypeId IDataType::get_pdata_type(const 
IDataType* data_type) {
         return PGenericType::HLL;
     case TypeIndex::Array:
         return PGenericType::LIST;
+    case TypeIndex::FixedLengthObject:
+        return PGenericType::FIXEDLENGTHOBJECT;
     default:
         return PGenericType::UNKNOWN;
     }
diff --git a/be/src/vec/data_types/data_type_factory.cpp 
b/be/src/vec/data_types/data_type_factory.cpp
index e2d1e9d9f3..980bd547d7 100644
--- a/be/src/vec/data_types/data_type_factory.cpp
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -280,6 +280,9 @@ DataTypePtr DataTypeFactory::create_data_type(const 
PColumnMeta& pcolumn) {
         DCHECK(pcolumn.children_size() == 1);
         nested = 
std::make_shared<DataTypeArray>(create_data_type(pcolumn.children(0)));
         break;
+    case PGenericType::FIXEDLENGTHOBJECT:
+        nested = std::make_shared<DataTypeFixedLengthObject>();
+        break;
     default: {
         LOG(FATAL) << fmt::format("Unknown data type: {}", pcolumn.type());
         return nullptr;
diff --git a/be/src/vec/data_types/data_type_factory.hpp 
b/be/src/vec/data_types/data_type_factory.hpp
index 37dda1ee54..73e0a147fe 100644
--- a/be/src/vec/data_types/data_type_factory.hpp
+++ b/be/src/vec/data_types/data_type_factory.hpp
@@ -34,6 +34,7 @@
 #include "vec/data_types/data_type_date.h"
 #include "vec/data_types/data_type_date_time.h"
 #include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_fixed_length_object.h"
 #include "vec/data_types/data_type_nothing.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/data_type_number.h"
diff --git a/be/src/vec/data_types/data_type_fixed_length_object.cpp 
b/be/src/vec/data_types/data_type_fixed_length_object.cpp
new file mode 100644
index 0000000000..d96945b2cd
--- /dev/null
+++ b/be/src/vec/data_types/data_type_fixed_length_object.cpp
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/data_types/data_type_fixed_length_object.h"
+
+#include "vec/aggregate_functions/aggregate_function_avg.h"
+
+namespace doris::vectorized {
+
+// Writes `column` into `buf` and returns the position just past the written
+// bytes. Layout: [uint32_t row count][size_t item size][row count * item size
+// bytes of row data].
+char* DataTypeFixedLengthObject::serialize(const IColumn& column, char* buf) const {
+    // row num
+    // The header fields are copied bytewise: `buf` is a plain char buffer and
+    // the item size sits 4 bytes into it, so storing through a casted
+    // uint32_t*/size_t* would be a misaligned access.
+    const size_t row_num = column.size();
+    const auto row_num_header = static_cast<uint32_t>(row_num);
+    memcpy(buf, &row_num_header, sizeof(row_num_header));
+    buf += sizeof(row_num_header);
+    // column data
+    auto ptr = column.convert_to_full_column_if_const();
+    const auto& src_col = assert_cast<const ColumnType&>(*ptr.get());
+    DCHECK(src_col.item_size() > 0)
+            << "[serialize]item size of DataTypeFixedLengthObject should be greater than 0";
+    const size_t item_size = src_col.item_size();
+    memcpy(buf, &item_size, sizeof(item_size));
+    buf += sizeof(item_size);
+    const auto* origin_data = src_col.get_data().data();
+    memcpy(buf, origin_data, row_num * item_size);
+    buf += row_num * item_size;
+
+    return buf;
+}
+
+// Reads a column previously written by serialize() from `buf` into `column`
+// and returns the position just past the consumed bytes. Expected layout:
+// [uint32_t row count][size_t item size][row count * item size bytes].
+const char* DataTypeFixedLengthObject::deserialize(const char* buf, IColumn* column) const {
+    // row num
+    // The header fields are copied bytewise: `buf` is a plain char buffer and
+    // the item size sits 4 bytes into it, so loading through a casted
+    // uint32_t*/size_t* would be a misaligned access.
+    uint32_t row_num = 0;
+    memcpy(&row_num, buf, sizeof(row_num));
+    buf += sizeof(row_num);
+    size_t item_size = 0;
+    memcpy(&item_size, buf, sizeof(item_size));
+    buf += sizeof(item_size);
+
+    DCHECK(item_size > 0)
+            << "[deserialize]item size of DataTypeFixedLengthObject should be greater than 0";
+
+    auto& dst_col = static_cast<ColumnType&>(*column);
+    // The item size must be set before resize(), which sizes the buffer from it.
+    dst_col.set_item_size(item_size);
+    // column data
+    dst_col.resize(row_num);
+    memcpy(dst_col.get_data().data(), buf, row_num * item_size);
+    buf += row_num * item_size;
+
+    return buf;
+}
+
+MutableColumnPtr DataTypeFixedLengthObject::create_column() const {
+    return ColumnType::create(0); // presumably item size 0 here; deserialize() assigns the real one via set_item_size()
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/data_types/data_type_fixed_length_object.h 
b/be/src/vec/data_types/data_type_fixed_length_object.h
new file mode 100644
index 0000000000..5d346b297d
--- /dev/null
+++ b/be/src/vec/data_types/data_type_fixed_length_object.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/columns/column_fixed_length_object.h"
+#include "vec/common/typeid_cast.h"
+#include "vec/data_types/data_type.h"
+
+namespace doris::vectorized {
+
+// Data type whose columns are ColumnFixedLengthObject instances. The type
+// itself carries no parameters, so two instances compare equal whenever
+// their dynamic types match.
+class DataTypeFixedLengthObject final : public IDataType {
+public:
+    using ColumnType = ColumnFixedLengthObject;
+
+    DataTypeFixedLengthObject() {}
+
+    DataTypeFixedLengthObject(const DataTypeFixedLengthObject& other) {}
+
+    const char* get_family_name() const override {
+        return "DataTypeFixedLengthObject";
+    }
+
+    TypeIndex get_type_id() const override {
+        return TypeIndex::FixedLengthObject;
+    }
+
+    Field get_default() const override {
+        return String();
+    }
+
+    bool equals(const IDataType& rhs) const override {
+        return typeid(*this) == typeid(rhs);
+    }
+
+    // Bytes needed by serialize(): the column's raw bytes plus the
+    // uint32_t row-count field and the size_t item-size field.
+    int64_t get_uncompressed_serialized_bytes(const IColumn& column) const override {
+        const auto& fixed_col = static_cast<const ColumnType&>(column);
+        return fixed_col.byte_size() + sizeof(uint32_t) + sizeof(size_t);
+    }
+
+    char* serialize(const IColumn& column, char* buf) const override;
+    const char* deserialize(const char* buf, IColumn* column) const override;
+
+    MutableColumnPtr create_column() const override;
+
+    bool get_is_parametric() const override {
+        return false;
+    }
+    bool have_subtypes() const override {
+        return false;
+    }
+
+    bool is_categorial() const override {
+        return is_value_represented_by_integer();
+    }
+    bool can_be_inside_low_cardinality() const override {
+        return false;
+    }
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index d43ae88e8b..e31f673643 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -103,6 +103,9 @@ AggregationNode::AggregationNode(ObjectPool* pool, const 
TPlanNode& tnode,
     }
 
     _is_first_phase = tnode.agg_node.__isset.is_first_phase && 
tnode.agg_node.is_first_phase;
+    _use_fixed_length_serialization_opt =
+            tnode.agg_node.__isset.use_fixed_length_serialization_opt &&
+            tnode.agg_node.use_fixed_length_serialization_opt;
 }
 
 AggregationNode::~AggregationNode() = default;
@@ -275,11 +278,14 @@ Status AggregationNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
-    _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTimer");
+    _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
     _exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
     _merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
     _expr_timer = ADD_TIMER(runtime_profile(), "ExprTime");
     _get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
+    _serialize_data_timer = ADD_TIMER(runtime_profile(), "SerializeDataTime");
+    _deserialize_data_timer = ADD_TIMER(runtime_profile(), 
"DeserializeDataTime");
+
     _data_mem_tracker = std::make_unique<MemTracker>("AggregationNode:Data");
     _intermediate_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
     _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
@@ -565,20 +571,34 @@ Status 
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
 
     MutableColumns value_columns(agg_size);
     std::vector<DataTypePtr> data_types(agg_size);
-
     // will serialize data to string column
-    std::vector<VectorBufferWriter> value_buffer_writers;
-    auto serialize_string_type = std::make_shared<DataTypeString>();
-    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-        data_types[i] = serialize_string_type;
-        value_columns[i] = serialize_string_type->create_column();
-        
value_buffer_writers.emplace_back(*reinterpret_cast<ColumnString*>(value_columns[i].get()));
-    }
+    if (_use_fixed_length_serialization_opt) {
+        auto serialize_string_type = std::make_shared<DataTypeString>();
+        for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+            data_types[i] = 
_aggregate_evaluators[i]->function()->get_serialized_type();
+            value_columns[i] = 
_aggregate_evaluators[i]->function()->create_serialize_column();
+        }
 
-    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-        _aggregate_evaluators[i]->function()->serialize(
-                _agg_data.without_key + _offsets_of_aggregate_states[i], 
value_buffer_writers[i]);
-        value_buffer_writers[i].commit();
+        for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+            
_aggregate_evaluators[i]->function()->serialize_without_key_to_column(
+                    _agg_data.without_key + _offsets_of_aggregate_states[i], 
value_columns[i]);
+        }
+    } else {
+        std::vector<VectorBufferWriter> value_buffer_writers;
+        auto serialize_string_type = std::make_shared<DataTypeString>();
+        for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+            data_types[i] = serialize_string_type;
+            value_columns[i] = serialize_string_type->create_column();
+            value_buffer_writers.emplace_back(
+                    *reinterpret_cast<ColumnString*>(value_columns[i].get()));
+        }
+
+        for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+            _aggregate_evaluators[i]->function()->serialize(
+                    _agg_data.without_key + _offsets_of_aggregate_states[i],
+                    value_buffer_writers[i]);
+            value_buffer_writers[i].commit();
+        }
     }
     {
         ColumnsWithTypeAndName data_with_schema;
@@ -607,8 +627,6 @@ Status AggregationNode::_execute_without_key(Block* block) {
 Status AggregationNode::_merge_without_key(Block* block) {
     SCOPED_TIMER(_merge_timer);
     DCHECK(_agg_data.without_key != nullptr);
-    std::unique_ptr<char[]> deserialize_buffer(new 
char[_total_size_of_aggregate_states]);
-    int rows = block->rows();
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
         if (_aggregate_evaluators[i]->is_merge()) {
             int col_id = _get_slot_column_id(_aggregate_evaluators[i]);
@@ -617,12 +635,21 @@ Status AggregationNode::_merge_without_key(Block* block) {
                 column = 
((ColumnNullable*)column.get())->get_nested_column_ptr();
             }
 
-            for (int j = 0; j < rows; ++j) {
-                VectorBufferReader 
buffer_reader(((ColumnString*)(column.get()))->get_data_at(j));
-
-                _aggregate_evaluators[i]->function()->deserialize_and_merge(
-                        _agg_data.without_key + 
_offsets_of_aggregate_states[i], buffer_reader,
+            SCOPED_TIMER(_deserialize_data_timer);
+            if (_use_fixed_length_serialization_opt) {
+                
_aggregate_evaluators[i]->function()->deserialize_and_merge_from_column(
+                        _agg_data.without_key + 
_offsets_of_aggregate_states[i], *column,
                         &_agg_arena_pool);
+            } else {
+                const int rows = block->rows();
+                for (int j = 0; j < rows; ++j) {
+                    VectorBufferReader buffer_reader(
+                            ((ColumnString*)(column.get()))->get_data_at(j));
+
+                    
_aggregate_evaluators[i]->function()->deserialize_and_merge(
+                            _agg_data.without_key + 
_offsets_of_aggregate_states[i], buffer_reader,
+                            &_agg_arena_pool);
+                }
             }
         } else {
             _aggregate_evaluators[i]->execute_single_add(
@@ -872,53 +899,59 @@ Status 
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
                     // do not try to do agg, just init and serialize directly 
return the out_block
                     if (!_should_expand_preagg_hash_tables()) {
                         ret_flag = true;
-                        if (_streaming_pre_places.size() < rows) {
-                            _streaming_pre_places.reserve(rows);
-                            for (size_t i = _streaming_pre_places.size(); i < 
rows; ++i) {
-                                
_streaming_pre_places.emplace_back(_agg_arena_pool.aligned_alloc(
-                                        _total_size_of_aggregate_states, 
_align_aggregate_states));
-                            }
-                        }
-
-                        for (size_t i = 0; i < rows; ++i) {
-                            _create_agg_status(_streaming_pre_places[i]);
-                        }
-
-                        for (int i = 0; i < _aggregate_evaluators.size(); ++i) 
{
-                            _aggregate_evaluators[i]->execute_batch_add(
-                                    in_block, _offsets_of_aggregate_states[i],
-                                    _streaming_pre_places.data(), 
&_agg_arena_pool, false);
-                        }
 
                         // will serialize value data to string column
-                        std::vector<VectorBufferWriter> value_buffer_writers;
                         bool mem_reuse = out_block->mem_reuse();
-                        auto serialize_string_type = 
std::make_shared<DataTypeString>();
+
+                        std::vector<DataTypePtr> data_types;
                         MutableColumns value_columns;
-                        for (int i = 0; i < _aggregate_evaluators.size(); ++i) 
{
-                            if (mem_reuse) {
-                                value_columns.emplace_back(
-                                        
std::move(*out_block->get_by_position(i + key_size).column)
-                                                .mutate());
-                            } else {
-                                // slot type of value it should always be 
string type
-                                
value_columns.emplace_back(serialize_string_type->create_column());
+                        if (_use_fixed_length_serialization_opt) {
+                            for (int i = 0; i < _aggregate_evaluators.size(); 
++i) {
+                                auto data_type =
+                                        
_aggregate_evaluators[i]->function()->get_serialized_type();
+                                if (mem_reuse) {
+                                    value_columns.emplace_back(
+                                            
std::move(*out_block->get_by_position(i + key_size)
+                                                               .column)
+                                                    .mutate());
+                                } else {
+                                    // slot type of value it should always be 
string type
+                                    
value_columns.emplace_back(_aggregate_evaluators[i]
+                                                                       
->function()
+                                                                       
->create_serialize_column());
+                                }
+                                data_types.emplace_back(data_type);
                             }
-                            value_buffer_writers.emplace_back(
-                                    
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
-                        }
 
-                        for (size_t j = 0; j < rows; ++j) {
-                            for (size_t i = 0; i < 
_aggregate_evaluators.size(); ++i) {
-                                
_aggregate_evaluators[i]->function()->serialize(
-                                        _streaming_pre_places[j] + 
_offsets_of_aggregate_states[i],
-                                        value_buffer_writers[i]);
-                                value_buffer_writers[i].commit();
+                            for (int i = 0; i != _aggregate_evaluators.size(); 
++i) {
+                                SCOPED_TIMER(_serialize_data_timer);
+                                
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
+                                        in_block, value_columns[i], rows, 
&_agg_arena_pool);
+                            }
+                        } else {
+                            std::vector<VectorBufferWriter> 
value_buffer_writers;
+                            auto serialize_string_type = 
std::make_shared<DataTypeString>();
+                            for (int i = 0; i < _aggregate_evaluators.size(); 
++i) {
+                                if (mem_reuse) {
+                                    value_columns.emplace_back(
+                                            
std::move(*out_block->get_by_position(i + key_size)
+                                                               .column)
+                                                    .mutate());
+                                } else {
+                                    // slot type of value it should always be 
string type
+                                    value_columns.emplace_back(
+                                            
serialize_string_type->create_column());
+                                }
+                                data_types.emplace_back(serialize_string_type);
+                                value_buffer_writers.emplace_back(
+                                        
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
                             }
-                        }
 
-                        for (size_t i = 0; i < rows; ++i) {
-                            _destroy_agg_status(_streaming_pre_places[i]);
+                            for (int i = 0; i != _aggregate_evaluators.size(); 
++i) {
+                                SCOPED_TIMER(_serialize_data_timer);
+                                
_aggregate_evaluators[i]->streaming_agg_serialize(
+                                        in_block, value_buffer_writers[i], 
rows, &_agg_arena_pool);
+                            }
                         }
 
                         if (!mem_reuse) {
@@ -931,7 +964,7 @@ Status 
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
                             }
                             for (int i = 0; i < value_columns.size(); ++i) {
                                 
columns_with_schema.emplace_back(std::move(value_columns[i]),
-                                                                 
serialize_string_type, "");
+                                                                 
data_types[i], "");
                             }
                             out_block->swap(Block(columns_with_schema));
                         } else {
@@ -1073,19 +1106,6 @@ Status 
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
         }
     }
 
-    // will serialize data to string column
-    std::vector<VectorBufferWriter> value_buffer_writers;
-    auto serialize_string_type = std::make_shared<DataTypeString>();
-    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-        value_data_types[i] = serialize_string_type;
-        if (mem_reuse) {
-            value_columns[i] = std::move(*block->get_by_position(i + 
key_size).column).mutate();
-        } else {
-            value_columns[i] = serialize_string_type->create_column();
-        }
-        
value_buffer_writers.emplace_back(*reinterpret_cast<ColumnString*>(value_columns[i].get()));
-    }
-
     std::visit(
             [&](auto&& agg_method) -> void {
                 agg_method.init_once();
@@ -1095,7 +1115,7 @@ Status 
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
                 const auto size = std::min(data.size(), 
size_t(state->batch_size()));
                 using KeyType = std::decay_t<decltype(iter->get_first())>;
                 std::vector<KeyType> keys(size);
-                std::vector<AggregateDataPtr> values(size);
+                std::vector<AggregateDataPtr> values(size + 1);
 
                 size_t num_rows = 0;
                 while (iter != data.end() && num_rows < state->batch_size()) {
@@ -1107,31 +1127,60 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
 
                 agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows, _probe_key_sz);
 
-                for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
-                    _aggregate_evaluators[i]->function()->serialize_vec(
-                            values, _offsets_of_aggregate_states[i], 
value_buffer_writers[i],
-                            num_rows);
-                }
-
                 if (iter == data.end()) {
                     if (agg_method.data.has_null_key_data()) {
                         DCHECK(key_columns.size() == 1);
                         DCHECK(key_columns[0]->is_nullable());
                         if (agg_method.data.has_null_key_data()) {
                             key_columns[0]->insert_data(nullptr, 0);
-                            auto mapped = agg_method.data.get_null_key_data();
-                            for (size_t i = 0; i < 
_aggregate_evaluators.size(); ++i) {
-                                
_aggregate_evaluators[i]->function()->serialize(
-                                        mapped + 
_offsets_of_aggregate_states[i],
-                                        value_buffer_writers[i]);
-                                value_buffer_writers[i].commit();
-                            }
+                            values[num_rows] = 
agg_method.data.get_null_key_data();
+                            ++num_rows;
                             *eos = true;
                         }
                     } else {
                         *eos = true;
                     }
                 }
+
+                if (_use_fixed_length_serialization_opt) {
+                    SCOPED_TIMER(_serialize_data_timer);
+                    for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+                        value_data_types[i] =
+                                
_aggregate_evaluators[i]->function()->get_serialized_type();
+                        if (mem_reuse) {
+                            value_columns[i] =
+                                    std::move(*block->get_by_position(i + 
key_size).column)
+                                            .mutate();
+                        } else {
+                            value_columns[i] =
+                                    
_aggregate_evaluators[i]->function()->create_serialize_column();
+                        }
+                        
_aggregate_evaluators[i]->function()->serialize_to_column(
+                                values, _offsets_of_aggregate_states[i], 
value_columns[i],
+                                num_rows);
+                    }
+                } else {
+                    SCOPED_TIMER(_serialize_data_timer);
+                    std::vector<VectorBufferWriter> value_buffer_writers;
+                    auto serialize_string_type = 
std::make_shared<DataTypeString>();
+                    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+                        value_data_types[i] = serialize_string_type;
+                        if (mem_reuse) {
+                            value_columns[i] =
+                                    std::move(*block->get_by_position(i + 
key_size).column)
+                                            .mutate();
+                        } else {
+                            value_columns[i] = 
serialize_string_type->create_column();
+                        }
+                        value_buffer_writers.emplace_back(
+                                
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
+                    }
+                    for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+                        _aggregate_evaluators[i]->function()->serialize_vec(
+                                values, _offsets_of_aggregate_states[i], 
value_buffer_writers[i],
+                                num_rows);
+                    }
+                }
             },
             _agg_data._aggregated_method_variant);
 
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 6960c89c4f..687739dd07 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -649,6 +649,7 @@ private:
     bool _needs_finalize;
     bool _is_merge;
     bool _is_first_phase;
+    bool _use_fixed_length_serialization_opt;
     std::unique_ptr<MemPool> _mem_pool;
 
     std::unique_ptr<MemTracker> _data_mem_tracker;
@@ -669,11 +670,12 @@ private:
     RuntimeProfile::Counter* _merge_timer;
     RuntimeProfile::Counter* _expr_timer;
     RuntimeProfile::Counter* _get_results_timer;
+    RuntimeProfile::Counter* _serialize_data_timer;
+    RuntimeProfile::Counter* _deserialize_data_timer;
 
     bool _is_streaming_preagg;
     Block _preagg_block = Block();
     bool _should_expand_hash_table = true;
-    std::vector<char*> _streaming_pre_places;
 
     bool _should_limit_output = false;
     bool _reach_limit = false;
@@ -802,13 +804,23 @@ private:
                     std::unique_ptr<char[]> deserialize_buffer(
                             new 
char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
 
-                    _aggregate_evaluators[i]->function()->deserialize_vec(
-                            deserialize_buffer.get(), 
(ColumnString*)(column.get()),
-                            &_agg_arena_pool, rows);
+                    if (_use_fixed_length_serialization_opt) {
+                        SCOPED_TIMER(_deserialize_data_timer);
+                        
_aggregate_evaluators[i]->function()->deserialize_from_column(
+                                deserialize_buffer.get(), *column, 
&_agg_arena_pool, rows);
+                    } else {
+                        SCOPED_TIMER(_deserialize_data_timer);
+                        _aggregate_evaluators[i]->function()->deserialize_vec(
+                                deserialize_buffer.get(), 
(ColumnString*)(column.get()),
+                                &_agg_arena_pool, rows);
+                    }
                     _aggregate_evaluators[i]->function()->merge_vec_selected(
                             places.data(), _offsets_of_aggregate_states[i],
                             deserialize_buffer.get(), &_agg_arena_pool, rows);
 
+                    
_aggregate_evaluators[i]->function()->destroy_vec(deserialize_buffer.get(),
+                                                                      rows);
+
                 } else {
                     _aggregate_evaluators[i]->execute_batch_add_selected(
                             block, _offsets_of_aggregate_states[i], 
places.data(),
@@ -829,13 +841,23 @@ private:
                     std::unique_ptr<char[]> deserialize_buffer(
                             new 
char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
 
-                    _aggregate_evaluators[i]->function()->deserialize_vec(
-                            deserialize_buffer.get(), 
(ColumnString*)(column.get()),
-                            &_agg_arena_pool, rows);
+                    if (_use_fixed_length_serialization_opt) {
+                        SCOPED_TIMER(_deserialize_data_timer);
+                        
_aggregate_evaluators[i]->function()->deserialize_from_column(
+                                deserialize_buffer.get(), *column, 
&_agg_arena_pool, rows);
+                    } else {
+                        SCOPED_TIMER(_deserialize_data_timer);
+                        _aggregate_evaluators[i]->function()->deserialize_vec(
+                                deserialize_buffer.get(), 
(ColumnString*)(column.get()),
+                                &_agg_arena_pool, rows);
+                    }
                     _aggregate_evaluators[i]->function()->merge_vec(
                             places.data(), _offsets_of_aggregate_states[i],
                             deserialize_buffer.get(), &_agg_arena_pool, rows);
 
+                    
_aggregate_evaluators[i]->function()->destroy_vec(deserialize_buffer.get(),
+                                                                      rows);
+
                 } else {
                     _aggregate_evaluators[i]->execute_batch_add(block,
                                                                 
_offsets_of_aggregate_states[i],
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 2631165777..28433f7b85 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -163,6 +163,20 @@ void AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset,
     _function->add_batch_selected(block->rows(), places, offset, 
_agg_columns.data(), arena);
 }
 
+void AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf,
+                                             const size_t num_rows, Arena* 
arena) {
+    _calc_argment_columns(block);
+    SCOPED_TIMER(_exec_timer);
+    _function->streaming_agg_serialize(_agg_columns.data(), buf, num_rows, 
arena);
+}
+
+void AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, 
MutableColumnPtr& dst,
+                                                       const size_t num_rows, 
Arena* arena) {
+    _calc_argment_columns(block);
+    SCOPED_TIMER(_exec_timer);
+    _function->streaming_agg_serialize_to_column(_agg_columns.data(), dst, 
num_rows, arena);
+}
+
 void AggFnEvaluator::insert_result_info(AggregateDataPtr place, IColumn* 
column) {
     _function->insert_result_into(place, *column);
 }
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h
index 2185de7617..fa025edec1 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -61,6 +61,12 @@ public:
     void execute_batch_add_selected(Block* block, size_t offset, 
AggregateDataPtr* places,
                                     Arena* arena = nullptr);
 
+    void streaming_agg_serialize(Block* block, BufferWritable& buf, const 
size_t num_rows,
+                                 Arena* arena);
+
+    void streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena* 
arena);
+
     void insert_result_info(AggregateDataPtr place, IColumn* column);
 
     void insert_result_info_vec(const std::vector<AggregateDataPtr>& place, 
size_t offset,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index c8c89a6fa2..aca046c7a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -273,6 +273,7 @@ public class AggregationNode extends PlanNode {
         msg.agg_node.setAggSortInfos(aggSortInfos);
         msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
         msg.agg_node.setIsFirstPhase(aggInfo.isFirstPhase());
+        msg.agg_node.setUseFixedLengthSerializationOpt(true);
         List<Expr> groupingExprs = aggInfo.getGroupingExprs();
         if (groupingExprs != null) {
             msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
diff --git a/gensrc/proto/types.proto b/gensrc/proto/types.proto
index c5f47d2ef9..2cb44154cf 100644
--- a/gensrc/proto/types.proto
+++ b/gensrc/proto/types.proto
@@ -101,6 +101,7 @@ message PGenericType {
         NOTHING = 27;
         DATEV2 = 28;
         DATETIMEV2 = 29;
+        FIXEDLENGTHOBJECT = 30;
         UNKNOWN = 999;
     }
     required TypeId id = 2;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index bbe1e00c20..b214eeb496 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -561,7 +561,8 @@ struct TAggregationNode {
   5: required bool need_finalize
   6: optional bool use_streaming_preaggregation
   7: optional list<TSortInfo> agg_sort_infos
-  8: optional bool is_first_phase;
+  8: optional bool is_first_phase
+  9: optional bool use_fixed_length_serialization_opt
 }
 
 struct TRepeatNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to