This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new a9fb05d163e branch-4.1: [feature](be) Support zstd stream load compression #64711 (#64750)
a9fb05d163e is described below

commit a9fb05d163e388d435c88691e0e441afef0d334f
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 24 10:34:30 2026 +0800

    branch-4.1: [feature](be) Support zstd stream load compression #64711 (#64750)
    
    Cherry-picked from #64711
    
    Co-authored-by: Refrain <[email protected]>
---
 be/src/service/http/action/http_stream.cpp         |  30 ++++++++++----
 be/src/service/http/action/stream_load.cpp         |   8 +---
 be/src/util/load_util.cpp                          |  25 ++++++++++++
 be/src/util/load_util.h                            |   5 ++-
 be/test/util/load_util_test.cpp                    |  41 ++++++++++++++++++++
 .../data/load_p0/http_stream/test_compress.csv.zst | Bin 0 -> 42 bytes
 .../http_stream/test_group_commit_http_stream.out  |   4 ++
 .../data/load_p0/stream_load/basic_data.csv.zst    | Bin 0 -> 4207 bytes
 .../stream_load/basic_data_by_line.json.zst        | Bin 0 -> 3461 bytes
 .../data/load_p0/stream_load/test_compress.csv.zst | Bin 0 -> 42 bytes
 .../load_p0/stream_load/test_compress_type.out     |   2 +-
 .../stream_load/test_group_commit_stream_load.out  |   4 ++
 .../test_group_commit_http_stream.groovy           |   8 ++--
 .../load_p0/stream_load/test_compress_type.groovy  |  43 +++++++++++++++++++++
 .../test_group_commit_stream_load.groovy           |   6 +--
 15 files changed, 154 insertions(+), 22 deletions(-)

diff --git a/be/src/service/http/action/http_stream.cpp 
b/be/src/service/http/action/http_stream.cpp
index 9e98199ac46..e519a62779f 100644
--- a/be/src/service/http/action/http_stream.cpp
+++ b/be/src/service/http/action/http_stream.cpp
@@ -17,6 +17,7 @@
 
 #include "service/http/action/http_stream.h"
 
+#include <algorithm>
 #include <cstddef>
 #include <future>
 #include <sstream>
@@ -55,6 +56,7 @@
 #include "storage/storage_engine.h"
 #include "util/byte_buffer.h"
 #include "util/client_cache.h"
+#include "util/load_util.h"
 #include "util/string_util.h"
 #include "util/thrift_rpc_helper.h"
 #include "util/time.h"
@@ -63,6 +65,26 @@
 namespace doris {
 using namespace ErrorCode;
 
+namespace {
+
+bool is_compressed_file_scan(const TPipelineFragmentParams& params) {
+    if (!params.__isset.file_scan_params) {
+        return false;
+    }
+    return std::ranges::any_of(params.file_scan_params, [](const auto& 
file_scan_param) {
+        const auto& file_scan_params = file_scan_param.second;
+        TFileCompressType::type compress_type = 
file_scan_params.__isset.compress_type
+                                                        ? 
file_scan_params.compress_type
+                                                        : 
TFileCompressType::UNKNOWN;
+        TFileFormatType::type format_type = 
file_scan_params.__isset.format_type
+                                                    ? 
file_scan_params.format_type
+                                                    : 
TFileFormatType::FORMAT_UNKNOWN;
+        return LoadUtil::is_compressed_load(compress_type, format_type);
+    });
+}
+
+} // namespace
+
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_requests_total, 
MetricUnit::REQUESTS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_duration_ms, 
MetricUnit::MILLISECONDS);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(http_stream_current_processing, 
MetricUnit::REQUESTS);
@@ -387,13 +409,7 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
                                                
http_req->header(HttpHeaders::CONTENT_LENGTH),
                                                e.what());
             }
-            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
-                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
-                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+            if (is_compressed_file_scan(ctx->put_result.pipeline_params)) {
                 content_length *= 3;
             }
         }
