This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 818072389f [FIX](array)Fix update hash func array (#21630)
818072389f is described below

commit 818072389f7c9b4344699bfc88a8babe663193c4
Author: amory <wangqian...@selectdb.com>
AuthorDate: Sun Jul 9 22:03:58 2023 +0800

    [FIX](array)Fix update hash func array (#21630)
---
 be/src/vec/columns/column.h             | 16 ++++++---
 be/src/vec/columns/column_array.cpp     | 62 +++++++++++++++++++++++++++++++--
 be/src/vec/columns/column_array.h       | 14 +++++++-
 be/src/vec/columns/column_const.h       | 12 +++++++
 be/src/vec/columns/column_decimal.cpp   | 32 ++++++++++-------
 be/src/vec/columns/column_decimal.h     |  2 ++
 be/src/vec/columns/column_nullable.cpp  | 18 ++++++++++
 be/src/vec/columns/column_nullable.h    |  2 ++
 be/src/vec/columns/column_string.h      | 12 +++++++
 be/src/vec/columns/column_vector.h      | 19 ++++++++++
 be/src/vec/sink/vdata_stream_sender.cpp |  3 ++
 11 files changed, 172 insertions(+), 20 deletions(-)

diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 0a2c66183e..89dd4fbef4 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -339,7 +339,7 @@ public:
     /// On subsequent calls of this method for sequence of column values of 
arbitrary types,
     ///  passed bytes to hash must identify sequence of values unambiguously.
     virtual void update_hash_with_value(size_t n, SipHash& hash) const {
-        LOG(FATAL) << "update_hash_with_value siphash not supported";
+        // Leading space keeps the get_name() output separated from the message text.
+        LOG(FATAL) << get_name() << " update_hash_with_value siphash not supported";
     }
 
     /// Update state of hash function with value of n elements to avoid the 
virtual function call
@@ -348,7 +348,7 @@ public:
     /// do xxHash here, faster than other hash method
     virtual void update_hashes_with_value(std::vector<SipHash>& hashes,
                                           const uint8_t* __restrict null_data = nullptr) const {
-        LOG(FATAL) << "update_hashes_with_value siphash not supported";
+        // Leading space keeps the get_name() output separated from the message text.
+        LOG(FATAL) << get_name() << " update_hashes_with_value siphash not supported";
     };
 
     /// Update state of hash function with value of n elements to avoid the 
virtual function call
@@ -357,17 +357,25 @@ public:
     /// do xxHash here, faster than other sip hash
     virtual void update_hashes_with_value(uint64_t* __restrict hashes,
                                           const uint8_t* __restrict null_data = nullptr) const {
-        LOG(FATAL) << "update_hashes_with_value xxhash not supported";
+        // Leading space keeps the get_name() output separated from the message text.
+        LOG(FATAL) << get_name() << " update_hashes_with_value xxhash not supported";
     };
 
+    /// Per-row xxHash update: folds the value at row n into `hash`.
+    /// The base implementation only logs a FATAL message; column types that support
+    /// per-row xxHash override it.
+    virtual void update_xxHash_with_value(size_t n, uint64_t& hash) const {
+        // The message names this function so the log line can be traced back to it.
+        LOG(FATAL) << get_name() << " update_xxHash_with_value not supported";
+    }
+
     /// Update state of crc32 hash function with value of n elements to avoid 
the virtual function call
     /// null_data to mark whether need to do hash compute, null_data == nullptr
     /// means all element need to do hash function, else only *null_data != 0 
need to do hash func
     virtual void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
                                         const uint8_t* __restrict null_data = nullptr) const {
-        LOG(FATAL) << "update_crcs_with_value not supported";
+        // Leading space keeps the get_name() output separated from the message text.
+        LOG(FATAL) << get_name() << " update_crcs_with_value not supported";
     };
 
+    /// Per-row CRC update hook: intended to fold the value at row n into `hash`.
+    /// The base implementation only logs a FATAL message prefixed with get_name().
+    virtual void update_crc_with_value(size_t n, uint64_t& hash) const {
+        LOG(FATAL) << get_name() << " update_crc_with_value not supported";
+    }
+
     /** Removes elements that don't match the filter.
       * Is used in WHERE and HAVING operations.
       * If result_size_hint > 0, then makes advance reserve(result_size_hint) 
for the result column;
diff --git a/be/src/vec/columns/column_array.cpp 
b/be/src/vec/columns/column_array.cpp
index 4ae7ada9f7..999991fde5 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -223,11 +223,67 @@ const char* 
ColumnArray::deserialize_and_insert_from_arena(const char* pos) {
     return pos;
 }
 
-void ColumnArray::update_hash_with_value(size_t n, SipHash& hash) const {
-    size_t array_size = size_at(n);
+// Batch SipHash update for the array column.
+// The whole body is the SIP_HASHES_FUNCTION_COLUMN_IMPL macro, which is defined outside
+// this patch; presumably it expands in terms of `hashes` and `null_data` (TODO confirm).
+void ColumnArray::update_hashes_with_value(std::vector<SipHash>& hashes,
+                                           const uint8_t* __restrict 
null_data) const {
+    SIP_HASHES_FUNCTION_COLUMN_IMPL();
+}
+
+// Folds array row n into `hash` with xxHash.
+// The element count is hashed first, then every nested element of the row in order.
+void ColumnArray::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    const size_t elem_size = size_at(n);
+    const size_t offset = offset_at(n);
+    hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&elem_size), sizeof(elem_size),
+                                      hash);
+    // size_t index: `auto i = 0` deduces int, which mixes signedness with elem_size and
+    // cannot count past INT_MAX.
+    for (size_t i = 0; i < elem_size; ++i) {
+        get_data().update_xxHash_with_value(offset + i, hash);
+    }
+}
+
+// Folds array row n into `crc` with zlib CRC.
+// The element count is hashed first, then every nested element of the row in order.
+void ColumnArray::update_crc_with_value(size_t n, uint64_t& crc) const {
+    size_t elem_size = size_at(n);
     size_t offset = offset_at(n);
 
-    for (size_t i = 0; i < array_size; ++i) get_data().update_hash_with_value(offset + i, hash);
+    crc = HashUtil::zlib_crc_hash(reinterpret_cast<const char*>(&elem_size), sizeof(elem_size),
+                                  crc);
+    // size_t index: `auto i = 0` deduces int, which mixes signedness with elem_size and
+    // cannot count past INT_MAX.
+    for (size_t i = 0; i < elem_size; ++i) {
+        get_data().update_crc_with_value(offset + i, crc);
+    }
+}
+
+// Batch xxHash update: folds every array row into its slot of `hashes`.
+// When null_data is supplied, rows whose entry is non-zero are left untouched.
+void ColumnArray::update_hashes_with_value(uint64_t* __restrict hashes,
+                                           const uint8_t* __restrict null_data) const {
+    const auto row_count = size();
+    if (null_data == nullptr) {
+        // no null map: every row participates
+        for (size_t row = 0; row < row_count; ++row) {
+            update_xxHash_with_value(row, hashes[row]);
+        }
+        return;
+    }
+    for (size_t row = 0; row < row_count; ++row) {
+        if (null_data[row] != 0) {
+            continue;
+        }
+        update_xxHash_with_value(row, hashes[row]);
+    }
+}
+
+// Batch CRC update: folds every array row into its slot of `hash`.
+// `type` is not consulted here. When null_data is supplied, rows whose entry is
+// non-zero are left untouched.
+void ColumnArray::update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+                                         const uint8_t* __restrict null_data) const {
+    auto s = hash.size();
+    DCHECK(s == size());
+
+    for (size_t row = 0; row < s; ++row) {
+        const bool skip_row = (null_data != nullptr) && (null_data[row] != 0);
+        if (!skip_row) {
+            update_crc_with_value(row, hash[row]);
+        }
+    }
+}
 
 void ColumnArray::insert(const Field& x) {
diff --git a/be/src/vec/columns/column_array.h 
b/be/src/vec/columns/column_array.h
index 824cca8b23..7239de3347 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -97,7 +97,17 @@ public:
     void insert_data(const char* pos, size_t length) override;
     StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& 
begin) const override;
     const char* deserialize_and_insert_from_arena(const char* pos) override;
-    void update_hash_with_value(size_t n, SipHash& hash) const override;
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+    void update_crc_with_value(size_t n, uint64_t& crc) const override;
+
+    void update_hashes_with_value(std::vector<SipHash>& hashes,
+                                  const uint8_t* __restrict null_data) const 
override;
+
+    void update_hashes_with_value(uint64_t* __restrict hashes,
+                                  const uint8_t* __restrict null_data = 
nullptr) const override;
+
+    void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType 
type,
+                                const uint8_t* __restrict null_data = nullptr) 
const override;
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
     void insert(const Field& x) override;
     void insert_from(const IColumn& src_, size_t n) override;
@@ -179,6 +189,8 @@ public:
     Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* 
col_ptr) override;
 
 private:
+    // [[2,1,5,9,1], [1,2,4]] --> data column [2,1,5,9,1,1,2,4], offset[-1] = 
0, offset[0] = 5, offset[1] = 8
+    // [[[2,1,5],[9,1]], [[1,2]]] --> data column [3 column array], offset[-1] 
= 0, offset[0] = 2, offset[1] = 3
     WrappedPtr data;
     WrappedPtr offsets;
 
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index 9637a0943f..13640a0a9a 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -135,6 +135,18 @@ public:
 
     void update_hashes_with_value(uint64_t* __restrict hashes,
                                   const uint8_t* __restrict null_data) const 
override;
+    // Per-row xxHash update for a constant column. `n` is not used: the value is always
+    // read from row 0 of the wrapped column. A null data pointer is hashed as NULL.
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override {
+        auto constant_ref = data->get_data_at(0);
+        if (constant_ref.data == nullptr) {
+            hash = HashUtil::xxHash64NullWithSeed(hash);
+            return;
+        }
+        hash = HashUtil::xxHash64WithSeed(constant_ref.data, constant_ref.size, hash);
+    }
+
+    // Per-row CRC update for a constant column.
+    // The wrapped column is always read at row 0 (update_xxHash_with_value does the same
+    // through get_data_at(0)); forwarding the logical row index n would index past the
+    // wrapped column for any n > 0.
+    void update_crc_with_value(size_t n, uint64_t& crc) const override {
+        get_data_column_ptr()->update_crc_with_value(0, crc);
+    }
 
     ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const 
override;
     ColumnPtr replicate(const Offsets& offsets) const override;
diff --git a/be/src/vec/columns/column_decimal.cpp 
b/be/src/vec/columns/column_decimal.cpp
index 13902b54a7..78e7d055cb 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -127,36 +127,44 @@ void 
ColumnDecimal<T>::update_hashes_with_value(std::vector<SipHash>& hashes,
     SIP_HASHES_FUNCTION_COLUMN_IMPL();
 }
 
+// Per-row CRC update for decimal columns.
+// DecimalV2 values hash int_value() followed by frac_value(); every other decimal type
+// hashes the raw sizeof(T) bytes of the stored value.
+template <typename T>
+void ColumnDecimal<T>::update_crc_with_value(size_t n, uint64_t& crc) const {
+    if constexpr (IsDecimalV2<T>) {
+        const DecimalV2Value& v2_value = (const DecimalV2Value&)data[n];
+        int64_t integer_part = v2_value.int_value();
+        int32_t fraction_part = v2_value.frac_value();
+        crc = HashUtil::zlib_crc_hash(&integer_part, sizeof(integer_part), crc);
+        crc = HashUtil::zlib_crc_hash(&fraction_part, sizeof(fraction_part), crc);
+    } else {
+        crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc);
+    }
+}
+
+// Batch CRC update for decimal columns; `hashes` must hold one slot per row (DCHECKed).
+// Non-DecimalV2 types go through the DO_CRC_HASHES_FUNCTION_COLUMN_IMPL macro (defined
+// outside this patch). DecimalV2 calls update_crc_with_value per row, skipping rows whose
+// null_data entry is non-zero when a null map is supplied.
 template <typename T>
 void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, 
PrimitiveType type,
                                               const uint8_t* __restrict 
null_data) const {
     auto s = hashes.size();
     DCHECK(s == size());
-
     if constexpr (!IsDecimalV2<T>) {
         DO_CRC_HASHES_FUNCTION_COLUMN_IMPL()
     } else {
-        DCHECK(type == TYPE_DECIMALV2);
-        auto decimalv2_do_crc = [&](size_t i) {
-            const DecimalV2Value& dec_val = (const DecimalV2Value&)data[i];
-            int64_t int_val = dec_val.int_value();
-            int32_t frac_val = dec_val.frac_value();
-            hashes[i] = HashUtil::zlib_crc_hash(&int_val, sizeof(int_val), 
hashes[i]);
-            hashes[i] = HashUtil::zlib_crc_hash(&frac_val, sizeof(frac_val), 
hashes[i]);
-        };
-
         if (null_data == nullptr) {
             for (size_t i = 0; i < s; i++) {
-                decimalv2_do_crc(i);
+                update_crc_with_value(i, hashes[i]);
             }
         } else {
             for (size_t i = 0; i < s; i++) {
-                if (null_data[i] == 0) decimalv2_do_crc(i);
+                if (null_data[i] == 0) update_crc_with_value(i, hashes[i]);
             }
         }
     }
 }
 
+// Per-row xxHash update: hashes the raw sizeof(T) bytes of the value stored at row n.
+template <typename T>
+void ColumnDecimal<T>::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    const auto* raw_bytes = reinterpret_cast<const char*>(&data[n]);
+    hash = HashUtil::xxHash64WithSeed(raw_bytes, sizeof(T), hash);
+}
+
 template <typename T>
 void ColumnDecimal<T>::update_hashes_with_value(uint64_t* __restrict hashes,
                                                 const uint8_t* __restrict 
null_data) const {
diff --git a/be/src/vec/columns/column_decimal.h 
b/be/src/vec/columns/column_decimal.h
index a3960067cb..8968ff6017 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -159,6 +159,8 @@ public:
                                   const uint8_t* __restrict null_data) const 
override;
     void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType 
type,
                                 const uint8_t* __restrict null_data) const 
override;
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+    void update_crc_with_value(size_t n, uint64_t& crc) const override;
 
     int compare_at(size_t n, size_t m, const IColumn& rhs_, int 
nan_direction_hint) const override;
     void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
diff --git a/be/src/vec/columns/column_nullable.cpp 
b/be/src/vec/columns/column_nullable.cpp
index 5b087550cc..0e90251343 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -114,6 +114,24 @@ void ColumnNullable::update_hashes_with_value(uint64_t* 
__restrict hashes,
     }
 }
 
+// Per-row xxHash update: a row flagged in the null map is hashed as NULL, any other row
+// is delegated to the nested column.
+void ColumnNullable::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    auto* __restrict null_flags = assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
+    const bool row_is_null = null_flags[n] != 0;
+    if (!row_is_null) {
+        nested_column->update_xxHash_with_value(n, hash);
+        return;
+    }
+    hash = HashUtil::xxHash64NullWithSeed(hash);
+}
+
+// Per-row CRC update: a row flagged in the null map is hashed as NULL, any other row
+// is delegated to the nested column's CRC routine.
+void ColumnNullable::update_crc_with_value(size_t n, uint64_t& crc) const {
+    auto* __restrict real_null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
+    if (real_null_data[n] != 0) {
+        crc = HashUtil::zlib_crc_hash_null(crc);
+    } else {
+        // Must stay on the CRC path: calling update_xxHash_with_value here would fold an
+        // xxHash result into the CRC accumulator.
+        nested_column->update_crc_with_value(n, crc);
+    }
+}
+
 MutableColumnPtr ColumnNullable::clone_resized(size_t new_size) const {
     MutableColumnPtr new_nested_col = 
get_nested_column().clone_resized(new_size);
     auto new_null_map = ColumnUInt8::create();
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index fe34724954..9f29965a32 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -186,6 +186,8 @@ public:
                                 const uint8_t* __restrict null_data) const 
override;
     void update_hashes_with_value(uint64_t* __restrict hashes,
                                   const uint8_t* __restrict null_data) const 
override;
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+    void update_crc_with_value(size_t n, uint64_t& crc) const override;
     void get_extremes(Field& min, Field& max) const override;
 
     MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) 
const override {
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index 373e676bf1..b4caa7b985 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -342,6 +342,18 @@ public:
         SIP_HASHES_FUNCTION_COLUMN_IMPL();
     }
 
+    // Per-row xxHash update: hashes size_at(n) bytes of `chars` starting at offset_at(n).
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override {
+        const size_t begin = offset_at(n);
+        const size_t length = size_at(n);
+        const auto* bytes = reinterpret_cast<const char*>(&chars[begin]);
+        hash = HashUtil::xxHash64WithSeed(bytes, length, hash);
+    }
+
+    // Per-row CRC update: hashes the data/size pair returned by get_data_at(n).
+    void update_crc_with_value(size_t n, uint64_t& crc) const override {
+        const auto row_ref = get_data_at(n);
+        const auto* bytes = row_ref.data;
+        const auto length = row_ref.size;
+        crc = HashUtil::zlib_crc_hash(bytes, length, crc);
+    }
+
     void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType 
type,
                                 const uint8_t* __restrict null_data) const 
override;
 
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index c944827fbc..1c18e810b0 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -243,6 +243,25 @@ public:
                                      const uint8_t* null_map,
                                      size_t max_row_byte_size) const override;
 
+    // Per-row xxHash update: hashes the raw sizeof(T) bytes of the value stored at row n.
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override {
+        const auto* raw_bytes = reinterpret_cast<const char*>(&data[n]);
+        hash = HashUtil::xxHash64WithSeed(raw_bytes, sizeof(T), hash);
+    }
+
+    // Per-row CRC update. An Int64 column flagged as date/datetime hashes the buffer
+    // filled by VecDateTimeValue::to_buffer; every other case hashes the raw sizeof(T)
+    // bytes of the value stored at row n.
+    void update_crc_with_value(size_t n, uint64_t& crc) const override {
+        if constexpr (std::is_same_v<T, Int64>) {
+            if (this->is_date_type() || this->is_datetime_type()) {
+                char text[64];
+                const VecDateTimeValue& as_datetime = (const VecDateTimeValue&)data[n];
+                auto text_len = as_datetime.to_buffer(text);
+                crc = HashUtil::zlib_crc_hash(text, text_len, crc);
+                return;
+            }
+        }
+        // plain numeric value (or Int64 that is not a date/datetime)
+        crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc);
+    }
+
     void update_hash_with_value(size_t n, SipHash& hash) const override;
 
     void update_hashes_with_value(std::vector<SipHash>& hashes,
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 9768f61244..d596d78e20 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -569,6 +569,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block) {
                 std::vector<SipHash> siphashs(rows);
                 // result[j] means column index, i means rows index
                 for (int j = 0; j < result_size; ++j) {
+                    // most complex types do not implement get_data_at(), which column_const calls
                     
block->get_by_position(result[j]).column->update_hashes_with_value(siphashs);
                 }
                 for (int i = 0; i < rows; i++) {
@@ -578,6 +579,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block) {
                 SCOPED_TIMER(_split_block_hash_compute_timer);
                 // result[j] means column index, i means rows index, here to 
calculate the xxhash value
                 for (int j = 0; j < result_size; ++j) {
+                    // most complex types do not implement get_data_at(), which column_const calls
                     
block->get_by_position(result[j]).column->update_hashes_with_value(hashes);
                 }
 
@@ -590,6 +592,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block) {
             RETURN_IF_ERROR(channel_add_rows(_channels, element_size, hashes, 
rows, block));
         } else {
             for (int j = 0; j < result_size; ++j) {
+                // most complex types do not implement get_data_at(), which column_const calls
                 
block->get_by_position(result[j]).column->update_crcs_with_value(
                         hash_vals, 
_partition_expr_ctxs[j]->root()->type().type);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to