This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 88d52267e6e [feat](load) stream load supports loading JSON compressed 
format (#48990)
88d52267e6e is described below

commit 88d52267e6eb297a04efda54b2c9355e3bbc94e5
Author: Xin Liao <[email protected]>
AuthorDate: Thu Mar 13 19:58:51 2025 +0800

    [feat](load) stream load supports loading JSON compressed format (#48990)
---
 be/src/http/action/stream_load.cpp                 |   4 -
 be/src/util/load_util.cpp                          |  24 ++++
 be/test/util/load_util_test.cpp                    |  64 +++++++++
 .../load_p0/stream_load/test_json_compress.out     | Bin 0 -> 25202 bytes
 .../load_p0/stream_load/test_json_compress.groovy  | 149 +++++++++++++++++++++
 5 files changed, 237 insertions(+), 4 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index d3720778d85..d91d85695a1 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -251,10 +251,6 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, 
std::shared_ptr<Strea
     }
 
     // get format of this put
-    if (!http_req->header(HTTP_COMPRESS_TYPE).empty() &&
-        iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
-        return Status::NotSupported("compress data of JSON format is not 
supported.");
-    }
     std::string format_str = http_req->header(HTTP_FORMAT_KEY);
     if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
         iequal(format_str, BeConsts::CSV_WITH_NAMES_AND_TYPES)) {
diff --git a/be/src/util/load_util.cpp b/be/src/util/load_util.cpp
index 789038823f6..484012135a0 100644
--- a/be/src/util/load_util.cpp
+++ b/be/src/util/load_util.cpp
@@ -62,6 +62,30 @@ void LoadUtil::parse_format(const std::string& format_str, 
const std::string& co
     } else if (iequal(format_str, "JSON")) {
         if (compress_type_str.empty()) {
             *format_type = TFileFormatType::FORMAT_JSON;
+        } else if (iequal(compress_type_str, "GZ")) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+            *compress_type = TFileCompressType::GZ;
+        } else if (iequal(compress_type_str, "LZO")) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+            *compress_type = TFileCompressType::LZO;
+        } else if (iequal(compress_type_str, "BZ2")) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+            *compress_type = TFileCompressType::BZ2;
+        } else if (iequal(compress_type_str, "LZ4")) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+            *compress_type = TFileCompressType::LZ4FRAME;
+        } else if (iequal(compress_type_str, "LZ4_BLOCK")) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+            *compress_type = TFileCompressType::LZ4BLOCK;
+        } else if (iequal(compress_type_str, "LZOP")) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+            *compress_type = TFileCompressType::LZO;
+        } else if (iequal(compress_type_str, "SNAPPY_BLOCK")) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+            *compress_type = TFileCompressType::SNAPPYBLOCK;
+        } else if (iequal(compress_type_str, "DEFLATE")) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+            *compress_type = TFileCompressType::DEFLATE;
         }
     } else if (iequal(format_str, "PARQUET")) {
         *format_type = TFileFormatType::FORMAT_PARQUET;
diff --git a/be/test/util/load_util_test.cpp b/be/test/util/load_util_test.cpp
index 0b4967befb2..229edd18445 100644
--- a/be/test/util/load_util_test.cpp
+++ b/be/test/util/load_util_test.cpp
@@ -119,6 +119,70 @@ TEST_F(LoadUtilTest, ParseTest) {
         TFileFormatType::type format_type;
         TFileCompressType::type compress_type;
         LoadUtil::parse_format("JSON", "GZ", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+        EXPECT_EQ(TFileCompressType::GZ, compress_type);
+    }
+    {
+        // JSON, LZO
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("JSON", "LZOP", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+        EXPECT_EQ(TFileCompressType::LZO, compress_type);
+    }
+    {
+        // JSON, BZ2
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("JSON", "BZ2", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+        EXPECT_EQ(TFileCompressType::BZ2, compress_type);
+    }
+    {
+        // JSON, LZ4
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("JSON", "LZ4", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+        EXPECT_EQ(TFileCompressType::LZ4FRAME, compress_type);
+    }
+    {
+        // JSON, LZ4_BLOCK
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("JSON", "LZ4_BLOCK", &format_type, 
&compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+        EXPECT_EQ(TFileCompressType::LZ4BLOCK, compress_type);
+    }
+    {
+        // JSON, LZOP
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("JSON", "LZOP", &format_type, &compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+        EXPECT_EQ(TFileCompressType::LZO, compress_type);
+    }
+    {
+        // JSON, SNAPPY_BLOCK
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("JSON", "SNAPPY_BLOCK", &format_type, 
&compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+        EXPECT_EQ(TFileCompressType::SNAPPYBLOCK, compress_type);
+    }
+    {
+        // JSON, DEFLATE
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("JSON", "DEFLATE", &format_type, 
&compress_type);
+        EXPECT_EQ(TFileFormatType::FORMAT_JSON, format_type);
+        EXPECT_EQ(TFileCompressType::DEFLATE, compress_type);
+    }
+    {
+        // JSON, unknown
+        TFileFormatType::type format_type;
+        TFileCompressType::type compress_type;
+        LoadUtil::parse_format("JSON", "UNKNOWN", &format_type, 
&compress_type);
         EXPECT_EQ(TFileFormatType::FORMAT_UNKNOWN, format_type);
         EXPECT_EQ(TFileCompressType::PLAIN, compress_type);
     }
diff --git a/regression-test/data/load_p0/stream_load/test_json_compress.out 
b/regression-test/data/load_p0/stream_load/test_json_compress.out
new file mode 100644
index 00000000000..79d83433dc8
Binary files /dev/null and 
b/regression-test/data/load_p0/stream_load/test_json_compress.out differ
diff --git 
a/regression-test/suites/load_p0/stream_load/test_json_compress.groovy 
b/regression-test/suites/load_p0/stream_load/test_json_compress.groovy
new file mode 100644
index 00000000000..66b26a6aad0
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_json_compress.groovy
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_json_compress", "p0") { 
+    def tableName = "test_json_compress_tbl"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE ${tableName}
+        (
+            k00 INT             NOT NULL,
+            k01 DATE            NOT NULL,
+            k02 BOOLEAN         NULL,
+            k03 TINYINT         NULL,
+            k04 SMALLINT        NULL,
+            k05 INT             NULL,
+            k06 BIGINT          NULL,
+            k07 LARGEINT        NULL,
+            k08 FLOAT           NULL,
+            k09 DOUBLE          NULL,
+            k10 DECIMAL(9,1)    NULL,
+            k11 DECIMALV3(9,1)  NULL,
+            k12 DATETIME        NULL,
+            k13 DATEV2          NULL,
+            k14 DATETIMEV2      NULL,
+            k15 CHAR            NULL,
+            k16 VARCHAR         NULL,
+            k17 STRING          NULL,
+            k18 JSON            NULL
+    
+        )
+        DUPLICATE KEY(k00)
+        DISTRIBUTED BY HASH(k00) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+
+    streamLoad {
+        set 'format', 'json'
+        set 'compress_type', 'gz'
+        set 'read_json_by_line', 'true'
+        table "${tableName}"
+        time 10000
+        file 
"""${getS3Url()}/regression/load/data/basic_data_by_line.json.gz"""
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+        }
+    }
+
+    streamLoad {
+        set 'format', 'json'
+        set 'compress_type', 'bz2'
+        set 'read_json_by_line', 'true'
+        table "${tableName}"
+        time 10000
+        file 
"""${getS3Url()}/regression/load/data/basic_data_by_line.json.bz2"""
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+        }
+    }
+
+    streamLoad {
+        set 'format', 'json'
+        set 'compress_type', 'lz4'
+        set 'read_json_by_line', 'true'
+        table "${tableName}"
+        time 10000
+        file 
"""${getS3Url()}/regression/load/data/basic_data_by_line.json.lz4"""
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+        }
+    }
+
+    streamLoad {
+        set 'format', 'json'
+        set 'compress_type', 'lzop'
+        set 'read_json_by_line', 'true'
+        table "${tableName}"
+        time 10000
+        file 
"""${getS3Url()}/regression/load/data/basic_data_by_line.json.lzo"""
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+        }
+    }
+
+    streamLoad {
+        set 'format', 'json'
+        set 'compress_type', 'deflate'
+        set 'read_json_by_line', 'true'
+        table "${tableName}"
+        time 10000
+        file 
"""${getS3Url()}/regression/load/data/basic_data_by_line.json.deflate"""
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+        }
+    }
+
+    qt_sql "select * from ${tableName} order by k00, k01"
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to