This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c22d097b59 [improvement](compress) Support compress/decompress block with lz4 (#11955) c22d097b59 is described below commit c22d097b590870969175ed6face43c5e976ca2a7 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Mon Aug 22 17:35:43 2022 +0800 [improvement](compress) Support compress/decompress block with lz4 (#11955) --- be/src/runtime/runtime_state.h | 9 +++++ .../aggregate_functions/aggregate_function_sort.h | 3 +- be/src/vec/core/block.cpp | 45 +++++++++++++++------ be/src/vec/core/block.h | 10 +++++ be/src/vec/runtime/vdata_stream_recvr.cpp | 4 ++ be/src/vec/runtime/vdata_stream_recvr.h | 2 + be/src/vec/sink/vdata_stream_sender.cpp | 8 +++- be/src/vec/sink/vdata_stream_sender.h | 3 ++ be/src/vec/sink/vtablet_sink.cpp | 1 + be/test/vec/core/block_test.cpp | 46 +++++++++++++--------- .../java/org/apache/doris/qe/SessionVariable.java | 10 +++++ gensrc/proto/data.proto | 3 ++ gensrc/thrift/PaloInternalService.thrift | 2 + 13 files changed, 113 insertions(+), 33 deletions(-) diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index f12f41ddee..033f19ae3f 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -351,6 +351,15 @@ public: return _query_options.enable_enable_exchange_node_parallel_merge; } + segment_v2::CompressionTypePB fragement_transmission_compression_type() { + if (_query_options.__isset.fragment_transmission_compression_codec) { + if (_query_options.fragment_transmission_compression_codec == "lz4") { + return segment_v2::CompressionTypePB::LZ4; + } + } + return segment_v2::CompressionTypePB::SNAPPY; + } + // the following getters are only valid after Prepare() InitialReservations* initial_reservations() const { return _initial_reservations; } diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h b/be/src/vec/aggregate_functions/aggregate_function_sort.h index ae320a6df9..2db72a4c5c 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sort.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h @@ -61,7 +61,8 @@ struct AggregateFunctionSortData { PBlock pblock; size_t uncompressed_bytes = 0; size_t compressed_bytes = 0; - block.serialize(&pblock, &uncompressed_bytes, &compressed_bytes); + block.serialize(&pblock, &uncompressed_bytes, &compressed_bytes, + segment_v2::CompressionTypePB::SNAPPY); write_string_binary(pblock.SerializeAsString(), buf); } diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 9aab2c311b..8728803a4d 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -34,6 +34,7 @@ #include "runtime/tuple.h" #include "runtime/tuple_row.h" #include "udf/udf.h" +#include "util/block_compression.h" #include "vec/columns/column.h" #include "vec/columns/column_const.h" #include "vec/columns/column_nullable.h" @@ -79,16 +80,28 @@ Block::Block(const PBlock& pblock) { std::string compression_scratch; if (pblock.compressed()) { // Decompress + SCOPED_RAW_TIMER(&_decompress_time_ns); const char* compressed_data = pblock.column_values().c_str(); size_t compressed_size = pblock.column_values().size(); size_t uncompressed_size = 0; - bool success = - snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size); - DCHECK(success) << "snappy::GetUncompressedLength failed"; - compression_scratch.resize(uncompressed_size); - success = - snappy::RawUncompress(compressed_data, compressed_size, compression_scratch.data()); - DCHECK(success) << "snappy::RawUncompress failed"; + if (pblock.has_compression_type() && pblock.has_uncompressed_size()) { + std::unique_ptr<BlockCompressionCodec> codec; + get_block_compression_codec(pblock.compression_type(), codec); + uncompressed_size = pblock.uncompressed_size(); + compression_scratch.resize(uncompressed_size); + Slice decompressed_slice(compression_scratch); + codec->decompress(Slice(compressed_data, compressed_size), &decompressed_slice); + DCHECK(uncompressed_size == decompressed_slice.size); + } else { + bool success = snappy::GetUncompressedLength(compressed_data, compressed_size, + &uncompressed_size); + DCHECK(success) << "snappy::GetUncompressedLength failed"; + compression_scratch.resize(uncompressed_size); + success = snappy::RawUncompress(compressed_data, compressed_size, + compression_scratch.data()); + DCHECK(success) << "snappy::RawUncompress failed"; + } + _decompressed_bytes = uncompressed_size; buf = compression_scratch.data(); } else { buf = pblock.column_values().data(); @@ -684,6 +697,7 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee } Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes, + segment_v2::CompressionTypePB compression_type, bool allow_transfer_large_data) const { // calc uncompressed size for allocation size_t content_uncompressed_size = 0; @@ -717,7 +731,14 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp // compress if (config::compress_rowbatches && content_uncompressed_size > 0) { - size_t max_compressed_size = snappy::MaxCompressedLength(content_uncompressed_size); + SCOPED_RAW_TIMER(const_cast<int64_t*>(&_compress_time_ns)); + pblock->set_compression_type(compression_type); + pblock->set_uncompressed_size(content_uncompressed_size); + + std::unique_ptr<BlockCompressionCodec> codec; + RETURN_IF_ERROR(get_block_compression_codec(compression_type, codec)); + + size_t max_compressed_size = codec->max_compressed_len(content_uncompressed_size); std::string compression_scratch; try { // Try compressing the content to compression_scratch, @@ -732,10 +753,10 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp LOG(WARNING) << msg; return Status::BufferAllocFailed(msg); } - size_t compressed_size = 0; - char* compressed_output = compression_scratch.data(); - snappy::RawCompress(column_values->data(), content_uncompressed_size, compressed_output, - &compressed_size); + + Slice compressed_slice(compression_scratch); + codec->compress(Slice(column_values->data(), content_uncompressed_size), &compressed_slice); + size_t compressed_size = compressed_slice.size; if (LIKELY(compressed_size < content_uncompressed_size)) { compression_scratch.resize(compressed_size); diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index ac4cd95841..b4fd18bb0c 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -65,6 +65,11 @@ private: Container data; IndexByName index_by_name; + int64_t _decompress_time_ns = 0; + int64_t _decompressed_bytes = 0; + + int64_t _compress_time_ns = 0; + public: BlockInfo info; @@ -262,6 +267,7 @@ public: // serialize block to PBlock Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes, + segment_v2::CompressionTypePB compression_type, bool allow_transfer_large_data = false) const; // serialize block to PRowbatch @@ -335,6 +341,10 @@ public: void shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx); + int64_t get_decompress_time() const { return _decompress_time_ns; } + int64_t get_decompressed_bytes() const { return _decompressed_bytes; } + int64_t get_compress_time() const { return _compress_time_ns; } + private: void erase_impl(size_t position); void initialize_index_by_name(); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 2fb7be3223..02d450ce1b 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -125,6 +125,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe { SCOPED_TIMER(_recvr->_deserialize_row_batch_timer); block = new Block(pblock); + COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time()); + COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes()); } VLOG_ROW << "added #rows=" << block->rows() << " batch_size=" << block_byte_size << "\n"; @@ -284,6 +286,8 @@ VDataStreamRecvr::VDataStreamRecvr( _data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime"); _buffer_full_total_timer = ADD_TIMER(_profile, "SendersBlockedTotalTimer(*)"); _first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime"); + _decompress_timer = ADD_TIMER(_profile, "DecompressTime"); + _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES); } VDataStreamRecvr::~VDataStreamRecvr() { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index bedd18bbce..7372285125 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -128,6 +128,8 @@ private: RuntimeProfile::Counter* _first_batch_wait_total_timer; RuntimeProfile::Counter* _buffer_full_total_timer; RuntimeProfile::Counter* _data_arrival_timer; + RuntimeProfile::Counter* _decompress_timer; + RuntimeProfile::Counter* _decompress_bytes; std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr; }; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index e5b6a5c721..00fd63c692 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -330,6 +330,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD _cur_pb_block(&_pb_block1), _profile(nullptr), _serialize_batch_timer(nullptr), + _compress_timer(nullptr), _bytes_sent_counter(nullptr), _local_bytes_send_counter(nullptr), _dest_node_id(0) { @@ -347,6 +348,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_ _cur_pb_block(&_pb_block1), _profile(nullptr), _serialize_batch_timer(nullptr), + _compress_timer(nullptr), _bytes_sent_counter(nullptr), _local_bytes_send_counter(nullptr), _dest_node_id(0) { @@ -425,6 +427,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) { _uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES); _ignore_rows = ADD_COUNTER(profile(), "IgnoreRows", TUnit::UNIT); _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime"); + _compress_timer = ADD_TIMER(profile(), "CompressTime"); _overall_throughput = profile()->add_derived_counter( "OverallThroughput", TUnit::BYTES_PER_SECOND, std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter, @@ -445,6 +448,8 @@ Status VDataStreamSender::open(RuntimeState* state) { for (auto iter : _partition_infos) { RETURN_IF_ERROR(iter->open(state)); } + + _compression_type = state->fragement_transmission_compression_type(); return Status::OK(); } @@ -597,9 +602,10 @@ Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int num_rece dest->Clear(); size_t uncompressed_bytes = 0, compressed_bytes = 0; RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes, - _transfer_large_data_by_brpc)); + _compression_type, _transfer_large_data_by_brpc)); COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers); COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); + COUNTER_UPDATE(_compress_timer, src->get_compress_time()); } return Status::OK(); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 3d572366e1..1c8bcf4f87 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -131,6 +131,7 @@ protected: RuntimeProfile* _profile; // Allocated from _pool RuntimeProfile::Counter* _serialize_batch_timer; + RuntimeProfile::Counter* _compress_timer; RuntimeProfile::Counter* _bytes_sent_counter; RuntimeProfile::Counter* _uncompressed_bytes_counter; RuntimeProfile::Counter* _ignore_rows; @@ -146,6 +147,8 @@ protected: // User can change this config at runtime, avoid it being modified during query or loading process. bool _transfer_large_data_by_brpc = false; + + segment_v2::CompressionTypePB _compression_type; }; // TODO: support local exechange diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 3cbc73348c..a9d091bf75 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -258,6 +258,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) { SCOPED_ATOMIC_TIMER(&_serialize_batch_ns); size_t uncompressed_bytes = 0, compressed_bytes = 0; Status st = block.serialize(request.mutable_block(), &uncompressed_bytes, &compressed_bytes, + state->fragement_transmission_compression_type(), _parent->_transfer_large_data_by_brpc); if (!st.ok()) { cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg())); diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp index a18db4e1e5..a3a5be3b26 100644 --- a/be/test/vec/core/block_test.cpp +++ b/be/test/vec/core/block_test.cpp @@ -180,10 +180,12 @@ TEST(BlockTest, RowBatchCovertToBlock) { } } -void block_to_pb(const vectorized::Block& block, PBlock* pblock) { +void block_to_pb( + const vectorized::Block& block, PBlock* pblock, + segment_v2::CompressionTypePB compression_type = segment_v2::CompressionTypePB::SNAPPY) { size_t uncompressed_bytes = 0; size_t compressed_bytes = 0; - Status st = block.serialize(pblock, &uncompressed_bytes, &compressed_bytes); + Status st = block.serialize(pblock, &uncompressed_bytes, &compressed_bytes, compression_type); EXPECT_TRUE(st.ok()); EXPECT_TRUE(uncompressed_bytes >= compressed_bytes); EXPECT_EQ(compressed_bytes, pblock->column_values().size()); @@ -237,7 +239,7 @@ void fill_block_with_array_string(vectorized::Block& block) { block.insert(test_array_string); } -TEST(BlockTest, SerializeAndDeserializeBlock) { +void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_type) { config::compress_rowbatches = true; // int { @@ -250,12 +252,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) { vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, "test_int"); vectorized::Block block({type_and_name}); PBlock pblock; - block_to_pb(block, &pblock); + block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); vectorized::Block block2(pblock); PBlock pblock2; - block_to_pb(block2, &pblock2); + block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); EXPECT_EQ(s1, s2); } @@ -271,12 +273,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) { "test_string"); vectorized::Block block({type_and_name}); PBlock pblock; - block_to_pb(block, &pblock); + block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); vectorized::Block block2(pblock); PBlock pblock2; - block_to_pb(block2, &pblock2); + block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); EXPECT_EQ(s1, s2); } @@ -295,12 +297,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) { decimal_data_type, "test_decimal"); vectorized::Block block({type_and_name}); PBlock pblock; - block_to_pb(block, &pblock); + block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); vectorized::Block block2(pblock); PBlock pblock2; - block_to_pb(block2, &pblock2); + block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); EXPECT_EQ(s1, s2); } @@ -321,12 +323,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) { "test_bitmap"); vectorized::Block block({type_and_name}); PBlock pblock; - block_to_pb(block, &pblock); + block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); vectorized::Block block2(pblock); PBlock pblock2; - block_to_pb(block2, &pblock2); + block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); EXPECT_EQ(s1, s2); } @@ -341,12 +343,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) { nullable_data_type, "test_nullable"); vectorized::Block block({type_and_name}); PBlock pblock; - block_to_pb(block, &pblock); + block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); vectorized::Block block2(pblock); PBlock pblock2; - block_to_pb(block2, &pblock2); + block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); EXPECT_EQ(s1, s2); } @@ -361,14 +363,14 @@ TEST(BlockTest, SerializeAndDeserializeBlock) { nullable_column->get_ptr(), nullable_data_type, "test_nullable_decimal"); vectorized::Block block({type_and_name}); PBlock pblock; - block_to_pb(block, &pblock); + block_to_pb(block, &pblock, compression_type); EXPECT_EQ(1, pblock.column_metas_size()); EXPECT_TRUE(pblock.column_metas()[0].has_decimal_param()); std::string s1 = pblock.DebugString(); vectorized::Block block2(pblock); PBlock pblock2; - block_to_pb(block2, &pblock2); + block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); EXPECT_EQ(s1, s2); } @@ -385,12 +387,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) { data_type, "test_nullable_int32"); vectorized::Block block({type_and_name}); PBlock pblock; - block_to_pb(block, &pblock); + block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); vectorized::Block block2(pblock); PBlock pblock2; - block_to_pb(block2, &pblock2); + block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); EXPECT_EQ(s1, s2); } @@ -400,17 +402,23 @@ TEST(BlockTest, SerializeAndDeserializeBlock) { fill_block_with_array_int(block); fill_block_with_array_string(block); PBlock pblock; - block_to_pb(block, &pblock); + block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); vectorized::Block block2(pblock); PBlock pblock2; - block_to_pb(block2, &pblock2); + block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); EXPECT_EQ(s1, s2); } } +TEST(BlockTest, SerializeAndDeserializeBlock) { + config::compress_rowbatches = true; + serialize_and_deserialize_test(segment_v2::CompressionTypePB::SNAPPY); + serialize_and_deserialize_test(segment_v2::CompressionTypePB::LZ4); +} + TEST(BlockTest, dump_data) { auto vec = vectorized::ColumnVector<Int32>::create(); auto& int32_data = vec->get_data(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index ccc7a8764d..70be8869d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -207,6 +207,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_FUNCTION_PUSHDOWN = "enable_function_pushdown"; + public static final String FRAGMENT_TRANSMISSION_COMPRESSION_CODEC = "fragment_transmission_compression_codec"; + // session origin value public Map<Field, String> sessionOriginValue = new HashMap<Field, String>(); // check stmt is or not [select /*+ SET_VAR(...)*/ ...] @@ -352,6 +354,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = PREFER_JOIN_METHOD) public String preferJoinMethod = "broadcast"; + @VariableMgr.VarAttr(name = FRAGMENT_TRANSMISSION_COMPRESSION_CODEC) + public String fragmentTransmissionCompressionCodec = "lz4"; + /* * the parallel exec instance num for one Fragment in one BE * 1 means disable this feature @@ -1060,6 +1065,10 @@ public class SessionVariable implements Serializable, Writable { this.enableRemoveNoConjunctsRuntimeFilterPolicy = enableRemoveNoConjunctsRuntimeFilterPolicy; } + public void setFragmentTransmissionCompressionCodec(String codec) { + this.fragmentTransmissionCompressionCodec = codec; + } + // Serialize to thrift object // used for rest api public TQueryOptions toThrift() { @@ -1103,6 +1112,7 @@ public class SessionVariable implements Serializable, Writable { } tResult.setEnableFunctionPushdown(enableFunctionPushdown); + tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec); return tResult; } diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto index b1c52731fb..f066dc4a5b 100644 --- a/gensrc/proto/data.proto +++ b/gensrc/proto/data.proto @@ -21,6 +21,7 @@ package doris; option java_package = "org.apache.doris.proto"; import "types.proto"; +import "segment_v2.proto"; message PNodeStatistics { required int64 node_id = 1; @@ -63,4 +64,6 @@ message PBlock { repeated PColumnMeta column_metas = 1; optional bytes column_values = 2; optional bool compressed = 3 [default = false]; + optional int64 uncompressed_size = 4; + optional segment_v2.CompressionTypePB compression_type = 5 [default = SNAPPY]; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 910f700310..92d41abbd7 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -165,6 +165,8 @@ struct TQueryOptions { 44: optional bool trim_tailing_spaces_for_external_table_query = false 45: optional bool enable_function_pushdown; + + 46: optional string fragment_transmission_compression_codec; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org