This is an automated email from the ASF dual-hosted git repository. sollhui pushed a commit to branch fix-apache-master-configurable-block-size-lz4-log in repository https://gitbox.apache.org/repos/asf/doris.git
commit 1ef6f11f0ee51338970c5e282504ab2cbb501ac5
Author: laihui <[email protected]>
AuthorDate: Thu Jun 25 11:16:16 2026 +0800

[improvement](be) Add block size config for load debugging

### What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary:

The preferred block byte budget was capped by a hard-coded 512MB value in BE, which made it inconvenient to reproduce and diagnose oversized-block behavior through configuration. This change adds a BE config, `max_preferred_block_size_bytes`, for the maximum preferred block byte budget and uses it in RuntimeState::preferred_block_size_bytes(). It also logs oversized string serialization and LZ4 compression/decompression failures in DataTypeString so large-block reproductions can quickly identify whether t [...]

### Release note

Add BE-side diagnostics and a configurable preferred block byte cap for large-block debugging.

### Check List (For Author)

- Test: Manual test
    - git diff --check
    - python3 build-support/run_clang_format.py --clang-format-executable /opt/homebrew/bin/clang-format --style file --inplace false be/src/common/config.cpp be/src/common/config.h be/src/runtime/runtime_state.h be/src/core/data_type/data_type_string.cpp
