This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c22d097b59 [improvement](compress) Support compress/decompress block 
with lz4 (#11955)
c22d097b59 is described below

commit c22d097b590870969175ed6face43c5e976ca2a7
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Mon Aug 22 17:35:43 2022 +0800

    [improvement](compress) Support compress/decompress block with lz4 (#11955)
---
 be/src/runtime/runtime_state.h                     |  9 +++++
 .../aggregate_functions/aggregate_function_sort.h  |  3 +-
 be/src/vec/core/block.cpp                          | 45 +++++++++++++++------
 be/src/vec/core/block.h                            | 10 +++++
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  4 ++
 be/src/vec/runtime/vdata_stream_recvr.h            |  2 +
 be/src/vec/sink/vdata_stream_sender.cpp            |  8 +++-
 be/src/vec/sink/vdata_stream_sender.h              |  3 ++
 be/src/vec/sink/vtablet_sink.cpp                   |  1 +
 be/test/vec/core/block_test.cpp                    | 46 +++++++++++++---------
 .../java/org/apache/doris/qe/SessionVariable.java  | 10 +++++
 gensrc/proto/data.proto                            |  3 ++
 gensrc/thrift/PaloInternalService.thrift           |  2 +
 13 files changed, 113 insertions(+), 33 deletions(-)

diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f12f41ddee..033f19ae3f 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -351,6 +351,15 @@ public:
         return _query_options.enable_enable_exchange_node_parallel_merge;
     }
 
+    segment_v2::CompressionTypePB fragement_transmission_compression_type() {
+        if (_query_options.__isset.fragment_transmission_compression_codec) {
+            if (_query_options.fragment_transmission_compression_codec == 
"lz4") {
+                return segment_v2::CompressionTypePB::LZ4;
+            }
+        }
+        return segment_v2::CompressionTypePB::SNAPPY;
+    }
+
     // the following getters are only valid after Prepare()
     InitialReservations* initial_reservations() const { return 
_initial_reservations; }
 
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h 
b/be/src/vec/aggregate_functions/aggregate_function_sort.h
index ae320a6df9..2db72a4c5c 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sort.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h
@@ -61,7 +61,8 @@ struct AggregateFunctionSortData {
         PBlock pblock;
         size_t uncompressed_bytes = 0;
         size_t compressed_bytes = 0;
-        block.serialize(&pblock, &uncompressed_bytes, &compressed_bytes);
+        block.serialize(&pblock, &uncompressed_bytes, &compressed_bytes,
+                        segment_v2::CompressionTypePB::SNAPPY);
 
         write_string_binary(pblock.SerializeAsString(), buf);
     }
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 9aab2c311b..8728803a4d 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -34,6 +34,7 @@
 #include "runtime/tuple.h"
 #include "runtime/tuple_row.h"
 #include "udf/udf.h"
+#include "util/block_compression.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_const.h"
 #include "vec/columns/column_nullable.h"
@@ -79,16 +80,28 @@ Block::Block(const PBlock& pblock) {
     std::string compression_scratch;
     if (pblock.compressed()) {
         // Decompress
+        SCOPED_RAW_TIMER(&_decompress_time_ns);
         const char* compressed_data = pblock.column_values().c_str();
         size_t compressed_size = pblock.column_values().size();
         size_t uncompressed_size = 0;
-        bool success =
-                snappy::GetUncompressedLength(compressed_data, 
compressed_size, &uncompressed_size);
-        DCHECK(success) << "snappy::GetUncompressedLength failed";
-        compression_scratch.resize(uncompressed_size);
-        success =
-                snappy::RawUncompress(compressed_data, compressed_size, 
compression_scratch.data());
-        DCHECK(success) << "snappy::RawUncompress failed";
+        if (pblock.has_compression_type() && pblock.has_uncompressed_size()) {
+            std::unique_ptr<BlockCompressionCodec> codec;
+            get_block_compression_codec(pblock.compression_type(), codec);
+            uncompressed_size = pblock.uncompressed_size();
+            compression_scratch.resize(uncompressed_size);
+            Slice decompressed_slice(compression_scratch);
+            codec->decompress(Slice(compressed_data, compressed_size), 
&decompressed_slice);
+            DCHECK(uncompressed_size == decompressed_slice.size);
+        } else {
+            bool success = snappy::GetUncompressedLength(compressed_data, 
compressed_size,
+                                                         &uncompressed_size);
+            DCHECK(success) << "snappy::GetUncompressedLength failed";
+            compression_scratch.resize(uncompressed_size);
+            success = snappy::RawUncompress(compressed_data, compressed_size,
+                                            compression_scratch.data());
+            DCHECK(success) << "snappy::RawUncompress failed";
+        }
+        _decompressed_bytes = uncompressed_size;
         buf = compression_scratch.data();
     } else {
         buf = pblock.column_values().data();
@@ -684,6 +697,7 @@ Status Block::filter_block(Block* block, int 
filter_column_id, int column_to_kee
 }
 
 Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* 
compressed_bytes,
+                        segment_v2::CompressionTypePB compression_type,
                         bool allow_transfer_large_data) const {
     // calc uncompressed size for allocation
     size_t content_uncompressed_size = 0;
@@ -717,7 +731,14 @@ Status Block::serialize(PBlock* pblock, size_t* 
uncompressed_bytes, size_t* comp
 
     // compress
     if (config::compress_rowbatches && content_uncompressed_size > 0) {
-        size_t max_compressed_size = 
snappy::MaxCompressedLength(content_uncompressed_size);
+        SCOPED_RAW_TIMER(const_cast<int64_t*>(&_compress_time_ns));
+        pblock->set_compression_type(compression_type);
+        pblock->set_uncompressed_size(content_uncompressed_size);
+
+        std::unique_ptr<BlockCompressionCodec> codec;
+        RETURN_IF_ERROR(get_block_compression_codec(compression_type, codec));
+
+        size_t max_compressed_size = 
codec->max_compressed_len(content_uncompressed_size);
         std::string compression_scratch;
         try {
             // Try compressing the content to compression_scratch,
@@ -732,10 +753,10 @@ Status Block::serialize(PBlock* pblock, size_t* 
uncompressed_bytes, size_t* comp
             LOG(WARNING) << msg;
             return Status::BufferAllocFailed(msg);
         }
-        size_t compressed_size = 0;
-        char* compressed_output = compression_scratch.data();
-        snappy::RawCompress(column_values->data(), content_uncompressed_size, 
compressed_output,
-                            &compressed_size);
+
+        Slice compressed_slice(compression_scratch);
+        codec->compress(Slice(column_values->data(), 
content_uncompressed_size), &compressed_slice);
+        size_t compressed_size = compressed_slice.size;
 
         if (LIKELY(compressed_size < content_uncompressed_size)) {
             compression_scratch.resize(compressed_size);
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index ac4cd95841..b4fd18bb0c 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -65,6 +65,11 @@ private:
     Container data;
     IndexByName index_by_name;
 
+    int64_t _decompress_time_ns = 0;
+    int64_t _decompressed_bytes = 0;
+
+    int64_t _compress_time_ns = 0;
+
 public:
     BlockInfo info;
 
@@ -262,6 +267,7 @@ public:
 
     // serialize block to PBlock
     Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* 
compressed_bytes,
+                     segment_v2::CompressionTypePB compression_type,
                      bool allow_transfer_large_data = false) const;
 
     // serialize block to PRowbatch
@@ -335,6 +341,10 @@ public:
 
     void shrink_char_type_column_suffix_zero(const std::vector<size_t>& 
char_type_idx);
 
+    int64_t get_decompress_time() const { return _decompress_time_ns; }
+    int64_t get_decompressed_bytes() const { return _decompressed_bytes; }
+    int64_t get_compress_time() const { return _compress_time_ns; }
+
 private:
     void erase_impl(size_t position);
     void initialize_index_by_name();
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 2fb7be3223..02d450ce1b 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -125,6 +125,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& 
pblock, int be_numbe
     {
         SCOPED_TIMER(_recvr->_deserialize_row_batch_timer);
         block = new Block(pblock);
+        COUNTER_UPDATE(_recvr->_decompress_timer, 
block->get_decompress_time());
+        COUNTER_UPDATE(_recvr->_decompress_bytes, 
block->get_decompressed_bytes());
     }
 
     VLOG_ROW << "added #rows=" << block->rows() << " batch_size=" << 
block_byte_size << "\n";
@@ -284,6 +286,8 @@ VDataStreamRecvr::VDataStreamRecvr(
     _data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime");
     _buffer_full_total_timer = ADD_TIMER(_profile, 
"SendersBlockedTotalTimer(*)");
     _first_batch_wait_total_timer = ADD_TIMER(_profile, 
"FirstBatchArrivalWaitTime");
+    _decompress_timer = ADD_TIMER(_profile, "DecompressTime");
+    _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
 }
 
 VDataStreamRecvr::~VDataStreamRecvr() {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index bedd18bbce..7372285125 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -128,6 +128,8 @@ private:
     RuntimeProfile::Counter* _first_batch_wait_total_timer;
     RuntimeProfile::Counter* _buffer_full_total_timer;
     RuntimeProfile::Counter* _data_arrival_timer;
+    RuntimeProfile::Counter* _decompress_timer;
+    RuntimeProfile::Counter* _decompress_bytes;
 
     std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
 };
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index e5b6a5c721..00fd63c692 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -330,6 +330,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int 
sender_id, const RowD
           _cur_pb_block(&_pb_block1),
           _profile(nullptr),
           _serialize_batch_timer(nullptr),
+          _compress_timer(nullptr),
           _bytes_sent_counter(nullptr),
           _local_bytes_send_counter(nullptr),
           _dest_node_id(0) {
@@ -347,6 +348,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, 
const RowDescriptor& row_
           _cur_pb_block(&_pb_block1),
           _profile(nullptr),
           _serialize_batch_timer(nullptr),
+          _compress_timer(nullptr),
           _bytes_sent_counter(nullptr),
           _local_bytes_send_counter(nullptr),
           _dest_node_id(0) {
@@ -425,6 +427,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
     _uncompressed_bytes_counter = ADD_COUNTER(profile(), 
"UncompressedRowBatchSize", TUnit::BYTES);
     _ignore_rows = ADD_COUNTER(profile(), "IgnoreRows", TUnit::UNIT);
     _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
+    _compress_timer = ADD_TIMER(profile(), "CompressTime");
     _overall_throughput = profile()->add_derived_counter(
             "OverallThroughput", TUnit::BYTES_PER_SECOND,
             std::bind<int64_t>(&RuntimeProfile::units_per_second, 
_bytes_sent_counter,
@@ -445,6 +448,8 @@ Status VDataStreamSender::open(RuntimeState* state) {
     for (auto iter : _partition_infos) {
         RETURN_IF_ERROR(iter->open(state));
     }
+
+    _compression_type = state->fragement_transmission_compression_type();
     return Status::OK();
 }
 
@@ -597,9 +602,10 @@ Status VDataStreamSender::serialize_block(Block* src, 
PBlock* dest, int num_rece
         dest->Clear();
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
         RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, 
&compressed_bytes,
-                                       _transfer_large_data_by_brpc));
+                                       _compression_type, 
_transfer_large_data_by_brpc));
         COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
         COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * 
num_receivers);
+        COUNTER_UPDATE(_compress_timer, src->get_compress_time());
     }
 
     return Status::OK();
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 3d572366e1..1c8bcf4f87 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -131,6 +131,7 @@ protected:
 
     RuntimeProfile* _profile; // Allocated from _pool
     RuntimeProfile::Counter* _serialize_batch_timer;
+    RuntimeProfile::Counter* _compress_timer;
     RuntimeProfile::Counter* _bytes_sent_counter;
     RuntimeProfile::Counter* _uncompressed_bytes_counter;
     RuntimeProfile::Counter* _ignore_rows;
@@ -146,6 +147,8 @@ protected:
 
     // User can change this config at runtime, avoid it being modified during 
query or loading process.
     bool _transfer_large_data_by_brpc = false;
+
+    segment_v2::CompressionTypePB _compression_type;
 };
 
 // TODO: support local exechange
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 3cbc73348c..a9d091bf75 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -258,6 +258,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
         SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
         Status st = block.serialize(request.mutable_block(), 
&uncompressed_bytes, &compressed_bytes,
+                                    
state->fragement_transmission_compression_type(),
                                     _parent->_transfer_large_data_by_brpc);
         if (!st.ok()) {
             cancel(fmt::format("{}, err: {}", channel_info(), 
st.get_error_msg()));
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index a18db4e1e5..a3a5be3b26 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -180,10 +180,12 @@ TEST(BlockTest, RowBatchCovertToBlock) {
     }
 }
 
-void block_to_pb(const vectorized::Block& block, PBlock* pblock) {
+void block_to_pb(
+        const vectorized::Block& block, PBlock* pblock,
+        segment_v2::CompressionTypePB compression_type = 
segment_v2::CompressionTypePB::SNAPPY) {
     size_t uncompressed_bytes = 0;
     size_t compressed_bytes = 0;
-    Status st = block.serialize(pblock, &uncompressed_bytes, 
&compressed_bytes);
+    Status st = block.serialize(pblock, &uncompressed_bytes, 
&compressed_bytes, compression_type);
     EXPECT_TRUE(st.ok());
     EXPECT_TRUE(uncompressed_bytes >= compressed_bytes);
     EXPECT_EQ(compressed_bytes, pblock->column_values().size());
@@ -237,7 +239,7 @@ void fill_block_with_array_string(vectorized::Block& block) 
{
     block.insert(test_array_string);
 }
 
-TEST(BlockTest, SerializeAndDeserializeBlock) {
+void serialize_and_deserialize_test(segment_v2::CompressionTypePB 
compression_type) {
     config::compress_rowbatches = true;
     // int
     {
@@ -250,12 +252,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
         vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), 
data_type, "test_int");
         vectorized::Block block({type_and_name});
         PBlock pblock;
-        block_to_pb(block, &pblock);
+        block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2(pblock);
         PBlock pblock2;
-        block_to_pb(block2, &pblock2);
+        block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
         EXPECT_EQ(s1, s2);
     }
@@ -271,12 +273,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
                                                         "test_string");
         vectorized::Block block({type_and_name});
         PBlock pblock;
-        block_to_pb(block, &pblock);
+        block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2(pblock);
         PBlock pblock2;
-        block_to_pb(block2, &pblock2);
+        block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
         EXPECT_EQ(s1, s2);
     }
@@ -295,12 +297,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
                                                         decimal_data_type, 
"test_decimal");
         vectorized::Block block({type_and_name});
         PBlock pblock;
-        block_to_pb(block, &pblock);
+        block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2(pblock);
         PBlock pblock2;
-        block_to_pb(block2, &pblock2);
+        block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
         EXPECT_EQ(s1, s2);
     }
@@ -321,12 +323,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
                                                         "test_bitmap");
         vectorized::Block block({type_and_name});
         PBlock pblock;
-        block_to_pb(block, &pblock);
+        block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2(pblock);
         PBlock pblock2;
-        block_to_pb(block2, &pblock2);
+        block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
         EXPECT_EQ(s1, s2);
     }
