This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new a9fb05d163e branch-4.1: [feature](be) Support zstd stream load compression #64711 (#64750)
a9fb05d163e is described below
commit a9fb05d163e388d435c88691e0e441afef0d334f
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 24 10:34:30 2026 +0800
branch-4.1: [feature](be) Support zstd stream load compression #64711 (#64750)
Cherry-picked from #64711
Co-authored-by: Refrain <[email protected]>
---
be/src/service/http/action/http_stream.cpp | 30 ++++++++++----
be/src/service/http/action/stream_load.cpp | 8 +---
be/src/util/load_util.cpp | 25 ++++++++++++
be/src/util/load_util.h | 5 ++-
be/test/util/load_util_test.cpp | 41 ++++++++++++++++++++
.../data/load_p0/http_stream/test_compress.csv.zst | Bin 0 -> 42 bytes
.../http_stream/test_group_commit_http_stream.out | 4 ++
.../data/load_p0/stream_load/basic_data.csv.zst | Bin 0 -> 4207 bytes
.../stream_load/basic_data_by_line.json.zst | Bin 0 -> 3461 bytes
.../data/load_p0/stream_load/test_compress.csv.zst | Bin 0 -> 42 bytes
.../load_p0/stream_load/test_compress_type.out | 2 +-
.../stream_load/test_group_commit_stream_load.out | 4 ++
.../test_group_commit_http_stream.groovy | 8 ++--
.../load_p0/stream_load/test_compress_type.groovy | 43 +++++++++++++++++++++
.../test_group_commit_stream_load.groovy | 6 +--
15 files changed, 154 insertions(+), 22 deletions(-)
diff --git a/be/src/service/http/action/http_stream.cpp
b/be/src/service/http/action/http_stream.cpp
index 9e98199ac46..e519a62779f 100644
--- a/be/src/service/http/action/http_stream.cpp
+++ b/be/src/service/http/action/http_stream.cpp
@@ -17,6 +17,7 @@
#include "service/http/action/http_stream.h"
+#include <algorithm>
#include <cstddef>
#include <future>
#include <sstream>
@@ -55,6 +56,7 @@
#include "storage/storage_engine.h"
#include "util/byte_buffer.h"
#include "util/client_cache.h"
+#include "util/load_util.h"
#include "util/string_util.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
@@ -63,6 +65,26 @@
namespace doris {
using namespace ErrorCode;
+namespace {
+
+bool is_compressed_file_scan(const TPipelineFragmentParams& params) {
+ if (!params.__isset.file_scan_params) {
+ return false;
+ }
+ return std::ranges::any_of(params.file_scan_params, [](const auto&
file_scan_param) {
+ const auto& file_scan_params = file_scan_param.second;
+ TFileCompressType::type compress_type =
file_scan_params.__isset.compress_type
+ ?
file_scan_params.compress_type
+ :
TFileCompressType::UNKNOWN;
+ TFileFormatType::type format_type =
file_scan_params.__isset.format_type
+ ?
file_scan_params.format_type
+ :
TFileFormatType::FORMAT_UNKNOWN;
+ return LoadUtil::is_compressed_load(compress_type, format_type);
+ });
+}
+
+} // namespace
+
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_requests_total,
MetricUnit::REQUESTS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_duration_ms,
MetricUnit::MILLISECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(http_stream_current_processing,
MetricUnit::REQUESTS);
@@ -387,13 +409,7 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
http_req->header(HttpHeaders::CONTENT_LENGTH),
e.what());
}
- if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
- ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
- ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+ if (is_compressed_file_scan(ctx->put_result.pipeline_params)) {
content_length *= 3;
}
}
diff --git a/be/src/service/http/action/stream_load.cpp
b/be/src/service/http/action/stream_load.cpp
index 7e0f4ba0977..7cfcccaa547 100644
--- a/be/src/service/http/action/stream_load.cpp
+++ b/be/src/service/http/action/stream_load.cpp
@@ -812,13 +812,7 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
http_req->header(HttpHeaders::CONTENT_LENGTH),
e.what());
}
- if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
- ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
- ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
- ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+ if (LoadUtil::is_compressed_load(ctx->compress_type, ctx->format))
{
content_length *= 3;
}
}
diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp
index ab6e3e887f8..0a7bf155d13 100644
--- a/be/src/util/load_util.cpp
+++ b/be/src/util/load_util.cpp
@@ -37,6 +37,8 @@ void LoadUtil::parse_format(const std::string& format_str,
const std::string& co
*compress_type = TFileCompressType::LZO;
} else if (iequal(compress_type_str, "BZ2")) {
*compress_type = TFileCompressType::BZ2;
+ } else if (iequal(compress_type_str, "ZSTD")) {
+ *compress_type = TFileCompressType::ZSTD;
} else if (iequal(compress_type_str, "LZ4") || iequal(compress_type_str,
"LZ4FRAME")) {
*compress_type = TFileCompressType::LZ4FRAME;
} else if (iequal(compress_type_str, "LZ4_BLOCK")) {
@@ -62,6 +64,8 @@ void LoadUtil::parse_format(const std::string& format_str,
const std::string& co
*format_type = TFileFormatType::FORMAT_CSV_LZO;
} else if (iequal(compress_type_str, "BZ2")) {
*format_type = TFileFormatType::FORMAT_CSV_BZ2;
+ } else if (iequal(compress_type_str, "ZSTD")) {
+ *format_type = TFileFormatType::FORMAT_CSV_PLAIN;
} else if (iequal(compress_type_str, "LZ4") ||
iequal(compress_type_str, "LZ4FRAME")) {
*format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
} else if (iequal(compress_type_str, "LZ4_BLOCK")) {
@@ -108,4 +112,25 @@ bool
LoadUtil::is_format_support_streaming(TFileFormatType::type format) {
}
return false;
}
+
+bool LoadUtil::is_compressed_load(TFileCompressType::type compress_type,
+ TFileFormatType::type format_type) {
+ if (compress_type != TFileCompressType::UNKNOWN && compress_type !=
TFileCompressType::PLAIN) {
+ return true;
+ }
+
+ switch (format_type) {
+ case TFileFormatType::FORMAT_CSV_BZ2:
+ case TFileFormatType::FORMAT_CSV_DEFLATE:
+ case TFileFormatType::FORMAT_CSV_GZ:
+ case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
+ case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+ case TFileFormatType::FORMAT_CSV_LZO:
+ case TFileFormatType::FORMAT_CSV_LZOP:
+ case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
+ return true;
+ default:
+ return false;
+ }
+}
} // namespace doris
diff --git a/be/src/util/load_util.h b/be/src/util/load_util.h
index 60bd79ab1be..e84222c8fa2 100644
--- a/be/src/util/load_util.h
+++ b/be/src/util/load_util.h
@@ -29,5 +29,8 @@ public:
TFileCompressType::type* compress_type);
static bool is_format_support_streaming(TFileFormatType::type format);
+
+ static bool is_compressed_load(TFileCompressType::type compress_type,
+ TFileFormatType::type format_type);
};
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/util/load_util_test.cpp b/be/test/util/load_util_test.cpp
index 724e9f3cfc9..c8c4cc201de 100644
--- a/be/test/util/load_util_test.cpp
+++ b/be/test/util/load_util_test.cpp
@@ -106,6 +106,22 @@ TEST_F(LoadUtilTest, ParseTest) {
EXPECT_EQ(TFileFormatType::FORMAT_CSV_DEFLATE, format_type);
EXPECT_EQ(TFileCompressType::DEFLATE, compress_type);
}
+ {
+ // CSV, ZSTD
+ TFileFormatType::type format_type;
+ TFileCompressType::type compress_type;
+ LoadUtil::parse_format("CSV", "ZSTD", &format_type, &compress_type);
+ EXPECT_EQ(TFileFormatType::FORMAT_CSV_PLAIN, format_type);
+ EXPECT_EQ(TFileCompressType::ZSTD, compress_type);
+ }
+ {
+ // "", ZSTD
+ TFileFormatType::type format_type;
+ TFileCompressType::type compress_type;
+ LoadUtil::parse_format("", "ZSTD", &format_type, &compress_type);
+ EXPECT_EQ(TFileFormatType::FORMAT_CSV_PLAIN, format_type);
+ EXPECT_EQ(TFileCompressType::ZSTD, compress_type);
+ }
{
// JSON, ""
TFileFormatType::type format_type;
@@ -178,6 +194,14 @@ TEST_F(LoadUtilTest, ParseTest) {
EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
EXPECT_EQ(TFileCompressType::DEFLATE, compress_type);
}
+ {
+ // JSON, ZSTD
+ TFileFormatType::type format_type;
+ TFileCompressType::type compress_type;
+ LoadUtil::parse_format("JSON", "ZSTD", &format_type, &compress_type);
+ EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+ EXPECT_EQ(TFileCompressType::ZSTD, compress_type);
+ }
{
// JSON, unkonw
TFileFormatType::type format_type;
@@ -204,4 +228,21 @@ TEST_F(LoadUtilTest, ParseTest) {
}
}
+TEST_F(LoadUtilTest, IsCompressedLoadTest) {
+ EXPECT_FALSE(LoadUtil::is_compressed_load(TFileCompressType::PLAIN,
+
TFileFormatType::FORMAT_CSV_PLAIN));
+ EXPECT_FALSE(LoadUtil::is_compressed_load(TFileCompressType::UNKNOWN,
+
TFileFormatType::FORMAT_CSV_PLAIN));
+ EXPECT_TRUE(LoadUtil::is_compressed_load(TFileCompressType::ZSTD,
+
TFileFormatType::FORMAT_CSV_PLAIN));
+ EXPECT_TRUE(
+ LoadUtil::is_compressed_load(TFileCompressType::PLAIN,
TFileFormatType::FORMAT_CSV_GZ));
+ EXPECT_TRUE(LoadUtil::is_compressed_load(TFileCompressType::UNKNOWN,
+
TFileFormatType::FORMAT_CSV_DEFLATE));
+ EXPECT_TRUE(LoadUtil::is_compressed_load(TFileCompressType::SNAPPYBLOCK,
+
TFileFormatType::FORMAT_CSV_PLAIN));
+ EXPECT_FALSE(
+ LoadUtil::is_compressed_load(TFileCompressType::PLAIN,
TFileFormatType::FORMAT_JSON));
+}
+
} // namespace doris
diff --git a/regression-test/data/load_p0/http_stream/test_compress.csv.zst
b/regression-test/data/load_p0/http_stream/test_compress.csv.zst
new file mode 100644
index 00000000000..34c3da9c54e
Binary files /dev/null and
b/regression-test/data/load_p0/http_stream/test_compress.csv.zst differ
diff --git
a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
index 99954d0853c..d0f44b2055f 100644
--- a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
+++ b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out
@@ -3,12 +3,16 @@
1 a 10
1 a 10
1 a 10
+1 a 10
+2 b 20
2 b 20
2 b 20
2 b 20
3 c 30
3 c 30
3 c 30
+3 c 30
+4 d \N
4 d \N
4 d \N
4 d \N
diff --git a/regression-test/data/load_p0/stream_load/basic_data.csv.zst
b/regression-test/data/load_p0/stream_load/basic_data.csv.zst
new file mode 100644
index 00000000000..b4a541578bd
Binary files /dev/null and
b/regression-test/data/load_p0/stream_load/basic_data.csv.zst differ
diff --git
a/regression-test/data/load_p0/stream_load/basic_data_by_line.json.zst
b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.zst
new file mode 100644
index 00000000000..bed19ec3346
Binary files /dev/null and
b/regression-test/data/load_p0/stream_load/basic_data_by_line.json.zst differ
diff --git a/regression-test/data/load_p0/stream_load/test_compress.csv.zst
b/regression-test/data/load_p0/stream_load/test_compress.csv.zst
new file mode 100644
index 00000000000..34c3da9c54e
Binary files /dev/null and
b/regression-test/data/load_p0/stream_load/test_compress.csv.zst differ
diff --git a/regression-test/data/load_p0/stream_load/test_compress_type.out
b/regression-test/data/load_p0/stream_load/test_compress_type.out
index 56d195c569e..92a8f1b82a6 100644
--- a/regression-test/data/load_p0/stream_load/test_compress_type.out
+++ b/regression-test/data/load_p0/stream_load/test_compress_type.out
@@ -1,4 +1,4 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
-- !sql --
-160
+200
diff --git
a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out
b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out
index 3e5d73452d0..fac63335a7e 100644
--- a/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out
+++ b/regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out
@@ -3,12 +3,16 @@
1 a 10
1 a 10
1 a 10
+1 a 10
+2 b 20
2 b 20
2 b 20
2 b 20
3 c 30
3 c 30
3 c 30
+3 c 30
+4 d \N
4 d \N
4 d \N
4 d \N
diff --git
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
index b160b478c8e..ad07ee55d74 100644
---
a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
+++
b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy
@@ -94,9 +94,11 @@ suite("test_group_commit_http_stream") {
"""
// stream load with compress file
- String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/
"lz4frame"} //, "deflate"}
+ String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/
"lz4frame", "zstd"} //, "deflate"}
for (final def compressionType in compressionTypes) {
- def fileName = "test_compress.csv." +
(compressionType.equals("lz4frame") ? "lz4" : compressionType)
+ def fileSuffix = compressionType.equals("lz4frame") ? "lz4" :
compressionType
+ fileSuffix = fileSuffix.equals("zstd") ? "zst" : fileSuffix
+ def fileName = "test_compress.csv." + fileSuffix
streamLoad {
set 'version', '1'
set 'sql', """
@@ -249,7 +251,7 @@ suite("test_group_commit_http_stream") {
}
}
- getRowCount(19)
+ getRowCount(26)
qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
// group commit http stream (SELECT * FROM http_stream(...)) should
not register load jobs
diff --git
a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
index eeab7e80975..5e2870fd4c0 100644
--- a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
@@ -196,6 +196,49 @@ suite("test_stream_load_compress_type", "load_p0") {
}
}
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', 'csv'
+ set 'compress_type', 'zstd'
+
+ file "basic_data.csv.zst"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
+ streamLoad {
+ table "${tableName}"
+ set 'format', 'json'
+ set 'compress_type', 'zstd'
+ set 'read_json_by_line', 'true'
+
+ file "basic_data_by_line.json.zst"
+ check {
+ result, exception, startTime, endTime ->
+ assertTrue(exception == null)
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("Success", json.Status)
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(20, json.NumberLoadedRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ assertTrue(json.LoadBytes > 0)
+ }
+ }
+
// no compress_type
streamLoad {
table "${tableName}"
diff --git
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
index 342b87f1396..7d7afc71f7e 100644
---
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
+++
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy
@@ -98,9 +98,9 @@ suite("test_group_commit_stream_load") {
"""
// stream load with compress file
- String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/
"lz4"} //, "deflate"}
+ String[] compressionTypes = new String[]{"gz", "bz2", /*"lzo",*/
"lz4", "zstd"} //, "deflate"}
for (final def compressionType in compressionTypes) {
- def fileName = "test_compress.csv." + compressionType
+ def fileName = "test_compress.csv." +
(compressionType.equals("zstd") ? "zst" : compressionType)
streamLoad {
table "${tableName}"
@@ -227,7 +227,7 @@ suite("test_group_commit_stream_load") {
}
}
- getRowCount(21)
+ getRowCount(25)
qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
} finally {
// try_sql("DROP TABLE ${tableName}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]