BiteTheDDDDt commented on code in PR #11257: URL: https://github.com/apache/doris/pull/11257#discussion_r930881724
########## be/src/vec/aggregate_functions/aggregate_function_uniq.h: ########## @@ -111,16 +110,82 @@ class AggregateFunctionUniq final void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena*) const override { - this->data(place).set.merge(this->data(rhs).set); + auto& rhs_set = this->data(rhs).set; + if (rhs_set.size() == 0) return; + + auto& set = this->data(place).set; + set.rehash(set.size() + rhs_set.size()); + + for (auto elem : rhs_set) { + set.insert(elem); + } + } + + void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns, + Arena* arena) const override { + auto& column = *columns[0]; + std::vector<KeyType> keys(batch_size); + for (size_t i = 0; i != batch_size; ++i) { + if constexpr (std::is_same_v<T, String>) { + StringRef value = column.get_data_at(i); + + UInt128 key; + SipHash hash; + hash.update(value.data, value.size); + hash.get128(key.low, key.high); + + keys[i] = key; + } else if constexpr (std::is_same_v<T, Decimal128>) { + keys[i] = assert_cast<const ColumnDecimal<Decimal128>&>(column).get_data()[i]; + } else { + keys[i] = assert_cast<const ColumnVector<T>&>(column).get_data()[i]; + } + } + + auto& set = this->data(place).set; + for (size_t i = 0; i != batch_size; ++i) { + if (i + 16 < batch_size) { + set.prefetch(keys[i + 16]); + } + set.insert(keys[i]); + } } void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - this->data(place).set.write(buf); + auto& set = this->data(place).set; + write_var_uint(set.size(), buf); + for (const auto& elem : set) { + write_pod_binary(elem, buf); + } + } + + void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf, + Arena* arena) const override { + auto& set = this->data(place).set; + size_t size; + read_var_uint(size, buf); + + set.rehash(size + set.size()); + + for (size_t i = 0; i < size; ++i) { + KeyType ref; + read_pod_binary(ref, buf); + set.insert(ref); + } } void 
deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, Review Comment: It seems the only difference from `deserialize_and_merge` is the `set.rehash` call; maybe we can abstract the shared logic here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org