This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new dc8f64b3e3 [improvement](agg) Serialize the fixed-length aggregation results with corresponding columns instead of ColumnString (#11801) dc8f64b3e3 is described below commit dc8f64b3e3563c41857e30485dbd4d68ea9da7ab Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Mon Aug 22 10:12:06 2022 +0800 [improvement](agg) Serialize the fixed-length aggregation results with corresponding columns instead of ColumnString (#11801) --- be/src/vec/CMakeLists.txt | 1 + .../vec/aggregate_functions/aggregate_function.h | 84 +++++++- .../aggregate_functions/aggregate_function_avg.h | 72 ++++++- .../aggregate_functions/aggregate_function_count.h | 96 +++++++++ .../aggregate_function_min_max.h | 121 ++++++++++- .../aggregate_function_nothing.h | 3 + .../aggregate_functions/aggregate_function_null.h | 10 + .../aggregate_functions/aggregate_function_sum.h | 53 +++++ be/src/vec/columns/column_fixed_length_object.h | 197 ++++++++++++++++++ be/src/vec/core/field.h | 12 ++ be/src/vec/core/types.h | 3 + be/src/vec/data_types/data_type.cpp | 2 + be/src/vec/data_types/data_type_factory.cpp | 3 + be/src/vec/data_types/data_type_factory.hpp | 1 + .../data_types/data_type_fixed_length_object.cpp | 67 +++++++ .../vec/data_types/data_type_fixed_length_object.h | 59 ++++++ be/src/vec/exec/vaggregation_node.cpp | 223 +++++++++++++-------- be/src/vec/exec/vaggregation_node.h | 36 +++- be/src/vec/exprs/vectorized_agg_fn.cpp | 14 ++ be/src/vec/exprs/vectorized_agg_fn.h | 6 + .../org/apache/doris/planner/AggregationNode.java | 1 + gensrc/proto/types.proto | 1 + gensrc/thrift/PlanNodes.thrift | 3 +- 23 files changed, 969 insertions(+), 99 deletions(-) diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index bc37c49310..8743013590 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -66,6 +66,7 @@ set(VEC_FILES data_types/data_type_array.cpp data_types/data_type_bitmap.cpp data_types/data_type_factory.cpp + data_types/data_type_fixed_length_object.cpp data_types/data_type_hll.cpp data_types/data_type_nothing.cpp data_types/data_type_nothing.cpp diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index ae9a3667e5..6309b0c994 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -28,6 +28,7 @@ #include "vec/core/column_numbers.h" #include "vec/core/field.h" #include "vec/core/types.h" +#include "vec/data_types/data_type_string.h" namespace doris::vectorized { @@ -76,6 +77,9 @@ public: /// Delete data for aggregation. virtual void destroy(AggregateDataPtr __restrict place) const noexcept = 0; + virtual void destroy_vec(AggregateDataPtr __restrict place, + const size_t num_rows) const noexcept = 0; + /// Reset aggregation state virtual void reset(AggregateDataPtr place) const = 0; @@ -117,17 +121,29 @@ public: virtual void serialize_vec(const std::vector<AggregateDataPtr>& places, size_t offset, BufferWritable& buf, const size_t num_rows) const = 0; + virtual void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, + MutableColumnPtr& dst, const size_t num_rows) const = 0; + + virtual void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, + MutableColumnPtr& dst) const = 0; + /// Deserializes state. This function is called only for empty (just created) states. virtual void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, Arena* arena) const = 0; - virtual void deserialize_vec(AggregateDataPtr places, ColumnString* column, Arena* arena, + virtual void deserialize_vec(AggregateDataPtr places, const ColumnString* column, Arena* arena, size_t num_rows) const = 0; + virtual void deserialize_from_column(AggregateDataPtr places, const IColumn& column, + Arena* arena, size_t num_rows) const = 0; + /// Deserializes state and merge it with current aggregation function. virtual void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, Arena* arena) const = 0; + virtual void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, + const IColumn& column, Arena* arena) const = 0; + /// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()). virtual bool allocates_memory_in_arena() const { return false; } @@ -169,9 +185,19 @@ public: AggregateDataPtr place, const IColumn** columns, Arena* arena) const = 0; + virtual void streaming_agg_serialize(const IColumn** columns, BufferWritable& buf, + const size_t num_rows, Arena* arena) const = 0; + + virtual void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) const = 0; + const DataTypes& get_argument_types() const { return argument_types; } const Array& get_parameters() const { return parameters; } + virtual MutableColumnPtr create_serialize_column() const { return ColumnString::create(); } + + virtual DataTypePtr get_serialized_type() const { return std::make_shared<DataTypeString>(); } + protected: DataTypes argument_types; Array parameters; @@ -184,6 +210,14 @@ public: IAggregateFunctionHelper(const DataTypes& argument_types_, const Array& parameters_) : IAggregateFunction(argument_types_, parameters_) {} + void destroy_vec(AggregateDataPtr __restrict place, + const size_t num_rows) const noexcept override { + const size_t size_of_data_ = size_of_data(); + for (size_t i = 0; i != num_rows; ++i) { + static_cast<const Derived*>(this)->destroy(place + size_of_data_ * i); + } + } + void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset, const IColumn** columns, Arena* arena, bool agg_many) const override { if constexpr (std::is_same_v<Derived, AggregateFunctionBitmapCount<false, ColumnBitmap>> || @@ -266,7 +300,38 @@ public: } } - void deserialize_vec(AggregateDataPtr places, ColumnString* column, Arena* arena, + void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, + MutableColumnPtr& dst, const size_t num_rows) const override { + VectorBufferWriter writter(assert_cast<ColumnString&>(*dst)); + serialize_vec(places, offset, writter, num_rows); + } + + void streaming_agg_serialize(const IColumn** columns, BufferWritable& buf, + const size_t num_rows, Arena* arena) const override { + char place[size_of_data()]; + for (size_t i = 0; i != num_rows; ++i) { + static_cast<const Derived*>(this)->create(place); + static_cast<const Derived*>(this)->add(place, columns, i, arena); + static_cast<const Derived*>(this)->serialize(place, buf); + buf.commit(); + static_cast<const Derived*>(this)->destroy(place); + } + } + + void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) const override { + VectorBufferWriter writter(static_cast<ColumnString&>(*dst)); + streaming_agg_serialize(columns, writter, num_rows, arena); + } + + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, + MutableColumnPtr& dst) const override { + VectorBufferWriter writter(static_cast<ColumnString&>(*dst)); + static_cast<const Derived*>(this)->serialize(place, writter); + writter.commit(); + } + + void deserialize_vec(AggregateDataPtr places, const ColumnString* column, Arena* arena, size_t num_rows) const override { const auto size_of_data = static_cast<const Derived*>(this)->size_of_data(); for (size_t i = 0; i != num_rows; ++i) { @@ -277,6 +342,11 @@ public: } } + void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena, + size_t num_rows) const override { + deserialize_vec(places, assert_cast<const ColumnString*>(&column), arena, num_rows); + } + void merge_vec(const AggregateDataPtr* places, size_t offset, ConstAggregateDataPtr rhs, Arena* arena, const size_t num_rows) const override { const auto size_of_data = static_cast<const Derived*>(this)->size_of_data(); @@ -338,6 +408,16 @@ public: derived->merge(place, deserialized_place, arena); derived->destroy(deserialized_place); } + + void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, + Arena* arena) const override { + size_t num_rows = column.size(); + for (size_t i = 0; i != num_rows; ++i) { + VectorBufferReader buffer_reader( + (assert_cast<const ColumnString&>(column)).get_data_at(i)); + deserialize_and_merge(place, buffer_reader, arena); + } + } }; using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>; diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h b/be/src/vec/aggregate_functions/aggregate_function_avg.h index eab938e4c1..c80c46b8f9 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_avg.h +++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h @@ -22,8 +22,9 @@ #include "common/status.h" #include "vec/aggregate_functions/aggregate_function.h" -#include "vec/columns/columns_number.h" +#include "vec/columns/column_fixed_length_object.h" #include "vec/data_types/data_type_decimal.h" +#include "vec/data_types/data_type_fixed_length_object.h" #include "vec/data_types/data_type_number.h" #include "vec/io/io_helper.h" @@ -34,6 +35,12 @@ struct AggregateFunctionAvgData { T sum = 0; UInt64 count = 0; + AggregateFunctionAvgData& operator=(const AggregateFunctionAvgData<T>& src) { + sum = src.sum; + count = src.count; + return *this; + } + template <typename ResultT> ResultT result() const { if constexpr (std::is_floating_point_v<ResultT>) { @@ -136,6 +143,69 @@ public: column.get_data().push_back(this->data(place).template result<ResultType>()); } + void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena, + size_t num_rows) const override { + auto& col = assert_cast<const ColumnFixedLengthObject&>(column); + DCHECK(col.size() >= num_rows) << "source column's size should greater than num_rows"; + auto* data = col.get_data().data(); + memcpy(places, data, sizeof(Data) * num_rows); + } + + void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, + MutableColumnPtr& dst, const size_t num_rows) const override { + auto& col = assert_cast<ColumnFixedLengthObject&>(*dst); + col.set_item_size(sizeof(Data)); + col.resize(num_rows); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + *reinterpret_cast<Data*>(&data[sizeof(Data) * i]) = + *reinterpret_cast<Data*>(places[i] + offset); + } + } + + void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) const override { + auto* src_data = assert_cast<const ColVecType&>(*columns[0]).get_data().data(); + auto& dst_col = static_cast<ColumnFixedLengthObject&>(*dst); + dst_col.set_item_size(sizeof(Data)); + dst_col.resize(num_rows); + auto* data = dst_col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + auto& state = *reinterpret_cast<Data*>(&data[sizeof(Data) * i]); + state.sum = src_data[i]; + state.count = 1; + } + } + + void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, + Arena* arena) const override { + auto& col = assert_cast<const ColumnFixedLengthObject&>(column); + const size_t num_rows = column.size(); + DCHECK(col.size() >= num_rows) << "source column's size should greater than num_rows"; + auto* data = reinterpret_cast<const Data*>(col.get_data().data()); + + for (size_t i = 0; i != num_rows; ++i) { + this->data(place).sum += data[i].sum; + this->data(place).count += data[i].count; + } + } + + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, + MutableColumnPtr& dst) const override { + auto& col = assert_cast<ColumnFixedLengthObject&>(*dst); + col.set_item_size(sizeof(Data)); + col.resize(1); + *reinterpret_cast<Data*>(col.get_data().data()) = this->data(place); + } + + MutableColumnPtr create_serialize_column() const override { + return ColumnFixedLengthObject::create(sizeof(Data)); + } + + DataTypePtr get_serialized_type() const override { + return std::make_shared<DataTypeFixedLengthObject>(); + } + private: UInt32 scale; }; diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h b/be/src/vec/aggregate_functions/aggregate_function_count.h index e2ba3768e7..8ab6b53e56 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_count.h +++ b/be/src/vec/aggregate_functions/aggregate_function_count.h @@ -71,6 +71,54 @@ public: void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).count); } + + void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena, + size_t num_rows) const override { + auto data = assert_cast<const ColumnUInt64&>(column).get_data().data(); + auto* dst_data = reinterpret_cast<Data*>(places); + for (size_t i = 0; i != num_rows; ++i) { + dst_data[i].count = data[i]; + } + } + + void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, + MutableColumnPtr& dst, const size_t num_rows) const override { + auto& col = assert_cast<ColumnUInt64&>(*dst); + col.resize(num_rows); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + data[i] = this->data(places[i] + offset).count; + } + } + + void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) const override { + auto& col = assert_cast<ColumnUInt64&>(*dst); + col.resize(num_rows); + col.get_data().assign(num_rows, 1UL); + } + + void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, + Arena* arena) const override { + auto data = assert_cast<const ColumnUInt64&>(column).get_data().data(); + const size_t num_rows = column.size(); + for (size_t i = 0; i != num_rows; ++i) { + this->data(place).count += data[i]; + } + } + + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, + MutableColumnPtr& dst) const override { + auto& col = assert_cast<ColumnUInt64&>(*dst); + col.resize(1); + reinterpret_cast<Data*>(col.get_data().data())->count = this->data(place).count; + } + + MutableColumnPtr create_serialize_column() const override { + return ColumnVector<UInt64>::create(); + } + + DataTypePtr get_serialized_type() const override { return std::make_shared<DataTypeUInt64>(); } }; /// Simply count number of not-NULL values. @@ -117,6 +165,54 @@ public: assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).count); } } + + void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena, + size_t num_rows) const override { + auto data = assert_cast<const ColumnUInt64&>(column).get_data().data(); + auto* dst_data = reinterpret_cast<Data*>(places); + for (size_t i = 0; i != num_rows; ++i) { + dst_data[i].count = data[i]; + } + } + + void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, + MutableColumnPtr& dst, const size_t num_rows) const override { + auto& col = assert_cast<ColumnUInt64&>(*dst); + col.resize(num_rows); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + data[i] = this->data(places[i] + offset).count; + } + } + + void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) const override { + auto& col = assert_cast<ColumnUInt64&>(*dst); + col.resize(num_rows); + col.get_data().assign(num_rows, 1UL); + } + + void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, + Arena* arena) const override { + auto data = assert_cast<const ColumnUInt64&>(column).get_data().data(); + const size_t num_rows = column.size(); + for (size_t i = 0; i != num_rows; ++i) { + this->data(place).count += data[i]; + } + } + + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, + MutableColumnPtr& dst) const override { + auto& col = assert_cast<ColumnUInt64&>(*dst); + col.resize(1); + reinterpret_cast<Data*>(col.get_data().data())->count = this->data(place).count; + } + + MutableColumnPtr create_serialize_column() const override { + return ColumnVector<UInt64>::create(); + } + + DataTypePtr get_serialized_type() const override { return std::make_shared<DataTypeUInt64>(); } }; } // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h b/be/src/vec/aggregate_functions/aggregate_function_min_max.h index ee91c0bbb9..8fb36cddee 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h +++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h @@ -23,8 +23,10 @@ #include "common/logging.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/columns/column_decimal.h" +#include "vec/columns/column_fixed_length_object.h" #include "vec/columns/column_vector.h" #include "vec/common/assert_cast.h" +#include "vec/data_types/data_type_fixed_length_object.h" #include "vec/io/io_helper.h" namespace doris::vectorized { @@ -40,8 +42,20 @@ private: T value; public: + SingleValueDataFixed() = default; + SingleValueDataFixed(bool has_value_, T value_) : has_value(has_value_), value(value_) {} bool has() const { return has_value; } + constexpr static bool IsFixedLength = true; + using value_type = T; + + value_type get_value() const { return value; } + + void set_value(T value_) { + has_value = true; + value = value_; + } + void insert_result_into(IColumn& to) const { if (has()) { assert_cast<ColumnVector<T>&>(to).get_data().push_back(value); @@ -136,8 +150,20 @@ private: Type value; public: + SingleValueDataDecimal() = default; + SingleValueDataDecimal(bool has_value_, T value_) : has_value(has_value_), value(value_) {} bool has() const { return has_value; } + constexpr static bool IsFixedLength = true; + using value_type = Type; + + value_type get_value() const { return value; } + + void set_value(T value_) { + has_value = true; + value = value_; + } + void insert_result_into(IColumn& to) const { if (has()) { assert_cast<ColumnDecimal<T>&>(to).insert_data((const char*)&value, 0); @@ -242,6 +268,10 @@ private: public: ~SingleValueDataString() { delete[] large_data; } + constexpr static bool IsFixedLength = false; + + using value_type = String; + bool has() const { return size >= 0; } const char* get_data() const { return size <= MAX_SMALL_STRING_SIZE ? small_data : large_data; } @@ -377,8 +407,9 @@ public: }; template <typename Data> -struct AggregateFunctionMaxData : Data { +struct AggregateFunctionMaxData : public Data { using Self = AggregateFunctionMaxData; + using Data::IsFixedLength; bool change_if_better(const IColumn& column, size_t row_num, Arena* arena) { return this->change_if_greater(column, row_num, arena); @@ -393,6 +424,7 @@ struct AggregateFunctionMaxData : Data { template <typename Data> struct AggregateFunctionMinData : Data { using Self = AggregateFunctionMinData; + using Data::IsFixedLength; bool change_if_better(const IColumn& column, size_t row_num, Arena* arena) { return this->change_if_less(column, row_num, arena); @@ -408,6 +440,8 @@ class AggregateFunctionsSingleValue final Data, AggregateFunctionsSingleValue<Data, AllocatesMemoryInArena>> { private: DataTypePtr& type; + using Base = IAggregateFunctionDataHelper< + Data, AggregateFunctionsSingleValue<Data, AllocatesMemoryInArena>>; public: AggregateFunctionsSingleValue(const DataTypePtr& type_) @@ -456,6 +490,91 @@ public: void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { this->data(place).insert_result_into(to); } + + void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena, + size_t num_rows) const override { + if constexpr (Data::IsFixedLength) { + const auto& col = static_cast<const ColumnFixedLengthObject&>(column); + auto* column_data = reinterpret_cast<const Data*>(col.get_data().data()); + Data* data = reinterpret_cast<Data*>(places); + for (size_t i = 0; i != num_rows; ++i) { + data[i] = column_data[i]; + } + } else { + Base::deserialize_from_column(places, column, arena, num_rows); + } + } + + void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, + MutableColumnPtr& dst, const size_t num_rows) const override { + if constexpr (Data::IsFixedLength) { + auto& dst_column = static_cast<ColumnFixedLengthObject&>(*dst); + dst_column.resize(num_rows); + auto* dst_data = reinterpret_cast<Data*>(dst_column.get_data().data()); + for (size_t i = 0; i != num_rows; ++i) { + dst_data[i] = this->data(places[i] + offset); + } + } else { + Base::serialize_to_column(places, offset, dst, num_rows); + } + } + + void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) const override { + if constexpr (Data::IsFixedLength) { + const auto& src_column = static_cast<const ColumnFixedLengthObject&>(*columns[0]); + auto* src_data = reinterpret_cast<const Data*>(src_column.get_data().data()); + auto& dst_column = static_cast<ColumnFixedLengthObject&>(*dst); + dst_column.resize(num_rows); + auto* dst_data = reinterpret_cast<Data*>(dst_column.get_data().data()); + for (size_t i = 0; i != num_rows; ++i) { + dst_data[i] = src_data[i]; + } + } else { + Base::streaming_agg_serialize_to_column(columns, dst, num_rows, arena); + } + } + + void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, + Arena* arena) const override { + if constexpr (Data::IsFixedLength) { + const auto& col = static_cast<const ColumnFixedLengthObject&>(column); + auto* column_data = reinterpret_cast<const Data*>(col.get_data().data()); + const size_t num_rows = column.size(); + for (size_t i = 0; i != num_rows; ++i) { + this->data(place).change_if_better(column_data[i], arena); + } + } else { + Base::deserialize_and_merge_from_column(place, column, arena); + } + } + + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, + MutableColumnPtr& dst) const override { + if constexpr (Data::IsFixedLength) { + auto& col = assert_cast<ColumnFixedLengthObject&>(*dst); + col.resize(1); + *reinterpret_cast<Data*>(col.get_data().data()) = this->data(place); + } else { + Base::serialize_without_key_to_column(place, dst); + } + } + + MutableColumnPtr create_serialize_column() const override { + if constexpr (Data::IsFixedLength) { + return ColumnFixedLengthObject::create(sizeof(Data)); + } else { + return ColumnString::create(); + } + } + + DataTypePtr get_serialized_type() const override { + if constexpr (Data::IsFixedLength) { + return std::make_shared<DataTypeFixedLengthObject>(); + } else { + return std::make_shared<DataTypeString>(); + } + } }; AggregateFunctionPtr create_aggregate_function_max(const std::string& name, diff --git a/be/src/vec/aggregate_functions/aggregate_function_nothing.h b/be/src/vec/aggregate_functions/aggregate_function_nothing.h index 64af14a6cf..c6b310a9a5 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_nothing.h +++ b/be/src/vec/aggregate_functions/aggregate_function_nothing.h @@ -67,6 +67,9 @@ public: void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, Arena* arena) const override {} + + void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, + Arena* arena) const override {} }; } // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h b/be/src/vec/aggregate_functions/aggregate_function_null.h index 89960bc9f0..7b8489c095 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_null.h +++ b/be/src/vec/aggregate_functions/aggregate_function_null.h @@ -163,6 +163,16 @@ public: } } + void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, + Arena* arena) const override { + size_t num_rows = column.size(); + for (size_t i = 0; i != num_rows; ++i) { + VectorBufferReader buffer_reader( + (assert_cast<const ColumnString&>(column)).get_data_at(i)); + deserialize_and_merge(place, buffer_reader, arena); + } + } + void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { if constexpr (result_is_nullable) { ColumnNullable& to_concrete = assert_cast<ColumnNullable&>(to); diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h b/be/src/vec/aggregate_functions/aggregate_function_sum.h index 9c5bdfc3bf..46ca85f3a9 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sum.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h @@ -101,6 +101,59 @@ public: column.get_data().push_back(this->data(place).get()); } + void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena, + size_t num_rows) const override { + auto data = assert_cast<const ColVecResult&>(column).get_data().data(); + auto dst_data = reinterpret_cast<Data*>(places); + for (size_t i = 0; i != num_rows; ++i) { + dst_data[i].sum = data[i]; + } + } + + void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, + MutableColumnPtr& dst, const size_t num_rows) const override { + auto& col = assert_cast<ColVecResult&>(*dst); + col.resize(num_rows); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + data[i] = this->data(places[i] + offset).sum; + } + } + + void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) const override { + auto& col = assert_cast<ColVecResult&>(*dst); + auto& src = assert_cast<const ColVecType&>(*columns[0]); + col.resize(num_rows); + auto* src_data = src.get_data().data(); + auto* dst_data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + dst_data[i] = src_data[i]; + } + } + + void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, + Arena* arena) const override { + auto data = assert_cast<const ColVecResult&>(column).get_data().data(); + const size_t num_rows = column.size(); + for (size_t i = 0; i != num_rows; ++i) { + this->data(place).sum += data[i]; + } + } + + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, + MutableColumnPtr& dst) const override { + auto& col = assert_cast<ColVecResult&>(*dst); + col.resize(1); + reinterpret_cast<Data*>(col.get_data().data())->sum = this->data(place).sum; + } + + MutableColumnPtr create_serialize_column() const override { + return get_return_type()->create_column(); + } + + DataTypePtr get_serialized_type() const override { return get_return_type(); } + private: UInt32 scale; }; diff --git a/be/src/vec/columns/column_fixed_length_object.h b/be/src/vec/columns/column_fixed_length_object.h new file mode 100644 index 0000000000..5ecb13ec61 --- /dev/null +++ b/be/src/vec/columns/column_fixed_length_object.h @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column.h" +#include "vec/columns/columns_common.h" +#include "vec/common/arena.h" +#include "vec/common/pod_array.h" + +namespace doris::vectorized { + +class ColumnFixedLengthObject final : public COWHelper<IColumn, ColumnFixedLengthObject> { +private: + using Self = ColumnFixedLengthObject; + friend class COWHelper<IColumn, ColumnFixedLengthObject>; + friend class OlapBlockDataConvertor; + +public: + using Container = PaddedPODArray<uint8_t>; + +private: + ColumnFixedLengthObject() = delete; + ColumnFixedLengthObject(const size_t _item_size_) : _item_size(_item_size_), _item_count(0) {} + ColumnFixedLengthObject(const ColumnFixedLengthObject& src) + : _item_size(src._item_size), + _item_count(src._item_count), + _data(src._data.begin(), src._data.end()) {} + +public: + const char* get_family_name() const override { return "ColumnFixedLengthObject"; } + + size_t size() const override { return _item_count; } + + const Container& get_data() const { return _data; } + + Container& get_data() { return _data; } + + void resize(size_t n) override { + DCHECK(_item_size > 0) << "_item_size should be greater than 0"; + _data.resize(n * _item_size); + _item_count = n; + } + + MutableColumnPtr clone_resized(size_t size) const override { + auto res = this->create(_item_size); + + if (size > 0) { + auto& new_col = static_cast<Self&>(*res); + new_col.resize(size); + auto* new_data = new_col._data.data(); + + size_t count = std::min(this->size(), size); + memcpy(new_data, _data.data(), count * _item_size); + + if (size > count) memset(new_data + count * _item_size, 0, (size - count) * _item_size); + } + + return res; + } + + void insert_indices_from(const IColumn& src, const int* indices_begin, + const int* indices_end) override { + const Self& src_vec = static_cast<const Self&>(src); + auto origin_size = size(); + auto new_size = indices_end - indices_begin; + if (_item_size == 0) { + _item_size = src_vec._item_size; + } + DCHECK(_item_size == src_vec._item_size) << "dst and src should have the same _item_size"; + resize(origin_size + new_size); + + for (int i = 0; i < new_size; ++i) { + int offset = indices_begin[i]; + if (offset > -1) { + memcpy(&_data[(origin_size + i) * _item_size], &src_vec._data[offset * _item_size], + _item_size); + } else { + memset(&_data[(origin_size + i) * _item_size], 0, _item_size); + } + } + } + + void clear() override { + _data.clear(); + _item_count = 0; + } + + [[noreturn]] Field operator[](size_t n) const override { + LOG(FATAL) << "operator[] not supported"; + } + + void get(size_t n, Field& res) const override { LOG(FATAL) << "get not supported"; } + + [[noreturn]] StringRef get_data_at(size_t n) const override { + LOG(FATAL) << "get_data_at not supported"; + } + + void insert(const Field& x) override { LOG(FATAL) << "insert not supported"; } + + void insert_range_from(const IColumn& src, size_t start, size_t length) override { + LOG(FATAL) << "insert_range_from not supported"; + } + + void insert_data(const char* pos, size_t length) override { + LOG(FATAL) << "insert_data not supported"; + } + + void insert_default() override { LOG(FATAL) << "insert_default not supported"; } + + void pop_back(size_t n) override { LOG(FATAL) << "pop_back not supported"; } + + StringRef serialize_value_into_arena(size_t n, Arena& arena, + char const*& begin) const override { + LOG(FATAL) << "serialize_value_into_arena not supported"; + } + + const char* deserialize_and_insert_from_arena(const char* pos) override { + LOG(FATAL) << "deserialize_and_insert_from_arena not supported"; + } + + void update_hash_with_value(size_t n, SipHash& hash) const override { + LOG(FATAL) << "update_hash_with_value not supported"; + } + + [[noreturn]] ColumnPtr filter(const IColumn::Filter& filt, + ssize_t result_size_hint) const override { + LOG(FATAL) << "filter not supported"; + } + + [[noreturn]] ColumnPtr permute(const IColumn::Permutation& perm, size_t limit) const override { + LOG(FATAL) << "permute not supported"; + } + + [[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs, + int nan_direction_hint) const override { + LOG(FATAL) << "compare_at not supported"; + } + + void get_permutation(bool reverse, size_t limit, int nan_direction_hint, + IColumn::Permutation& res) const override { + LOG(FATAL) << "get_permutation not supported"; + } + + [[noreturn]] ColumnPtr replicate(const IColumn::Offsets& offsets) const override { + LOG(FATAL) << "replicate not supported"; + } + + [[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns, + const IColumn::Selector& selector) const override { + LOG(FATAL) << "scatter not supported"; + } + + void get_extremes(Field& min, Field& max) const override { + LOG(FATAL) << "get_extremes not supported"; + } + + size_t byte_size() const override { return _data.size(); } + + size_t item_size() const { return _item_size; } + + void set_item_size(size_t size) { + DCHECK(_item_count == 0 || size == _item_size) + << "cannot reset _item_size of ColumnFixedLengthObject"; + _item_size = size; + } + + size_t allocated_bytes() const override { return _data.allocated_bytes(); } + + void replace_column_data(const IColumn&, size_t row, size_t self_row = 0) override { + LOG(FATAL) << "replace_column_data not supported"; + } + + void replace_column_data_default(size_t self_row = 0) override { + LOG(FATAL) << "replace_column_data_default not supported"; + } + +protected: + size_t _item_size; + size_t _item_count; + Container _data; +}; +} // namespace doris::vectorized diff --git a/be/src/vec/core/field.h b/be/src/vec/core/field.h index cddd7037d7..4c4a275e34 100644 --- a/be/src/vec/core/field.h +++ b/be/src/vec/core/field.h @@ -182,6 +182,7 @@ public: Float64 = 3, UInt128 = 4, Int128 = 5, + FixedLengthObject = 6, /// Non-POD types. @@ -224,6 +225,8 @@ public: return "Decimal128"; case AggregateFunctionState: return "AggregateFunctionState"; + case FixedLengthObject: + return "FixedLengthObject"; } LOG(FATAL) << "Bad type of Field"; @@ -378,6 +381,8 @@ public: return get<DecimalField<Decimal128>>() < rhs.get<DecimalField<Decimal128>>(); case Types::AggregateFunctionState: return get<AggregateFunctionStateData>() < rhs.get<AggregateFunctionStateData>(); + case Types::FixedLengthObject: + break; } LOG(FATAL) << "Bad type of Field"; @@ -417,6 +422,8 @@ public: return get<DecimalField<Decimal128>>() <= rhs.get<DecimalField<Decimal128>>(); case Types::AggregateFunctionState: return get<AggregateFunctionStateData>() <= rhs.get<AggregateFunctionStateData>(); + case Types::FixedLengthObject: + break; } LOG(FATAL) << "Bad type of Field"; return {}; @@ -452,6 +459,8 @@ public: return get<DecimalField<Decimal128>>() == rhs.get<DecimalField<Decimal128>>(); case Types::AggregateFunctionState: return get<AggregateFunctionStateData>() == rhs.get<AggregateFunctionStateData>(); + case Types::FixedLengthObject: + break; } CHECK(false) << "Bad type of Field"; @@ -544,6 +553,9 @@ private: case Types::AggregateFunctionState: f(field.template get<AggregateFunctionStateData>()); return; + case Types::FixedLengthObject: + LOG(FATAL) << "FixedLengthObject not supported"; + break; } } diff --git a/be/src/vec/core/types.h b/be/src/vec/core/types.h index def50d5e01..f57f41d58a 100644 --- a/be/src/vec/core/types.h +++ b/be/src/vec/core/types.h @@ -77,6 +77,7 @@ enum class TypeIndex { DateV2, DateTimeV2, TimeV2, + FixedLengthObject, }; struct Consted { @@ -448,6 +449,8 @@ inline const char* getTypeName(TypeIndex idx) { return TypeName<BitmapValue>::get(); case TypeIndex::HLL: return TypeName<HyperLogLog>::get(); + case TypeIndex::FixedLengthObject: + return "FixedLengthObject"; } __builtin_unreachable(); diff --git a/be/src/vec/data_types/data_type.cpp b/be/src/vec/data_types/data_type.cpp index f04bd0c592..f5a771131a 100644 --- a/be/src/vec/data_types/data_type.cpp +++ b/be/src/vec/data_types/data_type.cpp @@ -143,6 +143,8 @@ PGenericType_TypeId IDataType::get_pdata_type(const IDataType* data_type) { return PGenericType::HLL; case TypeIndex::Array: return PGenericType::LIST; + case TypeIndex::FixedLengthObject: + return PGenericType::FIXEDLENGTHOBJECT; default: return PGenericType::UNKNOWN; } diff --git a/be/src/vec/data_types/data_type_factory.cpp b/be/src/vec/data_types/data_type_factory.cpp index e2d1e9d9f3..980bd547d7 100644 --- a/be/src/vec/data_types/data_type_factory.cpp +++ b/be/src/vec/data_types/data_type_factory.cpp @@ -280,6 +280,9 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) { DCHECK(pcolumn.children_size() == 1); nested = std::make_shared<DataTypeArray>(create_data_type(pcolumn.children(0))); break; + case PGenericType::FIXEDLENGTHOBJECT: + nested = std::make_shared<DataTypeFixedLengthObject>(); + break; default: { LOG(FATAL) << fmt::format("Unknown data type: {}", pcolumn.type()); return nullptr; diff --git a/be/src/vec/data_types/data_type_factory.hpp b/be/src/vec/data_types/data_type_factory.hpp index 37dda1ee54..73e0a147fe 100644 --- a/be/src/vec/data_types/data_type_factory.hpp +++ b/be/src/vec/data_types/data_type_factory.hpp @@ -34,6 +34,7 @@ #include "vec/data_types/data_type_date.h" #include "vec/data_types/data_type_date_time.h" #include "vec/data_types/data_type_decimal.h" +#include "vec/data_types/data_type_fixed_length_object.h" #include "vec/data_types/data_type_nothing.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_number.h" diff --git a/be/src/vec/data_types/data_type_fixed_length_object.cpp b/be/src/vec/data_types/data_type_fixed_length_object.cpp new file mode 100644 index 0000000000..d96945b2cd --- /dev/null +++ b/be/src/vec/data_types/data_type_fixed_length_object.cpp @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/data_types/data_type_fixed_length_object.h" + +#include "vec/aggregate_functions/aggregate_function_avg.h" + +namespace doris::vectorized { + +char* DataTypeFixedLengthObject::serialize(const IColumn& column, char* buf) const { + // row num + const auto row_num = column.size(); + *reinterpret_cast<uint32_t*>(buf) = row_num; + buf += sizeof(uint32_t); + // column data + auto ptr = column.convert_to_full_column_if_const(); + const auto& src_col = assert_cast<const ColumnType&>(*ptr.get()); + DCHECK(src_col.item_size() > 0) + << "[serialize]item size of DataTypeFixedLengthObject should be greater than 0"; + *reinterpret_cast<size_t*>(buf) = src_col.item_size(); + buf += sizeof(size_t); + const auto* origin_data = src_col.get_data().data(); + memcpy(buf, origin_data, row_num * src_col.item_size()); + buf += row_num * src_col.item_size(); + + return buf; +} + +const char* DataTypeFixedLengthObject::deserialize(const char* buf, IColumn* column) const { + // row num + uint32_t row_num = *reinterpret_cast<const uint32_t*>(buf); + buf += sizeof(uint32_t); + size_t item_size = *reinterpret_cast<const size_t*>(buf); + buf += sizeof(size_t); + + DCHECK(item_size > 0) + << "[deserialize]item size of DataTypeFixedLengthObject should be greater than 0"; + + auto& dst_col = static_cast<ColumnType&>(*column); + dst_col.set_item_size(item_size); + // column data + dst_col.resize(row_num); + memcpy(dst_col.get_data().data(), buf, row_num * item_size); + buf += row_num * item_size; + + return buf; +} + +MutableColumnPtr DataTypeFixedLengthObject::create_column() const { + return ColumnType::create(0); +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/data_types/data_type_fixed_length_object.h b/be/src/vec/data_types/data_type_fixed_length_object.h new file mode 100644 index 0000000000..5d346b297d --- /dev/null +++ b/be/src/vec/data_types/data_type_fixed_length_object.h @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_fixed_length_object.h" +#include "vec/common/typeid_cast.h" +#include "vec/data_types/data_type.h" + +namespace doris::vectorized { + +class DataTypeFixedLengthObject final : public IDataType { +public: + using ColumnType = ColumnFixedLengthObject; + + DataTypeFixedLengthObject() {} + + DataTypeFixedLengthObject(const DataTypeFixedLengthObject& other) {} + + const char* get_family_name() const override { return "DataTypeFixedLengthObject"; } + + TypeIndex get_type_id() const override { return TypeIndex::FixedLengthObject; } + + Field get_default() const override { return String(); } + + bool equals(const IDataType& rhs) const override { return typeid(rhs) == typeid(*this); } + + int64_t get_uncompressed_serialized_bytes(const IColumn& column) const override { + return static_cast<const ColumnType&>(column).byte_size() + sizeof(uint32_t) + + sizeof(size_t); + } + + char* serialize(const IColumn& column, char* buf) const override; + const char* deserialize(const char* buf, IColumn* column) const override; + + MutableColumnPtr create_column() const override; + + bool get_is_parametric() const override { return false; } + bool have_subtypes() const override { return false; } + + bool is_categorial() const override { return is_value_represented_by_integer(); } + bool can_be_inside_low_cardinality() const override { return false; } +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index d43ae88e8b..e31f673643 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -103,6 +103,9 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode, } _is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase; + _use_fixed_length_serialization_opt = + tnode.agg_node.__isset.use_fixed_length_serialization_opt && + tnode.agg_node.use_fixed_length_serialization_opt; } AggregationNode::~AggregationNode() = default; @@ -275,11 +278,14 @@ Status AggregationNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); _build_timer = ADD_TIMER(runtime_profile(), "BuildTime"); - _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTimer"); + _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime"); _exec_timer = ADD_TIMER(runtime_profile(), "ExecTime"); _merge_timer = ADD_TIMER(runtime_profile(), "MergeTime"); _expr_timer = ADD_TIMER(runtime_profile(), "ExprTime"); _get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime"); + _serialize_data_timer = ADD_TIMER(runtime_profile(), "SerializeDataTime"); + _deserialize_data_timer = ADD_TIMER(runtime_profile(), "DeserializeDataTime"); + _data_mem_tracker = std::make_unique<MemTracker>("AggregationNode:Data"); _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); @@ -565,20 +571,34 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block MutableColumns value_columns(agg_size); std::vector<DataTypePtr> data_types(agg_size); - // will serialize data to string column - std::vector<VectorBufferWriter> value_buffer_writers; - auto serialize_string_type = std::make_shared<DataTypeString>(); - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - data_types[i] = serialize_string_type; - value_columns[i] = serialize_string_type->create_column(); - value_buffer_writers.emplace_back(*reinterpret_cast<ColumnString*>(value_columns[i].get())); - } + if (_use_fixed_length_serialization_opt) { + auto serialize_string_type = std::make_shared<DataTypeString>(); + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + data_types[i] = _aggregate_evaluators[i]->function()->get_serialized_type(); + value_columns[i] = _aggregate_evaluators[i]->function()->create_serialize_column(); + } - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->function()->serialize( - _agg_data.without_key + _offsets_of_aggregate_states[i], value_buffer_writers[i]); - value_buffer_writers[i].commit(); + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + _aggregate_evaluators[i]->function()->serialize_without_key_to_column( + _agg_data.without_key + _offsets_of_aggregate_states[i], value_columns[i]); + } + } else { + std::vector<VectorBufferWriter> value_buffer_writers; + auto serialize_string_type = std::make_shared<DataTypeString>(); + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + data_types[i] = serialize_string_type; + value_columns[i] = serialize_string_type->create_column(); + value_buffer_writers.emplace_back( + *reinterpret_cast<ColumnString*>(value_columns[i].get())); + } + + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + _aggregate_evaluators[i]->function()->serialize( + _agg_data.without_key + _offsets_of_aggregate_states[i], + value_buffer_writers[i]); + value_buffer_writers[i].commit(); + } } { ColumnsWithTypeAndName data_with_schema; @@ -607,8 +627,6 @@ Status AggregationNode::_execute_without_key(Block* block) { Status AggregationNode::_merge_without_key(Block* block) { SCOPED_TIMER(_merge_timer); DCHECK(_agg_data.without_key != nullptr); - std::unique_ptr<char[]> deserialize_buffer(new char[_total_size_of_aggregate_states]); - int rows = block->rows(); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { if (_aggregate_evaluators[i]->is_merge()) { int col_id = _get_slot_column_id(_aggregate_evaluators[i]); @@ -617,12 +635,21 @@ Status AggregationNode::_merge_without_key(Block* block) { column = ((ColumnNullable*)column.get())->get_nested_column_ptr(); } - for (int j = 0; j < rows; ++j) { - VectorBufferReader buffer_reader(((ColumnString*)(column.get()))->get_data_at(j)); - - _aggregate_evaluators[i]->function()->deserialize_and_merge( - _agg_data.without_key + _offsets_of_aggregate_states[i], buffer_reader, + SCOPED_TIMER(_deserialize_data_timer); + if (_use_fixed_length_serialization_opt) { + _aggregate_evaluators[i]->function()->deserialize_and_merge_from_column( + _agg_data.without_key + _offsets_of_aggregate_states[i], *column, &_agg_arena_pool); + } else { + const int rows = block->rows(); + for (int j = 0; j < rows; ++j) { + VectorBufferReader buffer_reader( + ((ColumnString*)(column.get()))->get_data_at(j)); + + _aggregate_evaluators[i]->function()->deserialize_and_merge( + _agg_data.without_key + _offsets_of_aggregate_states[i], buffer_reader, + &_agg_arena_pool); + } } } else { _aggregate_evaluators[i]->execute_single_add( @@ -872,53 +899,59 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i // do not try to do agg, just init and serialize directly return the out_block if (!_should_expand_preagg_hash_tables()) { ret_flag = true; - if (_streaming_pre_places.size() < rows) { - _streaming_pre_places.reserve(rows); - for (size_t i = _streaming_pre_places.size(); i < rows; ++i) { - _streaming_pre_places.emplace_back(_agg_arena_pool.aligned_alloc( - _total_size_of_aggregate_states, _align_aggregate_states)); - } - } - - for (size_t i = 0; i < rows; ++i) { - _create_agg_status(_streaming_pre_places[i]); - } - - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->execute_batch_add( - in_block, _offsets_of_aggregate_states[i], - _streaming_pre_places.data(), &_agg_arena_pool, false); - } // will serialize value data to string column - std::vector<VectorBufferWriter> value_buffer_writers; bool mem_reuse = out_block->mem_reuse(); - auto serialize_string_type = std::make_shared<DataTypeString>(); + + std::vector<DataTypePtr> data_types; MutableColumns value_columns; - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - if (mem_reuse) { - value_columns.emplace_back( - std::move(*out_block->get_by_position(i + key_size).column) - .mutate()); - } else { - // slot type of value it should always be string type - value_columns.emplace_back(serialize_string_type->create_column()); + if (_use_fixed_length_serialization_opt) { + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + auto data_type = + _aggregate_evaluators[i]->function()->get_serialized_type(); + if (mem_reuse) { + value_columns.emplace_back( + std::move(*out_block->get_by_position(i + key_size) + .column) + .mutate()); + } else { + // slot type of value it should always be string type + value_columns.emplace_back(_aggregate_evaluators[i] + ->function() + ->create_serialize_column()); + } + data_types.emplace_back(data_type); } - value_buffer_writers.emplace_back( - *reinterpret_cast<ColumnString*>(value_columns[i].get())); - } - for (size_t j = 0; j < rows; ++j) { - for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->function()->serialize( - _streaming_pre_places[j] + _offsets_of_aggregate_states[i], - value_buffer_writers[i]); - value_buffer_writers[i].commit(); + for (int i = 0; i != _aggregate_evaluators.size(); ++i) { + SCOPED_TIMER(_serialize_data_timer); + _aggregate_evaluators[i]->streaming_agg_serialize_to_column( + in_block, value_columns[i], rows, &_agg_arena_pool); + } + } else { + std::vector<VectorBufferWriter> value_buffer_writers; + auto serialize_string_type = std::make_shared<DataTypeString>(); + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + if (mem_reuse) { + value_columns.emplace_back( + std::move(*out_block->get_by_position(i + key_size) + .column) + .mutate()); + } else { + // slot type of value it should always be string type + value_columns.emplace_back( + serialize_string_type->create_column()); + } + data_types.emplace_back(serialize_string_type); + value_buffer_writers.emplace_back( + *reinterpret_cast<ColumnString*>(value_columns[i].get())); } - } - for (size_t i = 0; i < rows; ++i) { - _destroy_agg_status(_streaming_pre_places[i]); + for (int i = 0; i != _aggregate_evaluators.size(); ++i) { + SCOPED_TIMER(_serialize_data_timer); + _aggregate_evaluators[i]->streaming_agg_serialize( + in_block, value_buffer_writers[i], rows, &_agg_arena_pool); + } } if (!mem_reuse) { @@ -931,7 +964,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i } for (int i = 0; i < value_columns.size(); ++i) { columns_with_schema.emplace_back(std::move(value_columns[i]), - serialize_string_type, ""); + data_types[i], ""); } out_block->swap(Block(columns_with_schema)); } else { @@ -1073,19 +1106,6 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat } } - // will serialize data to string column - std::vector<VectorBufferWriter> value_buffer_writers; - auto serialize_string_type = std::make_shared<DataTypeString>(); - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - value_data_types[i] = serialize_string_type; - if (mem_reuse) { - value_columns[i] = std::move(*block->get_by_position(i + key_size).column).mutate(); - } else { - value_columns[i] = serialize_string_type->create_column(); - } - value_buffer_writers.emplace_back(*reinterpret_cast<ColumnString*>(value_columns[i].get())); - } - std::visit( [&](auto&& agg_method) -> void { agg_method.init_once(); @@ -1095,7 +1115,7 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat const auto size = std::min(data.size(), size_t(state->batch_size())); using KeyType = std::decay_t<decltype(iter->get_first())>; std::vector<KeyType> keys(size); - std::vector<AggregateDataPtr> values(size); + std::vector<AggregateDataPtr> values(size + 1); size_t num_rows = 0; while (iter != data.end() && num_rows < state->batch_size()) { @@ -1107,31 +1127,60 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat agg_method.insert_keys_into_columns(keys, key_columns, num_rows, _probe_key_sz); - for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->function()->serialize_vec( - values, _offsets_of_aggregate_states[i], value_buffer_writers[i], - num_rows); - } - if (iter == data.end()) { if (agg_method.data.has_null_key_data()) { DCHECK(key_columns.size() == 1); DCHECK(key_columns[0]->is_nullable()); if (agg_method.data.has_null_key_data()) { key_columns[0]->insert_data(nullptr, 0); - auto mapped = agg_method.data.get_null_key_data(); - for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { - _aggregate_evaluators[i]->function()->serialize( - mapped + _offsets_of_aggregate_states[i], - value_buffer_writers[i]); - value_buffer_writers[i].commit(); - } + values[num_rows] = agg_method.data.get_null_key_data(); + ++num_rows; *eos = true; } } else { *eos = true; } } + + if (_use_fixed_length_serialization_opt) { + SCOPED_TIMER(_serialize_data_timer); + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + value_data_types[i] = + _aggregate_evaluators[i]->function()->get_serialized_type(); + if (mem_reuse) { + value_columns[i] = + std::move(*block->get_by_position(i + key_size).column) + .mutate(); + } else { + value_columns[i] = + _aggregate_evaluators[i]->function()->create_serialize_column(); + } + _aggregate_evaluators[i]->function()->serialize_to_column( + values, _offsets_of_aggregate_states[i], value_columns[i], + num_rows); + } + } else { + SCOPED_TIMER(_serialize_data_timer); + std::vector<VectorBufferWriter> value_buffer_writers; + auto serialize_string_type = std::make_shared<DataTypeString>(); + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + value_data_types[i] = serialize_string_type; + if (mem_reuse) { + value_columns[i] = + std::move(*block->get_by_position(i + key_size).column) + .mutate(); + } else { + value_columns[i] = serialize_string_type->create_column(); + } + value_buffer_writers.emplace_back( + *reinterpret_cast<ColumnString*>(value_columns[i].get())); + } + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + _aggregate_evaluators[i]->function()->serialize_vec( + values, _offsets_of_aggregate_states[i], value_buffer_writers[i], + num_rows); + } + } }, _agg_data._aggregated_method_variant); diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 6960c89c4f..687739dd07 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -649,6 +649,7 @@ private: bool _needs_finalize; bool _is_merge; bool _is_first_phase; + bool _use_fixed_length_serialization_opt; std::unique_ptr<MemPool> _mem_pool; std::unique_ptr<MemTracker> _data_mem_tracker; @@ -669,11 +670,12 @@ private: RuntimeProfile::Counter* _merge_timer; RuntimeProfile::Counter* _expr_timer; RuntimeProfile::Counter* _get_results_timer; + RuntimeProfile::Counter* _serialize_data_timer; + RuntimeProfile::Counter* _deserialize_data_timer; bool _is_streaming_preagg; Block _preagg_block = Block(); bool _should_expand_hash_table = true; - std::vector<char*> _streaming_pre_places; bool _should_limit_output = false; bool _reach_limit = false; @@ -802,13 +804,23 @@ private: std::unique_ptr<char[]> deserialize_buffer( new char[_aggregate_evaluators[i]->function()->size_of_data() * rows]); - _aggregate_evaluators[i]->function()->deserialize_vec( - deserialize_buffer.get(), (ColumnString*)(column.get()), - &_agg_arena_pool, rows); + if (_use_fixed_length_serialization_opt) { + SCOPED_TIMER(_deserialize_data_timer); + _aggregate_evaluators[i]->function()->deserialize_from_column( + deserialize_buffer.get(), *column, &_agg_arena_pool, rows); + } else { + SCOPED_TIMER(_deserialize_data_timer); + _aggregate_evaluators[i]->function()->deserialize_vec( + deserialize_buffer.get(), (ColumnString*)(column.get()), + &_agg_arena_pool, rows); + } _aggregate_evaluators[i]->function()->merge_vec_selected( places.data(), _offsets_of_aggregate_states[i], deserialize_buffer.get(), &_agg_arena_pool, rows); + _aggregate_evaluators[i]->function()->destroy_vec(deserialize_buffer.get(), + rows); + } else { _aggregate_evaluators[i]->execute_batch_add_selected( block, _offsets_of_aggregate_states[i], places.data(), @@ -829,13 +841,23 @@ private: std::unique_ptr<char[]> deserialize_buffer( new char[_aggregate_evaluators[i]->function()->size_of_data() * rows]); - _aggregate_evaluators[i]->function()->deserialize_vec( - deserialize_buffer.get(), (ColumnString*)(column.get()), - &_agg_arena_pool, rows); + if (_use_fixed_length_serialization_opt) { + SCOPED_TIMER(_deserialize_data_timer); + _aggregate_evaluators[i]->function()->deserialize_from_column( + deserialize_buffer.get(), *column, &_agg_arena_pool, rows); + } else { + SCOPED_TIMER(_deserialize_data_timer); + _aggregate_evaluators[i]->function()->deserialize_vec( + deserialize_buffer.get(), (ColumnString*)(column.get()), + &_agg_arena_pool, rows); + } _aggregate_evaluators[i]->function()->merge_vec( places.data(), _offsets_of_aggregate_states[i], deserialize_buffer.get(), &_agg_arena_pool, rows); + _aggregate_evaluators[i]->function()->destroy_vec(deserialize_buffer.get(), + rows); + } else { _aggregate_evaluators[i]->execute_batch_add(block, _offsets_of_aggregate_states[i], diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp index 2631165777..28433f7b85 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.cpp +++ b/be/src/vec/exprs/vectorized_agg_fn.cpp @@ -163,6 +163,20 @@ void AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset, _function->add_batch_selected(block->rows(), places, offset, _agg_columns.data(), arena); } +void AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf, + const size_t num_rows, Arena* arena) { + _calc_argment_columns(block); + SCOPED_TIMER(_exec_timer); + _function->streaming_agg_serialize(_agg_columns.data(), buf, num_rows, arena); +} + +void AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) { + _calc_argment_columns(block); + SCOPED_TIMER(_exec_timer); + _function->streaming_agg_serialize_to_column(_agg_columns.data(), dst, num_rows, arena); +} + void AggFnEvaluator::insert_result_info(AggregateDataPtr place, IColumn* column) { _function->insert_result_into(place, *column); } diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h index 2185de7617..fa025edec1 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.h +++ b/be/src/vec/exprs/vectorized_agg_fn.h @@ -61,6 +61,12 @@ public: void execute_batch_add_selected(Block* block, size_t offset, AggregateDataPtr* places, Arena* arena = nullptr); + void streaming_agg_serialize(Block* block, BufferWritable& buf, const size_t num_rows, + Arena* arena); + + void streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena); + void insert_result_info(AggregateDataPtr place, IColumn* column); void insert_result_info_vec(const std::vector<AggregateDataPtr>& place, size_t offset, diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index c8c89a6fa2..aca046c7a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -273,6 +273,7 @@ public class AggregationNode extends PlanNode { msg.agg_node.setAggSortInfos(aggSortInfos); msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg); msg.agg_node.setIsFirstPhase(aggInfo.isFirstPhase()); + msg.agg_node.setUseFixedLengthSerializationOpt(true); List<Expr> groupingExprs = aggInfo.getGroupingExprs(); if (groupingExprs != null) { msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs)); diff --git a/gensrc/proto/types.proto b/gensrc/proto/types.proto index c5f47d2ef9..2cb44154cf 100644 --- a/gensrc/proto/types.proto +++ b/gensrc/proto/types.proto @@ -101,6 +101,7 @@ message PGenericType { NOTHING = 27; DATEV2 = 28; DATETIMEV2 = 29; + FIXEDLENGTHOBJECT = 30; UNKNOWN = 999; } required TypeId id = 2; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index bbe1e00c20..b214eeb496 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -561,7 +561,8 @@ struct TAggregationNode { 5: required bool need_finalize 6: optional bool use_streaming_preaggregation 7: optional list<TSortInfo> agg_sort_infos - 8: optional bool is_first_phase; + 8: optional bool is_first_phase + 9: optional bool use_fixed_length_serialization_opt } struct TRepeatNode { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org