@@ -341,12 +343,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
                                                         nullable_data_type, 
"test_nullable");
         vectorized::Block block({type_and_name});
         PBlock pblock;
-        block_to_pb(block, &pblock);
+        block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2(pblock);
         PBlock pblock2;
-        block_to_pb(block2, &pblock2);
+        block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
         EXPECT_EQ(s1, s2);
     }
@@ -361,14 +363,14 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
                 nullable_column->get_ptr(), nullable_data_type, 
"test_nullable_decimal");
         vectorized::Block block({type_and_name});
         PBlock pblock;
-        block_to_pb(block, &pblock);
+        block_to_pb(block, &pblock, compression_type);
         EXPECT_EQ(1, pblock.column_metas_size());
         EXPECT_TRUE(pblock.column_metas()[0].has_decimal_param());
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2(pblock);
         PBlock pblock2;
-        block_to_pb(block2, &pblock2);
+        block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
         EXPECT_EQ(s1, s2);
     }
@@ -385,12 +387,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
                                                         data_type, 
"test_nullable_int32");
         vectorized::Block block({type_and_name});
         PBlock pblock;
-        block_to_pb(block, &pblock);
+        block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2(pblock);
         PBlock pblock2;
-        block_to_pb(block2, &pblock2);
+        block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
         EXPECT_EQ(s1, s2);
     }
