wangbo commented on code in PR #11257:
URL: https://github.com/apache/doris/pull/11257#discussion_r932931508


##########
be/src/vec/aggregate_functions/aggregate_function_uniq.h:
##########
@@ -109,18 +102,96 @@ class AggregateFunctionUniq final
         detail::OneAdder<T, Data>::add(this->data(place), *columns[0], 
row_num);
     }
 
+    static ALWAYS_INLINE const KeyType* get_keys(std::vector<KeyType>& 
keys_container,
+                                                 const IColumn& column, size_t 
batch_size) {
+        if constexpr (std::is_same_v<T, String>) {
+            keys_container.resize(batch_size);
+            for (size_t i = 0; i != batch_size; ++i) {
+                StringRef value = column.get_data_at(i);
+                keys_container[i] = Data::get_key(value);
+            }
+            return keys_container.data();
+        } else {
+            using ColumnType =
+                    std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, 
ColumnVector<T>>;
+            return assert_cast<const ColumnType&>(column).get_data().data();
+        }
+    }
+
+    void add_batch(size_t batch_size, AggregateDataPtr* places, size_t 
place_offset,
+                   const IColumn** columns, Arena* arena) const override {
+        std::vector<KeyType> keys_container;
+        const KeyType* keys = get_keys(keys_container, *columns[0], 
batch_size);
+
+        std::vector<typename Data::Set*> array_of_data_set(batch_size);
+
+        for (size_t i = 0; i != batch_size; ++i) {
+            array_of_data_set[i] = &(this->data(places[i] + place_offset).set);
+        }
+
+        for (size_t i = 0; i != batch_size; ++i) {
+            if (i + HASH_MAP_PREFETCH_DIST < batch_size) {
+                array_of_data_set[i + HASH_MAP_PREFETCH_DIST]->prefetch(
+                        keys[i + HASH_MAP_PREFETCH_DIST]);
+            }
+
+            array_of_data_set[i]->insert(keys[i]);
+        }
+    }
+
     void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
                Arena*) const override {
-        this->data(place).set.merge(this->data(rhs).set);
+        auto& rhs_set = this->data(rhs).set;
+        if (rhs_set.size() == 0) return;
+
+        auto& set = this->data(place).set;
+        set.rehash(set.size() + rhs_set.size());
+
+        for (auto elem : rhs_set) {
+            set.insert(elem);
+        }
+    }
+
+    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, 
const IColumn** columns,
+                                Arena* arena) const override {
+        std::vector<KeyType> keys_container;
+        const KeyType* keys = get_keys(keys_container, *columns[0], 
batch_size);
+        auto& set = this->data(place).set;
+
+        for (size_t i = 0; i != batch_size; ++i) {
+            if (i + HASH_MAP_PREFETCH_DIST < batch_size) {
+                set.prefetch(keys[i + HASH_MAP_PREFETCH_DIST]);
+            }
+            set.insert(keys[i]);
+        }
     }
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
-        this->data(place).set.write(buf);
+        auto& set = this->data(place).set;
+        write_var_uint(set.size(), buf);
+        for (const auto& elem : set) {
+            write_pod_binary(elem, buf);

Review Comment:
   How about adding a TODO here? Once phmap is included in BE's code, we can
serialize the phmap by copying it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to