This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new 818072389f [FIX](array)Fix update hash func array (#21630) 818072389f is described below commit 818072389f7c9b4344699bfc88a8babe663193c4 Author: amory <wangqian...@selectdb.com> AuthorDate: Sun Jul 9 22:03:58 2023 +0800 [FIX](array)Fix update hash func array (#21630) --- be/src/vec/columns/column.h | 16 ++++++--- be/src/vec/columns/column_array.cpp | 62 +++++++++++++++++++++++++++++++-- be/src/vec/columns/column_array.h | 14 +++++++- be/src/vec/columns/column_const.h | 12 +++++++ be/src/vec/columns/column_decimal.cpp | 32 ++++++++++------- be/src/vec/columns/column_decimal.h | 2 ++ be/src/vec/columns/column_nullable.cpp | 18 ++++++++++ be/src/vec/columns/column_nullable.h | 2 ++ be/src/vec/columns/column_string.h | 12 +++++++ be/src/vec/columns/column_vector.h | 19 ++++++++++ be/src/vec/sink/vdata_stream_sender.cpp | 3 ++ 11 files changed, 172 insertions(+), 20 deletions(-) diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 0a2c66183e..89dd4fbef4 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -339,7 +339,7 @@ public: /// On subsequent calls of this method for sequence of column values of arbitrary types, /// passed bytes to hash must identify sequence of values unambiguously. 
virtual void update_hash_with_value(size_t n, SipHash& hash) const { - LOG(FATAL) << "update_hash_with_value siphash not supported"; + LOG(FATAL) << get_name() << "update_hash_with_value siphash not supported"; } /// Update state of hash function with value of n elements to avoid the virtual function call @@ -348,7 +348,7 @@ public: /// do xxHash here, faster than other hash method virtual void update_hashes_with_value(std::vector<SipHash>& hashes, const uint8_t* __restrict null_data = nullptr) const { - LOG(FATAL) << "update_hashes_with_value siphash not supported"; + LOG(FATAL) << get_name() << "update_hashes_with_value siphash not supported"; }; /// Update state of hash function with value of n elements to avoid the virtual function call @@ -357,17 +357,25 @@ public: /// do xxHash here, faster than other sip hash virtual void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data = nullptr) const { - LOG(FATAL) << "update_hashes_with_value xxhash not supported"; + LOG(FATAL) << get_name() << "update_hashes_with_value xxhash not supported"; }; + virtual void update_xxHash_with_value(size_t n, uint64_t& hash) const { + LOG(FATAL) << get_name() << " update_hash_with_value xxhash not supported"; + } + /// Update state of crc32 hash function with value of n elements to avoid the virtual function call /// null_data to mark whether need to do hash compute, null_data == nullptr /// means all element need to do hash function, else only *null_data != 0 need to do hash func virtual void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type, const uint8_t* __restrict null_data = nullptr) const { - LOG(FATAL) << "update_crcs_with_value not supported"; + LOG(FATAL) << get_name() << "update_crcs_with_value not supported"; }; + virtual void update_crc_with_value(size_t n, uint64_t& hash) const { + LOG(FATAL) << get_name() << " update_crc_with_value not supported"; + } + /** Removes elements that don't match the filter. 
* Is used in WHERE and HAVING operations. * If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column; diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index 4ae7ada9f7..999991fde5 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -223,11 +223,67 @@ const char* ColumnArray::deserialize_and_insert_from_arena(const char* pos) { return pos; } -void ColumnArray::update_hash_with_value(size_t n, SipHash& hash) const { - size_t array_size = size_at(n); +void ColumnArray::update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const { + SIP_HASHES_FUNCTION_COLUMN_IMPL(); +} + +// for every array row calculate xxHash +void ColumnArray::update_xxHash_with_value(size_t n, uint64_t& hash) const { + size_t elem_size = size_at(n); + size_t offset = offset_at(n); + hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&elem_size), sizeof(elem_size), + hash); + for (auto i = 0; i < elem_size; ++i) { + get_data().update_xxHash_with_value(offset + i, hash); + } +} + +// for every array row calculate crcHash +void ColumnArray::update_crc_with_value(size_t n, uint64_t& crc) const { + size_t elem_size = size_at(n); size_t offset = offset_at(n); - for (size_t i = 0; i < array_size; ++i) get_data().update_hash_with_value(offset + i, hash); + crc = HashUtil::zlib_crc_hash(reinterpret_cast<const char*>(&elem_size), sizeof(elem_size), + crc); + for (auto i = 0; i < elem_size; ++i) { + get_data().update_crc_with_value(offset + i, crc); + } +} + +void ColumnArray::update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data) const { + auto s = size(); + if (null_data) { + for (size_t i = 0; i < s; ++i) { + if (null_data[i] == 0) { + update_xxHash_with_value(i, hashes[i]); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_xxHash_with_value(i, hashes[i]); + } + } +} + +void 
ColumnArray::update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type, + const uint8_t* __restrict null_data) const { + auto s = hash.size(); + DCHECK(s == size()); + + if (null_data) { + for (size_t i = 0; i < s; ++i) { + // every row + if (null_data[i] == 0) { + update_crc_with_value(i, hash[i]); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_crc_with_value(i, hash[i]); + } + } } void ColumnArray::insert(const Field& x) { diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 824cca8b23..7239de3347 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -97,7 +97,17 @@ public: void insert_data(const char* pos, size_t length) override; StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override; const char* deserialize_and_insert_from_arena(const char* pos) override; - void update_hash_with_value(size_t n, SipHash& hash) const override; + void update_xxHash_with_value(size_t n, uint64_t& hash) const override; + void update_crc_with_value(size_t n, uint64_t& crc) const override; + + void update_hashes_with_value(std::vector<SipHash>& hashes, + const uint8_t* __restrict null_data) const override; + + void update_hashes_with_value(uint64_t* __restrict hashes, + const uint8_t* __restrict null_data = nullptr) const override; + + void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type, + const uint8_t* __restrict null_data = nullptr) const override; void insert_range_from(const IColumn& src, size_t start, size_t length) override; void insert(const Field& x) override; void insert_from(const IColumn& src_, size_t n) override; @@ -179,6 +189,8 @@ public: Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* col_ptr) override; private: + // [[2,1,5,9,1], [1,2,4]] --> data column [2,1,5,9,1,1,2,4], offset[-1] = 0, offset[0] = 5, offset[1] = 8 + // [[[2,1,5],[9,1]], [[1,2]]] --> data column [3 column 
array], offset[-1] = 0, offset[0] = 2, offset[1] = 3 WrappedPtr data; WrappedPtr offsets; diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index 9637a0943f..13640a0a9a 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -135,6 +135,18 @@ public: void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; + void update_xxHash_with_value(size_t n, uint64_t& hash) const override { + auto real_data = data->get_data_at(0); + if (real_data.data == nullptr) { + hash = HashUtil::xxHash64NullWithSeed(hash); + } else { + hash = HashUtil::xxHash64WithSeed(real_data.data, real_data.size, hash); + } + } + + void update_crc_with_value(size_t n, uint64_t& crc) const override { + get_data_column_ptr()->update_crc_with_value(n, crc); + } ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override; ColumnPtr replicate(const Offsets& offsets) const override; diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 13902b54a7..78e7d055cb 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -127,36 +127,44 @@ void ColumnDecimal<T>::update_hashes_with_value(std::vector<SipHash>& hashes, SIP_HASHES_FUNCTION_COLUMN_IMPL(); } +template <typename T> +void ColumnDecimal<T>::update_crc_with_value(size_t n, uint64_t& crc) const { + if constexpr (!IsDecimalV2<T>) { + crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc); + } else { + const DecimalV2Value& dec_val = (const DecimalV2Value&)data[n]; + int64_t int_val = dec_val.int_value(); + int32_t frac_val = dec_val.frac_value(); + crc = HashUtil::zlib_crc_hash(&int_val, sizeof(int_val), crc); + crc = HashUtil::zlib_crc_hash(&frac_val, sizeof(frac_val), crc); + }; +} + template <typename T> void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict 
null_data) const { auto s = hashes.size(); DCHECK(s == size()); - if constexpr (!IsDecimalV2<T>) { DO_CRC_HASHES_FUNCTION_COLUMN_IMPL() } else { - DCHECK(type == TYPE_DECIMALV2); - auto decimalv2_do_crc = [&](size_t i) { - const DecimalV2Value& dec_val = (const DecimalV2Value&)data[i]; - int64_t int_val = dec_val.int_value(); - int32_t frac_val = dec_val.frac_value(); - hashes[i] = HashUtil::zlib_crc_hash(&int_val, sizeof(int_val), hashes[i]); - hashes[i] = HashUtil::zlib_crc_hash(&frac_val, sizeof(frac_val), hashes[i]); - }; - if (null_data == nullptr) { for (size_t i = 0; i < s; i++) { - decimalv2_do_crc(i); + update_crc_with_value(i, hashes[i]); } } else { for (size_t i = 0; i < s; i++) { - if (null_data[i] == 0) decimalv2_do_crc(i); + if (null_data[i] == 0) update_crc_with_value(i, hashes[i]); } } } } +template <typename T> +void ColumnDecimal<T>::update_xxHash_with_value(size_t n, uint64_t& hash) const { + hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[n]), sizeof(T), hash); +} + template <typename T> void ColumnDecimal<T>::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index a3960067cb..8968ff6017 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -159,6 +159,8 @@ public: const uint8_t* __restrict null_data) const override; void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict null_data) const override; + void update_xxHash_with_value(size_t n, uint64_t& hash) const override; + void update_crc_with_value(size_t n, uint64_t& crc) const override; int compare_at(size_t n, size_t m, const IColumn& rhs_, int nan_direction_hint) const override; void get_permutation(bool reverse, size_t limit, int nan_direction_hint, diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 
5b087550cc..0e90251343 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -114,6 +114,24 @@ void ColumnNullable::update_hashes_with_value(uint64_t* __restrict hashes, } } +void ColumnNullable::update_xxHash_with_value(size_t n, uint64_t& hash) const { + auto* __restrict real_null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data().data(); + if (real_null_data[n] != 0) { + hash = HashUtil::xxHash64NullWithSeed(hash); + } else { + nested_column->update_xxHash_with_value(n, hash); + } +} + +void ColumnNullable::update_crc_with_value(size_t n, uint64_t& crc) const { + auto* __restrict real_null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data().data(); + if (real_null_data[n] != 0) { + crc = HashUtil::zlib_crc_hash_null(crc); + } else { + nested_column->update_xxHash_with_value(n, crc); + } +} + MutableColumnPtr ColumnNullable::clone_resized(size_t new_size) const { MutableColumnPtr new_nested_col = get_nested_column().clone_resized(new_size); auto new_null_map = ColumnUInt8::create(); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index fe34724954..9f29965a32 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -186,6 +186,8 @@ public: const uint8_t* __restrict null_data) const override; void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; + void update_xxHash_with_value(size_t n, uint64_t& hash) const override; + void update_crc_with_value(size_t n, uint64_t& crc) const override; void get_extremes(Field& min, Field& max) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 373e676bf1..b4caa7b985 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -342,6 +342,18 @@ public: 
SIP_HASHES_FUNCTION_COLUMN_IMPL(); } + void update_xxHash_with_value(size_t n, uint64_t& hash) const override { + size_t string_size = size_at(n); + size_t offset = offset_at(n); + hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&chars[offset]), + string_size, hash); + } + + void update_crc_with_value(size_t n, uint64_t& crc) const override { + auto data_ref = get_data_at(n); + crc = HashUtil::zlib_crc_hash(data_ref.data, data_ref.size, crc); + } + void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type, const uint8_t* __restrict null_data) const override; diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index c944827fbc..1c18e810b0 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -243,6 +243,25 @@ public: const uint8_t* null_map, size_t max_row_byte_size) const override; + void update_xxHash_with_value(size_t n, uint64_t& hash) const override { + hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[n]), sizeof(T), hash); + } + + void update_crc_with_value(size_t n, uint64_t& crc) const override { + if constexpr (!std::is_same_v<T, Int64>) { + crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc); + } else { + if (this->is_date_type() || this->is_datetime_type()) { + char buf[64]; + const VecDateTimeValue& date_val = (const VecDateTimeValue&)data[n]; + auto len = date_val.to_buffer(buf); + crc = HashUtil::zlib_crc_hash(buf, len, crc); + } else { + crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc); + } + } + } + void update_hash_with_value(size_t n, SipHash& hash) const override; void update_hashes_with_value(std::vector<SipHash>& hashes, diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 9768f61244..d596d78e20 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -569,6 +569,7 @@ Status VDataStreamSender::send(RuntimeState* 
state, Block* block) { std::vector<SipHash> siphashs(rows); // result[j] means column index, i means rows index for (int j = 0; j < result_size; ++j) { + // most complex types do not implement the get_data_at() method, which column_const will call block->get_by_position(result[j]).column->update_hashes_with_value(siphashs); } for (int i = 0; i < rows; i++) { @@ -578,6 +579,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { SCOPED_TIMER(_split_block_hash_compute_timer); // result[j] means column index, i means rows index, here to calculate the xxhash value for (int j = 0; j < result_size; ++j) { + // most complex types do not implement the get_data_at() method, which column_const will call block->get_by_position(result[j]).column->update_hashes_with_value(hashes); } @@ -590,6 +592,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { RETURN_IF_ERROR(channel_add_rows(_channels, element_size, hashes, rows, block)); } else { for (int j = 0; j < result_size; ++j) { + // most complex types do not implement the get_data_at() method, which column_const will call block->get_by_position(result[j]).column->update_crcs_with_value( hash_vals, _partition_expr_ctxs[j]->root()->type().type); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org