@@ -400,17 +402,23 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
         fill_block_with_array_int(block);
         fill_block_with_array_string(block);
         PBlock pblock;
-        block_to_pb(block, &pblock);
+        block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2(pblock);
         PBlock pblock2;
-        block_to_pb(block2, &pblock2);
+        block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
         EXPECT_EQ(s1, s2);
     }
 }
 
+TEST(BlockTest, SerializeAndDeserializeBlock) {
+    config::compress_rowbatches = true;
+    serialize_and_deserialize_test(segment_v2::CompressionTypePB::SNAPPY);
+    serialize_and_deserialize_test(segment_v2::CompressionTypePB::LZ4);
+}
+
 TEST(BlockTest, dump_data) {
     auto vec = vectorized::ColumnVector<Int32>::create();
     auto& int32_data = vec->get_data();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index ccc7a8764d..70be8869d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -207,6 +207,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_FUNCTION_PUSHDOWN = 
"enable_function_pushdown";
 
+    public static final String FRAGMENT_TRANSMISSION_COMPRESSION_CODEC = 
"fragment_transmission_compression_codec";
+
     // session origin value
     public Map<Field, String> sessionOriginValue = new HashMap<Field, 
String>();
     // check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -352,6 +354,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = PREFER_JOIN_METHOD)
     public String preferJoinMethod = "broadcast";
 
+    @VariableMgr.VarAttr(name = FRAGMENT_TRANSMISSION_COMPRESSION_CODEC)
+    public String fragmentTransmissionCompressionCodec = "lz4";
+
     /*
      * the parallel exec instance num for one Fragment in one BE
      * 1 means disable this feature
@@ -1060,6 +1065,10 @@ public class SessionVariable implements Serializable, 
Writable {
         this.enableRemoveNoConjunctsRuntimeFilterPolicy = 
enableRemoveNoConjunctsRuntimeFilterPolicy;
     }
 
+    public void setFragmentTransmissionCompressionCodec(String codec) {
+        this.fragmentTransmissionCompressionCodec = codec;
+    }
+
     // Serialize to thrift object
     // used for rest api
     public TQueryOptions toThrift() {
@@ -1103,6 +1112,7 @@ public class SessionVariable implements Serializable, 
Writable {
         }
 
         tResult.setEnableFunctionPushdown(enableFunctionPushdown);
+        
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
 
         return tResult;
     }
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index b1c52731fb..f066dc4a5b 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -21,6 +21,7 @@ package doris;
 option java_package = "org.apache.doris.proto";
 
 import "types.proto";
+import "segment_v2.proto";
 
 message PNodeStatistics {
     required int64 node_id = 1;
@@ -63,4 +64,6 @@ message PBlock {
     repeated PColumnMeta column_metas = 1;
     optional bytes column_values = 2;
     optional bool compressed = 3 [default = false];
+    optional int64 uncompressed_size = 4;
+    optional segment_v2.CompressionTypePB compression_type = 5 [default = 
SNAPPY];
 }
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 910f700310..92d41abbd7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -165,6 +165,8 @@ struct TQueryOptions {
   44: optional bool trim_tailing_spaces_for_external_table_query = false
 
   45: optional bool enable_function_pushdown;
+
+  46: optional string fragment_transmission_compression_codec;
 }
     
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to