diff --git a/be/src/service/http/action/stream_load.cpp 
b/be/src/service/http/action/stream_load.cpp
index 7e0f4ba0977..7cfcccaa547 100644
--- a/be/src/service/http/action/stream_load.cpp
+++ b/be/src/service/http/action/stream_load.cpp
@@ -812,13 +812,7 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req,
                                                
http_req->header(HttpHeaders::CONTENT_LENGTH),
                                                e.what());
             }
-            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
-                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
-                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
-                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+            if (LoadUtil::is_compressed_load(ctx->compress_type, ctx->format)) 
{
                 content_length *= 3;
             }
         }
diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp
index ab6e3e887f8..0a7bf155d13 100644
--- a/be/src/util/load_util.cpp
+++ b/be/src/util/load_util.cpp
@@ -37,6 +37,8 @@ void LoadUtil::parse_format(const std::string& format_str, 
const std::string& co
         *compress_type = TFileCompressType::LZO;
     } else if (iequal(compress_type_str, "BZ2")) {
         *compress_type = TFileCompressType::BZ2;
+    } else if (iequal(compress_type_str, "ZSTD")) {
+        *compress_type = TFileCompressType::ZSTD;
     } else if (iequal(compress_type_str, "LZ4") || iequal(compress_type_str, 
"LZ4FRAME")) {
         *compress_type = TFileCompressType::LZ4FRAME;
     } else if (iequal(compress_type_str, "LZ4_BLOCK")) {
@@ -62,6 +64,8 @@ void LoadUtil::parse_format(const std::string& format_str, 
const std::string& co
             *format_type = TFileFormatType::FORMAT_CSV_LZO;
         } else if (iequal(compress_type_str, "BZ2")) {
             *format_type = TFileFormatType::FORMAT_CSV_BZ2;
+        } else if (iequal(compress_type_str, "ZSTD")) {
+            *format_type = TFileFormatType::FORMAT_CSV_PLAIN;
         } else if (iequal(compress_type_str, "LZ4") || 
iequal(compress_type_str, "LZ4FRAME")) {
             *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
         } else if (iequal(compress_type_str, "LZ4_BLOCK")) {
@@ -108,4 +112,25 @@ bool 
LoadUtil::is_format_support_streaming(TFileFormatType::type format) {
     }
     return false;
 }
+
+bool LoadUtil::is_compressed_load(TFileCompressType::type compress_type,
+                                  TFileFormatType::type format_type) {
+    if (compress_type != TFileCompressType::UNKNOWN && compress_type != 
TFileCompressType::PLAIN) {
+        return true;
+    }
+
+    switch (format_type) {
+    case TFileFormatType::FORMAT_CSV_BZ2:
+    case TFileFormatType::FORMAT_CSV_DEFLATE:
+    case TFileFormatType::FORMAT_CSV_GZ:
+    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
+    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+    case TFileFormatType::FORMAT_CSV_LZO:
+    case TFileFormatType::FORMAT_CSV_LZOP:
+    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
+        return true;
+    default:
+        return false;
+    }
+}
 } // namespace  doris
diff --git a/be/src/util/load_util.h b/be/src/util/load_util.h
index 60bd79ab1be..e84222c8fa2 100644
--- a/be/src/util/load_util.h
+++ b/be/src/util/load_util.h
@@ -29,5 +29,8 @@ public:
                              TFileCompressType::type* compress_type);
 
     static bool is_format_support_streaming(TFileFormatType::type format);
+
+    static bool is_compressed_load(TFileCompressType::type compress_type,
+                                   TFileFormatType::type format_type);
 };
-} // namespace  doris
\ No newline at end of file
+} // namespace  doris
diff --git a/be/test/util/load_util_test.cpp b/be/test/util/load_util_test.cpp
index 724e9f3cfc9..c8c4cc201de 100644
--- a/be/test/util/load_util_test.cpp
+++ b/be/test/util/load_util_test.cpp
@@ -106,6 +106,22 @@ TEST_F(LoadUtilTest, ParseTest) {
         EXPECT_EQ(TFileFormatType::FORMAT_CSV_DEFLATE, format_type);
         EXPECT_EQ(TFileCompressType::DEFLATE, compress_type);
     }
