wangbo commented on code in PR #11257: URL: https://github.com/apache/doris/pull/11257#discussion_r932931508
########## be/src/vec/aggregate_functions/aggregate_function_uniq.h: ########## @@ -109,18 +102,96 @@ class AggregateFunctionUniq final detail::OneAdder<T, Data>::add(this->data(place), *columns[0], row_num); } + static ALWAYS_INLINE const KeyType* get_keys(std::vector<KeyType>& keys_container, + const IColumn& column, size_t batch_size) { + if constexpr (std::is_same_v<T, String>) { + keys_container.resize(batch_size); + for (size_t i = 0; i != batch_size; ++i) { + StringRef value = column.get_data_at(i); + keys_container[i] = Data::get_key(value); + } + return keys_container.data(); + } else { + using ColumnType = + std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>; + return assert_cast<const ColumnType&>(column).get_data().data(); + } + } + + void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset, + const IColumn** columns, Arena* arena) const override { + std::vector<KeyType> keys_container; + const KeyType* keys = get_keys(keys_container, *columns[0], batch_size); + + std::vector<typename Data::Set*> array_of_data_set(batch_size); + + for (size_t i = 0; i != batch_size; ++i) { + array_of_data_set[i] = &(this->data(places[i] + place_offset).set); + } + + for (size_t i = 0; i != batch_size; ++i) { + if (i + HASH_MAP_PREFETCH_DIST < batch_size) { + array_of_data_set[i + HASH_MAP_PREFETCH_DIST]->prefetch( + keys[i + HASH_MAP_PREFETCH_DIST]); + } + + array_of_data_set[i]->insert(keys[i]); + } + } + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena*) const override { - this->data(place).set.merge(this->data(rhs).set); + auto& rhs_set = this->data(rhs).set; + if (rhs_set.size() == 0) return; + + auto& set = this->data(place).set; + set.rehash(set.size() + rhs_set.size()); + + for (auto elem : rhs_set) { + set.insert(elem); + } + } + + void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns, + Arena* arena) const override { + std::vector<KeyType> 
keys_container; + const KeyType* keys = get_keys(keys_container, *columns[0], batch_size); + auto& set = this->data(place).set; + + for (size_t i = 0; i != batch_size; ++i) { + if (i + HASH_MAP_PREFETCH_DIST < batch_size) { + set.prefetch(keys[i + HASH_MAP_PREFETCH_DIST]); + } + set.insert(keys[i]); + } } void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - this->data(place).set.write(buf); + auto& set = this->data(place).set; + write_var_uint(set.size(), buf); + for (const auto& elem : set) { + write_pod_binary(elem, buf); Review Comment: How about adding a TODO here? After phmap is included in BE's code, we can serialize phmap by copying it directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org