This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 88d52267e6e [feat](load) stream load supports loading JSON compressed
format (#48990)
88d52267e6e is described below
commit 88d52267e6eb297a04efda54b2c9355e3bbc94e5
Author: Xin Liao <[email protected]>
AuthorDate: Thu Mar 13 19:58:51 2025 +0800
[feat](load) stream load supports loading JSON compressed format (#48990)
---
be/src/http/action/stream_load.cpp | 4 -
be/src/util/load_util.cpp | 24 ++++
be/test/util/load_util_test.cpp | 64 +++++++++
.../load_p0/stream_load/test_json_compress.out | Bin 0 -> 25202 bytes
.../load_p0/stream_load/test_json_compress.groovy | 149 +++++++++++++++++++++
5 files changed, 237 insertions(+), 4 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index d3720778d85..d91d85695a1 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -251,10 +251,6 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req,
std::shared_ptr<Strea
}
// get format of this put
- if (!http_req->header(HTTP_COMPRESS_TYPE).empty() &&
- iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
- return Status::NotSupported("compress data of JSON format is not
supported.");
- }
std::string format_str = http_req->header(HTTP_FORMAT_KEY);
if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
iequal(format_str, BeConsts::CSV_WITH_NAMES_AND_TYPES)) {
diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp
index 789038823f6..484012135a0 100644
--- a/be/src/util/load_util.cpp
+++ b/be/src/util/load_util.cpp
@@ -62,6 +62,30 @@ void LoadUtil::parse_format(const std::string& format_str,
const std::string& co
} else if (iequal(format_str, "JSON")) {
if (compress_type_str.empty()) {
*format_type = TFileFormatType::FORMAT_JSON;
+ } else if (iequal(compress_type_str, "GZ")) {
+ *format_type = TFileFormatType::FORMAT_JSON;
+ *compress_type = TFileCompressType::GZ;
+ } else if (iequal(compress_type_str, "LZO")) {
+ *format_type = TFileFormatType::FORMAT_JSON;
+ *compress_type = TFileCompressType::LZO;
+ } else if (iequal(compress_type_str, "BZ2")) {
+ *format_type = TFileFormatType::FORMAT_JSON;
+ *compress_type = TFileCompressType::BZ2;
+ } else if (iequal(compress_type_str, "LZ4")) {
+ *format_type = TFileFormatType::FORMAT_JSON;
+ *compress_type = TFileCompressType::LZ4FRAME;
+ } else if (iequal(compress_type_str, "LZ4_BLOCK")) {
+ *format_type = TFileFormatType::FORMAT_JSON;
+ *compress_type = TFileCompressType::LZ4BLOCK;
+ } else if (iequal(compress_type_str, "LZOP")) {
+ *format_type = TFileFormatType::FORMAT_JSON;
+ *compress_type = TFileCompressType::LZO;
+ } else if (iequal(compress_type_str, "SNAPPY_BLOCK")) {
+ *format_type = TFileFormatType::FORMAT_JSON;
+ *compress_type = TFileCompressType::SNAPPYBLOCK;
+ } else if (iequal(compress_type_str, "DEFLATE")) {
+ *format_type = TFileFormatType::FORMAT_JSON;
+ *compress_type = TFileCompressType::DEFLATE;
}
} else if (iequal(format_str, "PARQUET")) {
*format_type = TFileFormatType::FORMAT_PARQUET;
diff --git a/be/test/util/load_util_test.cpp b/be/test/util/load_util_test.cpp
index 0b4967befb2..229edd18445 100644
--- a/be/test/util/load_util_test.cpp
+++ b/be/test/util/load_util_test.cpp
@@ -119,6 +119,70 @@ TEST_F(LoadUtilTest, ParseTest) {
TFileFormatType::type format_type;
TFileCompressType::type compress_type;
LoadUtil::parse_format("JSON", "GZ", &format_type, &compress_type);
+ EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+ EXPECT_EQ(TFileCompressType::GZ, compress_type);
+ }
+ {
+ // JSON, LZO
+ TFileFormatType::type format_type;
+ TFileCompressType::type compress_type;
+ LoadUtil::parse_format("JSON", "LZO", &format_type, &compress_type);
+ EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+ EXPECT_EQ(TFileCompressType::LZO, compress_type);
+ }
+ {
+ // JSON, BZ2
+ TFileFormatType::type format_type;
+ TFileCompressType::type compress_type;
+ LoadUtil::parse_format("JSON", "BZ2", &format_type, &compress_type);
+ EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+ EXPECT_EQ(TFileCompressType::BZ2, compress_type);
+ }
+ {
+ // JSON, LZ4
+ TFileFormatType::type format_type;
+ TFileCompressType::type compress_type;
+ LoadUtil::parse_format("JSON", "LZ4", &format_type, &compress_type);
+ EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+ EXPECT_EQ(TFileCompressType::LZ4FRAME, compress_type);
+ }
+ {
+ // JSON, LZ4_BLOCK
+ TFileFormatType::type format_type;
+ TFileCompressType::type compress_type;
+ LoadUtil::parse_format("JSON", "LZ4_BLOCK", &format_type,
&compress_type);
+ EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+ EXPECT_EQ(TFileCompressType::LZ4BLOCK, compress_type);
+ }
+ {
+ // JSON, LZOP
+ TFileFormatType::type format_type;
+ TFileCompressType::type compress_type;
+ LoadUtil::parse_format("JSON", "LZOP", &format_type, &compress_type);
+ EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+ EXPECT_EQ(TFileCompressType::LZO, compress_type);
+ }
+ {
+ // JSON, SNAPPY_BLOCK
+ TFileFormatType::type format_type;
+ TFileCompressType::type compress_type;
+ LoadUtil::parse_format("JSON", "SNAPPY_BLOCK", &format_type,
&compress_type);
+ EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+ EXPECT_EQ(TFileCompressType::SNAPPYBLOCK, compress_type);
+ }
+ {
+ // JSON, DEFLATE
+ TFileFormatType::type format_type;
+ TFileCompressType::type compress_type;
+ LoadUtil::parse_format("JSON", "DEFLATE", &format_type,
&compress_type);
+ EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+ EXPECT_EQ(TFileCompressType::DEFLATE, compress_type);
+ }
+ {
+ // JSON, unknown
+ TFileFormatType::type format_type;
+ TFileCompressType::type compress_type;
+ LoadUtil::parse_format("JSON", "UNKNOWN", &format_type,
&compress_type);
EXPECT_EQ(TFileFormatType::FORMAT_UNKNOWN, format_type);
EXPECT_EQ(TFileCompressType::PLAIN, compress_type);
}
diff --git a/regression-test/data/load_p0/stream_load/test_json_compress.out
b/regression-test/data/load_p0/stream_load/test_json_compress.out
new file mode 100644
index 00000000000..79d83433dc8
Binary files /dev/null and
b/regression-test/data/load_p0/stream_load/test_json_compress.out differ
diff --git
a/regression-test/suites/load_p0/stream_load/test_json_compress.groovy
b/regression-test/suites/load_p0/stream_load/test_json_compress.groovy
new file mode 100644
index 00000000000..66b26a6aad0
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_json_compress.groovy
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_json_compress", "p0") {
+ def tableName = "test_json_compress_tbl"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName}
+ (
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL
+
+ )
+ DUPLICATE KEY(k00)
+ DISTRIBUTED BY HASH(k00) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ streamLoad {
+ set 'format', 'json'
+ set 'compress_type', 'gz'
+ set 'read_json_by_line', 'true'
+ table "${tableName}"
+ time 10000
+ file
"""${getS3Url()}/regression/load/data/basic_data_by_line.json.gz"""
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ streamLoad {
+ set 'format', 'json'
+ set 'compress_type', 'bz2'
+ set 'read_json_by_line', 'true'
+ table "${tableName}"
+ time 10000
+ file
"""${getS3Url()}/regression/load/data/basic_data_by_line.json.bz2"""
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ streamLoad {
+ set 'format', 'json'
+ set 'compress_type', 'lz4'
+ set 'read_json_by_line', 'true'
+ table "${tableName}"
+ time 10000
+ file
"""${getS3Url()}/regression/load/data/basic_data_by_line.json.lz4"""
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ streamLoad {
+ set 'format', 'json'
+ set 'compress_type', 'lzop'
+ set 'read_json_by_line', 'true'
+ table "${tableName}"
+ time 10000
+ file
"""${getS3Url()}/regression/load/data/basic_data_by_line.json.lzo"""
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ streamLoad {
+ set 'format', 'json'
+ set 'compress_type', 'deflate'
+ set 'read_json_by_line', 'true'
+ table "${tableName}"
+ time 10000
+ file
"""${getS3Url()}/regression/load/data/basic_data_by_line.json.deflate"""
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(20, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+
+ qt_sql "select * from ${tableName} order by k00, k01"
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]