- Behavior changed: Yes. The maximum preferred block byte budget can now be adjusted via the BE config `max_preferred_block_size_bytes`; the default remains 512MB. 
- Does this need documentation: No --- be/src/common/config.cpp | 3 +++ be/src/common/config.h | 3 +++ be/src/core/data_type/data_type_string.cpp | 39 ++++++++++++++++++++++++++---- be/src/runtime/runtime_state.h | 22 +++++++++-------- 4 files changed, 52 insertions(+), 15 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 86b7d12e0aa..add465f3b90 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -436,6 +436,9 @@ DEFINE_mBool(enable_low_cardinality_optimize, "true"); DEFINE_Bool(enable_low_cardinality_cache_code, "true"); DEFINE_mBool(enable_adaptive_batch_size, "true"); +// Maximum byte budget returned by RuntimeState::preferred_block_size_bytes. +// Default is 512MB. Increase only for debugging large-block behavior. +DEFINE_mInt64(max_preferred_block_size_bytes, "536870912"); // be policy // whether check compaction checksum diff --git a/be/src/common/config.h b/be/src/common/config.h index 0b415ed5d2c..1e3de57c762 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -505,6 +505,9 @@ DECLARE_Bool(enable_low_cardinality_cache_code); // so that each output block stays close to preferred_block_size_bytes. // When false, the fixed batch_size row behaviour is preserved. DECLARE_mBool(enable_adaptive_batch_size); +// Maximum byte budget returned by RuntimeState::preferred_block_size_bytes. +// Default is 512MB. Increase only for debugging large-block behavior. 
+DECLARE_mInt64(max_preferred_block_size_bytes); // be policy // whether check compaction checksum diff --git a/be/src/core/data_type/data_type_string.cpp b/be/src/core/data_type/data_type_string.cpp index 2fa6d3e99d8..a6756d91f49 100644 --- a/be/src/core/data_type/data_type_string.cpp +++ b/be/src/core/data_type/data_type_string.cpp @@ -20,6 +20,7 @@ #include "core/data_type/data_type_string.h" +#include <glog/logging.h> #include <lz4/lz4.h> #include <streamvbyte.h> @@ -29,6 +30,7 @@ #include "agent/be_exec_version_manager.h" #include "common/cast_set.h" +#include "common/compiler_util.h" // IWYU pragma: keep #include "common/exception.h" #include "common/status.h" #include "core/assert_cast.h" @@ -87,6 +89,10 @@ int64_t DataTypeString::get_uncompressed_serialized_bytes(const IColumn& column, size += bytes; } else { if (bytes > LZ4_MAX_INPUT_SIZE) { + LOG(WARNING) << "DataTypeString serialized byte size exceeds LZ4 max input size, " + << "bytes=" << bytes << ", LZ4_MAX_INPUT_SIZE=" << LZ4_MAX_INPUT_SIZE + << ", rows=" << data_column.size() + << ", real_need_copy_num=" << real_need_copy_num; throw Exception(ErrorCode::BUFFER_OVERFLOW, "LZ4_compressBound meet invalid input size, input_size={}, " "LZ4_MAX_INPUT_SIZE={}", @@ -127,9 +133,25 @@ char* DataTypeString::serialize(const IColumn& column, char* buf, int be_exec_ve memcpy(buf, string_column.get_chars().data(), value_len); buf += value_len; } else { - auto encode_size = LZ4_compress_fast(string_column.get_chars().raw_data(), - (buf + sizeof(size_t)), cast_set<Int32>(value_len), - LZ4_compressBound(cast_set<Int32>(value_len)), 1); + if (UNLIKELY(value_len > LZ4_MAX_INPUT_SIZE)) { + LOG(WARNING) << "DataTypeString serialize value length exceeds LZ4 max input size, " + << "value_len=" << value_len + << ", LZ4_MAX_INPUT_SIZE=" << LZ4_MAX_INPUT_SIZE + << ", rows=" << string_column.size() + << ", chars_size=" << string_column.get_chars().size(); + } + const auto lz4_value_len = cast_set<Int32>(value_len); + const auto 
lz4_compress_bound = LZ4_compressBound(lz4_value_len); + auto encode_size = + LZ4_compress_fast(string_column.get_chars().raw_data(), (buf + sizeof(size_t)), + lz4_value_len, lz4_compress_bound, 1); + if (UNLIKELY(encode_size <= 0)) { + LOG(WARNING) << "DataTypeString LZ4_compress_fast failed, value_len=" << value_len + << ", lz4_value_len=" << lz4_value_len + << ", lz4_compress_bound=" << lz4_compress_bound + << ", rows=" << string_column.size() + << ", chars_size=" << string_column.get_chars().size(); + } unaligned_store<size_t>(buf, encode_size); buf += (sizeof(size_t) + encode_size); } @@ -172,8 +194,15 @@ const char* DataTypeString::deserialize(const char* buf, MutableColumnPtr* colum } else { size_t encode_size = unaligned_load<size_t>(buf); buf += sizeof(size_t); - LZ4_decompress_safe(buf, reinterpret_cast<char*>(data.data()), cast_set<Int32>(encode_size), - cast_set<Int32>(value_len)); + const auto lz4_encode_size = cast_set<Int32>(encode_size); + const auto lz4_value_len = cast_set<Int32>(value_len); + const auto decoded_size = LZ4_decompress_safe(buf, reinterpret_cast<char*>(data.data()), + lz4_encode_size, lz4_value_len); + if (UNLIKELY(decoded_size < 0 || decoded_size != lz4_value_len)) { + LOG(WARNING) << "DataTypeString LZ4_decompress_safe failed, encode_size=" << encode_size + << ", value_len=" << value_len << ", decoded_size=" << decoded_size + << ", rows=" << real_have_saved_num; + } buf += encode_size; } return buf; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index a01a57d3f7c..07bc8fbc4e7 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -148,23 +148,25 @@ public: } // Target byte budget per output block (default 8MB when adaptive is enabled). - // The public FE/session contract is [1MB, 512MB]; this accessor still clamps any direct - // thrift or mixed-version out-of-range value into that range. 
Returns `kMax` when adaptive - // is disabled by BE config so the value is always a legal byte budget; callers that need - // to know whether adaptive batch size is active should test - // `config::enable_adaptive_batch_size` explicitly. + // The public FE/session contract is [1MB, max_preferred_block_size_bytes]; this accessor still + // clamps any direct thrift or mixed-version out-of-range value into that range. Returns + // max_preferred_block_size_bytes when adaptive is disabled by BE config so the value is always + // a legal byte budget; callers that need to know whether adaptive batch size is active should + // test `config::enable_adaptive_batch_size` explicitly. MOCK_FUNCTION size_t preferred_block_size_bytes() const { static constexpr int64_t kDefault = 8388608L; // 8MB - static constexpr int64_t kMax = 536870912L; // 512MB static constexpr int64_t kMin = 1048576L; // 1MB + const int64_t max_preferred_block_size_bytes = + std::max<int64_t>(kMin, config::max_preferred_block_size_bytes); if (!config::enable_adaptive_batch_size) [[unlikely]] { - return kMax; + return max_preferred_block_size_bytes; } if (_query_options.__isset.preferred_block_size_bytes) [[likely]] { - return std::max<int64_t>( - kMin, std::min<int64_t>(_query_options.preferred_block_size_bytes, kMax)); + return std::max<int64_t>(kMin, + std::min<int64_t>(_query_options.preferred_block_size_bytes, + max_preferred_block_size_bytes)); } - return kDefault; + return std::min<int64_t>(kDefault, max_preferred_block_size_bytes); } int query_parallel_instance_num() const { return _query_options.parallel_instance; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
