yixiutt commented on code in PR #12573:
URL: https://github.com/apache/doris/pull/12573#discussion_r970839880
##########
be/src/vec/core/block.cpp:
##########
@@ -689,15 +690,16 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee
     return Status::OK();
 }
-Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes,
+/*Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes,

Review Comment:
   done

##########
be/src/util/block_compression.cpp:
##########
@@ -28,47 +28,92 @@
 #include <limits>
 #include "gutil/strings/substitute.h"
+#include "util/defer_op.h"
 #include "util/faststring.h"
 namespace doris {
 using strings::Substitute;
-Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, Slice* output) const {
+Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
+                                       faststring* output) {
     faststring buf;
     // we compute total size to avoid more memory copy
-    size_t total_size = Slice::compute_total_size(inputs);
-    buf.reserve(total_size);
+    buf.reserve(uncompressed_size);
     for (auto& input : inputs) {
         buf.append(input.data, input.size);
     }
     return compress(buf, output);
 }
+bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) {
+    if (uncompressed_size > std::numeric_limits<int32_t>::max()) {
+        return true;
+    }
+    return false;
+}
+
 class Lz4BlockCompression : public BlockCompressionCodec {
+private:
+    struct Context {
+        Context() : ctx(nullptr) {}
+        LZ4_stream_t* ctx;
+        faststring buffer;
+    };
+
 public:
-    static const Lz4BlockCompression* instance() {
+    static Lz4BlockCompression* instance() {
         static Lz4BlockCompression s_instance;
         return &s_instance;
     }
-    ~Lz4BlockCompression() override {}
+    ~Lz4BlockCompression() {
+        for (auto ctx : _ctx_pool) {
+            _delete_compression_ctx(ctx);
+        }
+    }
-    Status compress(const Slice& input, Slice* output) const override {
-        if (input.size > std::numeric_limits<int32_t>::max() ||
-            output->size > std::numeric_limits<int32_t>::max()) {
-            return Status::InvalidArgument("LZ4 cannot handle data large than 2G");
+    Status compress(const Slice& input, faststring* output) override {
+        Context* context;
+        RETURN_IF_ERROR(_acquire_compression_ctx(&context));
+        bool compress_failed = false;
+        Defer defer {[&] {
+            if (compress_failed) {
+                _delete_compression_ctx(context);
+            } else {
+                _release_compression_ctx(context);
+            }
+        }};
+        Slice compressed_buf;
+        size_t max_len = max_compressed_len(input.size);
+        if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
+            // use output directly
+            output->resize(max_len);
+            compressed_buf.data = reinterpret_cast<char*>(output->data());
+            compressed_buf.size = max_len;
+        } else {
+            // reuse context buffer if max_len < MAX_COMPRESSION_BUFFER_FOR_REUSE
+            context->buffer.resize(max_len);
+            compressed_buf.data = reinterpret_cast<char*>(context->buffer.data());
+            compressed_buf.size = max_len;
         }
-        auto compressed_len =
-                LZ4_compress_default(input.data, output->data, input.size, output->size);
+
+        int32_t acc = 1;

Review Comment:
   done

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
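Note on the pattern in the diff above: compression contexts are pooled and handed out per compress() call, guarded by a Defer block that returns the context to the pool on success and destroys it on failure, so a possibly corrupted LZ4 stream state is never reused. The sketch below is a minimal, self-contained illustration of that acquire/release-or-drop idea in plain C++; all names in it (Context, ContextPool, do_compress, compress_with_pool) are hypothetical stand-ins for the PR's _acquire_compression_ctx / _release_compression_ctx / _delete_compression_ctx helpers, not Doris code.

    #include <cstddef>
    #include <memory>
    #include <mutex>
    #include <vector>

    // Stand-in for a compression context (the real code wraps an
    // LZ4_stream_t* plus a reusable scratch buffer).
    struct Context {
        std::vector<char> buffer;
    };

    // Placeholder for the actual LZ4 call; returns false on failure.
    static bool do_compress(Context* /*ctx*/, const char* /*input*/, size_t /*len*/) {
        return true;
    }

    class ContextPool {
    public:
        // Hand out a pooled context, or create a fresh one if the pool is empty.
        std::unique_ptr<Context> acquire() {
            std::lock_guard<std::mutex> lock(_mutex);
            if (_pool.empty()) {
                return std::make_unique<Context>();
            }
            std::unique_ptr<Context> ctx = std::move(_pool.back());
            _pool.pop_back();
            return ctx;
        }

        // Return a context to the pool for reuse by later calls.
        void release(std::unique_ptr<Context> ctx) {
            std::lock_guard<std::mutex> lock(_mutex);
            _pool.push_back(std::move(ctx));
        }

    private:
        std::mutex _mutex;
        std::vector<std::unique_ptr<Context>> _pool;
    };

    // Compress with a pooled context. On success the context goes back to the
    // pool; on failure it is simply dropped when it goes out of scope, so an
    // inconsistent context is never handed out again.
    bool compress_with_pool(ContextPool& pool, const char* input, size_t len) {
        std::unique_ptr<Context> ctx = pool.acquire();
        bool ok = do_compress(ctx.get(), input, len);
        if (ok) {
            pool.release(std::move(ctx));
        }
        return ok;
    }

The same reuse concern appears to motivate MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE in the diff: small scratch buffers live inside the pooled context and are reused across calls, while oversized outputs are written straight into the caller's faststring so the pool does not pin large allocations.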