+    {
+        // CSV, ZSTD
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("CSV", "ZSTD", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_CSV_PLAIN, format_type);
+        EXPECT_EQ(TFileCompressType::ZSTD, compress_type);
+    }
+    {
+        // "", ZSTD
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("", "ZSTD", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_CSV_PLAIN, format_type);
+        EXPECT_EQ(TFileCompressType::ZSTD, compress_type);
+    }
     {
         // JSON, ""
         TFileFormatType::type format_type;
@@ -178,6 +194,14 @@ TEST_F(LoadUtilTest, ParseTest) {
         EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
         EXPECT_EQ(TFileCompressType::DEFLATE, compress_type);
     }
+    {
+        // JSON, ZSTD
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("JSON", "ZSTD", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+        EXPECT_EQ(TFileCompressType::ZSTD, compress_type);
+    }
     {
         // JSON, unkonw
         TFileFormatType::type format_type;
@@ -204,4 +228,21 @@ TEST_F(LoadUtilTest, ParseTest) {
     }
 }
 
+TEST_F(LoadUtilTest, IsCompressedLoadTest) {
+    EXPECT_FALSE(LoadUtil::is_compressed_load(TFileCompressType::PLAIN,
+                                              
TFileFormatType::FORMAT_CSV_PLAIN));
+    EXPECT_FALSE(LoadUtil::is_compressed_load(TFileCompressType::UNKNOWN,
+                                              
TFileFormatType::FORMAT_CSV_PLAIN));
+    EXPECT_TRUE(LoadUtil::is_compressed_load(TFileCompressType::ZSTD,
+                                             
TFileFormatType::FORMAT_CSV_PLAIN));
+    EXPECT_TRUE(
+            LoadUtil::is_compressed_load(TFileCompressType::PLAIN, 
TFileFormatType::FORMAT_CSV_GZ));
+    EXPECT_TRUE(LoadUtil::is_compressed_load(TFileCompressType::UNKNOWN,
+                                             
TFileFormatType::FORMAT_CSV_DEFLATE));
+    EXPECT_TRUE(LoadUtil::is_compressed_load(TFileCompressType::SNAPPYBLOCK,
+                                             
TFileFormatType::FORMAT_CSV_PLAIN));
+    EXPECT_FALSE(
+            LoadUtil::is_compressed_load(TFileCompressType::PLAIN, 
TFileFormatType::FORMAT_JSON));
+}
+
 } // namespace doris
diff --git a/regression-test/data/load_p0/http_stream/test_compress.csv.zst 
b/regression-test/data/load_p0/http_stream/test_compress.csv.zst
new file mode 100644
index 00000000000..34c3da9c54e
Binary files /dev/null and 
b/regression-test/data/load_p0/http_stream/test_compress.csv.zst differ
diff --git 
a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out 
b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
index 99954d0853c..d0f44b2055f 100644
--- a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
+++ b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
@@ -3,12 +3,16 @@
 1      a       10
 1      a       10
 1      a       10
+1      a       10
+2      b       20
 2      b       20
 2      b       20
 2      b       20
 3      c       30
 3      c       30
 3      c       30
+3      c       30
+4      d       \N
 4      d       \N
 4      d       \N
 4      d       \N
diff --git a/regression-test/data/load_p0/stream_load/basic_data.csv.zst 
b/regression-test/data/load_p0/stream_load/basic_data.csv.zst
new file mode 100644
index 00000000000..b4a541578bd
Binary files /dev/null and 
b/regression-test/data/load_p0/stream_load/basic_data.csv.zst differ
diff --git 
a/regression-test/data/load_p0/stream_load/basic_data_by_line.json.zst 
b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.zst
new file mode 100644
index 00000000000..bed19ec3346
Binary files /dev/null and 
b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.zst differ
diff --git a/regression-test/data/load_p0/stream_load/test_compress.csv.zst 
b/regression-test/data/load_p0/stream_load/test_compress.csv.zst
new file mode 100644
index 00000000000..34c3da9c54e
Binary files /dev/null and 
b/regression-test/data/load_p0/stream_load/test_compress.csv.zst differ
diff --git a/regression-test/data/load_p0/stream_load/test_compress_type.out 
b/regression-test/data/load_p0/stream_load/test_compress_type.out
index 56d195c569e..92a8f1b82a6 100644
--- a/regression-test/data/load_p0/stream_load/test_compress_type.out
+++ b/regression-test/data/load_p0/stream_load/test_compress_type.out
@@ -1,4 +1,4 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
 -- !sql --
-160
+200
 
diff --git 
a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out 
b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out
index 3e5d73452d0..fac63335a7e 100644
--- a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out
+++ b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out
@@ -3,12 +3,16 @@
 1      a       10
 1      a       10
 1      a       10
+1      a       10
+2      b       20
 2      b       20
 2      b       20
 2      b       20
 3      c       30
 3      c       30
 3      c       30
+3      c       30
+4      d       \N
 4      d       \N
 4      d       \N
 4      d       \N
diff --git 
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
 
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
index b160b478c8e..ad07ee55d74 100644
--- 
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
+++ 
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -94,9 +94,11 @@ suite("test_group_commit_http_stream") {
         """
 
         // stream load with compress file
-        String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ 
"lz4frame"} //, "deflate"}
+        String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ 
"lz4frame", "zstd"} //, "deflate"}
         for (final def compressionType in compressionTypes) {
-            def fileName = "test_compress.csv." + 
(compressionType.equals("lz4frame") ? "lz4" : compressionType)
+            def fileSuffix = compressionType.equals("lz4frame") ? "lz4" : 
compressionType
+            fileSuffix = fileSuffix.equals("zstd") ? "zst" : fileSuffix
+            def fileName = "test_compress.csv." + fileSuffix
             streamLoad {
                 set 'version', '1'
                 set 'sql', """
@@ -249,7 +251,7 @@ suite("test_group_commit_http_stream") {
             }
         }
 
-        getRowCount(19)
+        getRowCount(26)
         qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
 
         // group commit http stream (SELECT * FROM http_stream(...)) should 
not register load jobs
diff --git 
a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy 
b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
index eeab7e80975..5e2870fd4c0 100644
--- a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
@@ -196,6 +196,49 @@ suite("test_stream_load_compress_type", "load_p0") {
         }
     }
 
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', 'csv'
+        set 'compress_type', 'zstd'
+
+        file "basic_data.csv.zst"
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(20, json.NumberTotalRows)
+                assertEquals(20, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'format', 'json'
+        set 'compress_type', 'zstd'
+        set 'read_json_by_line', 'true'
+
+        file "basic_data_by_line.json.zst"
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("Success", json.Status)
+                assertEquals(20, json.NumberTotalRows)
+                assertEquals(20, json.NumberLoadedRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                assertTrue(json.LoadBytes > 0)
+        }
+    }
+
     // no compress_type
     streamLoad {
         table "${tableName}"
diff --git 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index 342b87f1396..7d7afc71f7e 100644
--- 
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++ 
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -98,9 +98,9 @@ suite("test_group_commit_stream_load") {
         """
 
         // stream load with compress file
-        String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ 
"lz4"} //, "deflate"}
+        String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/ 
"lz4", "zstd"} //, "deflate"}
         for (final def compressionType in compressionTypes) {
-            def fileName = "test_compress.csv." + compressionType
+            def fileName = "test_compress.csv." + 
(compressionType.equals("zstd") ? "zst" : compressionType)
             streamLoad {
                 table "${tableName}"
 
@@ -227,7 +227,7 @@ suite("test_group_commit_stream_load") {
             }
         }
 
-        getRowCount(21)
+        getRowCount(25)
         qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
     } finally {
         // try_sql("DROP TABLE ${tableName}")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to