This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a2ff32245e7 [improve](sdk) add gzip compression support for stream load (#61373)
a2ff32245e7 is described below

commit a2ff32245e754d7b03f613f133824765bfd0ff21
Author: wudi <[email protected]>
AuthorDate: Thu Mar 19 10:02:44 2026 +0800

    [improve](sdk) add gzip compression support for stream load (#61373)
    
    ### What problem does this PR solve?
    
    #### Summary
    
    This PR adds gzip compression support for stream load to the Doris Go SDK.
    
    When `EnableGzip: true` is set in the config, the SDK automatically
    compresses the request body with gzip and adds the `compress_type: gz`
    HTTP header, so the caller does not need to pre-compress the data.
    
    Both CSV and JSON formats are supported. Note that gzip-compressed JSON
    loads depend on the Doris version (Doris 3.0.5+).
    
    #### Usage
    
    ```go
    config := &doris.Config{
        Endpoints:  []string{"http://127.0.0.1:8030"},
        User:       "root",
        Password:   "password",
        Database:   "test_db",
        Table:      "users",
        Format:     doris.DefaultCSVFormat(),
        EnableGzip: true,
    }
    ```
    #### Notes
    
    - Compression is applied once before the retry loop, so retries do not
      incur extra compression overhead.
    - The compressed data is buffered in memory. The SDK already buffers the
      original data internally for retry support, so enabling gzip adds only
      one extra buffer holding the compressed bytes, which is typically much
      smaller than the original (see the benchmark below).
    
    #### Gzip Compression Benchmark
    
    **[CSV format]**
    Rows | Approx Size | Original | After gzip | Compressed By
    -- | -- | -- | -- | --
    100 | ~2 KB | 1590 B | 505 B | 68.2%
    1,000 | ~17 KB | 16.49 KB | 4.38 KB | 73.4%
    10,000 | ~176 KB | 175.67 KB | 47.74 KB | 72.8%
    100,000 | ~1935 KB | 1.89 MB | 473.59 KB | 75.5%
    1,000,000 | ~21272 KB | 20.78 MB | 4.74 MB | 77.2%
    10,000,000 | ~232210 KB | 226.77 MB | 47.51 MB | 79.1%
    
    
    **[JSON format]**
    Rows | Approx Size | Original | After gzip | Compressed By
    -- | -- | -- | -- | --
    100 | ~4 KB | 3.70 KB | 629 B | 83.4%
    1,000 | ~38 KB | 37.98 KB | 4.92 KB | 87.0%
    10,000 | ~391 KB | 390.52 KB | 48.61 KB | 87.6%
    100,000 | ~4083 KB | 3.99 MB | 497.33 KB | 87.8%
    1,000,000 | ~42756 KB | 41.77 MB | 5.00 MB | 88.0%
    10,000,000 | ~447054 KB | 436.58 MB | 49.87 MB | 88.6%
---
 sdk/go-doris-sdk/README.md                         | 24 ++++++
 sdk/go-doris-sdk/cmd/compress_bench/main.go        | 85 ++++++++++++++++++++++
 sdk/go-doris-sdk/cmd/examples/main.go              |  8 ++
 sdk/go-doris-sdk/examples/gzip_example.go          | 64 ++++++++++++++++
 .../pkg/load/client/doris_load_client.go           | 20 +++++
 sdk/go-doris-sdk/pkg/load/config/load_config.go    | 24 +++---
 .../pkg/load/loader/request_builder.go             | 27 +++++++
 7 files changed, 241 insertions(+), 11 deletions(-)

diff --git a/sdk/go-doris-sdk/README.md b/sdk/go-doris-sdk/README.md
index a69904fed60..dce9e8b6093 100644
--- a/sdk/go-doris-sdk/README.md
+++ b/sdk/go-doris-sdk/README.md
@@ -184,6 +184,29 @@ GroupCommit: doris.OFF,    // Off, use traditional mode
 
 > ⚠️ **Note**: When Group Commit is enabled, all Label configurations are 
 > automatically ignored and warning logs are recorded.
 
+### Gzip Compression
+
+```go
+config := &doris.Config{
+       Endpoints:   []string{"http://127.0.0.1:8030"},
+       User:        "root",
+       Password:    "password",
+       Database:    "test_db",
+       Table:       "users",
+       Format:      doris.DefaultCSVFormat(), // works with both CSV and JSON formats
+       Retry:       doris.DefaultRetry(),
+       GroupCommit: doris.OFF,
+       EnableGzip:  true, // SDK compresses the body with gzip and sets compress_type=gz header automatically
+}
+
+client, _ := doris.NewLoadClient(config)
+
+data := "1,Alice,25\n2,Bob,30\n3,Charlie,35"
+response, err := client.Load(doris.StringReader(data))
+```
+
+> **Note**: The SDK compresses the request body transparently, so there is no need to pre-compress the data. Gzip-compressed JSON loads require Doris 3.0.5 or later; CSV works regardless.
+
 ## 🔄 Concurrent Usage
 
 ### Basic Concurrency Example
@@ -330,6 +353,7 @@ go run cmd/examples/main.go single       # Large batch load 
(100k records)
 go run cmd/examples/main.go concurrent   # Concurrent load (1M records, 10 
workers)
 go run cmd/examples/main.go json         # JSON load (50k records)
 go run cmd/examples/main.go basic        # Basic concurrency (5 workers)
+go run cmd/examples/main.go gzip         # Gzip compressed CSV load
 ```
 
 ## 🛠️ Utility Tools
diff --git a/sdk/go-doris-sdk/cmd/compress_bench/main.go 
b/sdk/go-doris-sdk/cmd/compress_bench/main.go
new file mode 100644
index 00000000000..61516d4d080
--- /dev/null
+++ b/sdk/go-doris-sdk/cmd/compress_bench/main.go
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Usage:
+//   go run cmd/compress_bench/main.go
+//
+// Generates sample CSV and JSON data at various batch sizes,
+// compresses with gzip, and prints original/compressed sizes and ratio.
+
+package main
+
+import (
+       "bytes"
+       "compress/gzip"
+       "fmt"
+       "strings"
+)
+
+// gzipSize returns the number of bytes data occupies after gzip compression
+// at the default compression level.
+func gzipSize(data string) int {
+       var buf bytes.Buffer
+       gz := gzip.NewWriter(&buf)
+       // A failed write would silently yield a wrong size in the report, so
+       // surface it instead of discarding the error.
+       if _, err := gz.Write([]byte(data)); err != nil {
+               panic(fmt.Errorf("gzip write: %w", err))
+       }
+       // Close flushes pending compressed data and writes the gzip footer;
+       // measuring before a successful Close would under-report the size.
+       if err := gz.Close(); err != nil {
+               panic(fmt.Errorf("gzip close: %w", err))
+       }
+       return buf.Len()
+}
+
+// buildCSV returns n comma-separated rows of the form "id,User_<i>,age\n",
+// where ids start at 1000 and ages cycle through 20..69.
+func buildCSV(n int) string {
+       var out strings.Builder
+       for row := 0; row < n; row++ {
+               id, age := 1000+row, 20+row%50
+               fmt.Fprintf(&out, "%d,User_%d,%d\n", id, row, age)
+       }
+       return out.String()
+}
+
+// buildJSON returns n lines of JSON Lines data, one object per line with
+// "id" (starting at 1000), "name" ("User_<i>") and "age" (cycling 20..69).
+func buildJSON(n int) string {
+       var out strings.Builder
+       for row := 0; row < n; row++ {
+               id, age := 1000+row, 20+row%50
+               fmt.Fprintf(&out, "{\"id\":%d,\"name\":\"User_%d\",\"age\":%d}\n", id, row, age)
+       }
+       return out.String()
+}
+
+func printResult(label string, data string) {
+       original := len(data)
+       compressed := gzipSize(data)
+       compressedBy := (1 - float64(compressed)/float64(original)) * 100
+       fmt.Printf("  %-40s original=%8d B  after_gzip=%8d B  
compressed_by=%.1f%%\n",
+               label, original, compressed, compressedBy)
+}
+
+// main prints gzip compression results for synthetic CSV and JSON samples
+// at increasing row counts.
+func main() {
+       rowCounts := []int{100, 1000, 10000, 100000, 1000000, 10000000}
+
+       fmt.Println("=== Gzip Compression Benchmark ===")
+       fmt.Println()
+
+       // run prints one titled section, generating each sample with gen.
+       run := func(title string, gen func(int) string) {
+               fmt.Println(title)
+               for _, rows := range rowCounts {
+                       sample := gen(rows)
+                       approxKB := len(sample)/1024 + 1
+                       printResult(fmt.Sprintf("%d rows (~%d KB)", rows, approxKB), sample)
+               }
+       }
+
+       run("[CSV format]", buildCSV)
+       fmt.Println()
+       run("[JSON format]", buildJSON)
+}
diff --git a/sdk/go-doris-sdk/cmd/examples/main.go 
b/sdk/go-doris-sdk/cmd/examples/main.go
index afb87dfa3ec..ed28e9b69b5 100644
--- a/sdk/go-doris-sdk/cmd/examples/main.go
+++ b/sdk/go-doris-sdk/cmd/examples/main.go
@@ -38,6 +38,7 @@ Available Examples:
   concurrent  - Production concurrent loading (1,000,000 records across 10 
workers)
   json        - Production JSON data loading (50,000 JSON records)
   basic       - Basic concurrent loading demo (5 workers)
+  gzip        - Gzip compressed CSV stream load demo
   all         - Run all examples sequentially
 
 Examples:
@@ -45,6 +46,7 @@ Examples:
   go run cmd/examples/main.go concurrent
   go run cmd/examples/main.go json
   go run cmd/examples/main.go basic
+  go run cmd/examples/main.go gzip
   go run cmd/examples/main.go all
 
 Description:
@@ -52,6 +54,7 @@ Description:
   concurrent  - Shows high-throughput concurrent loading with 10 workers 
processing order data
   json        - Illustrates JSON Lines format loading with structured user 
activity data
   basic       - Simple concurrent example for learning and development
+  gzip        - Shows gzip-compressed CSV loading with automatic compression 
by the SDK
   all         - Runs all examples in sequence for comprehensive testing
 
 For more details, see examples/README.md
@@ -75,6 +78,9 @@ func runExample(name string) {
        case "basic":
                fmt.Println("Running Basic Concurrent Example...")
                examples.RunBasicConcurrentExample()
+       case "gzip":
+               fmt.Println("Running Gzip Compression Example...")
+               examples.GzipExample()
        case "all":
                fmt.Println("Running All Examples...")
                fmt.Println("\n" + strings.Repeat("=", 80))
@@ -86,6 +92,8 @@ func runExample(name string) {
                fmt.Println("\n" + strings.Repeat("=", 80))
                examples.RunBasicConcurrentExample()
                fmt.Println("\n" + strings.Repeat("=", 80))
+               examples.GzipExample()
+               fmt.Println("\n" + strings.Repeat("=", 80))
                fmt.Println("All examples completed!")
        default:
                fmt.Printf("❌ Unknown example: %s\n\n", name)
diff --git a/sdk/go-doris-sdk/examples/gzip_example.go 
b/sdk/go-doris-sdk/examples/gzip_example.go
new file mode 100644
index 00000000000..8613a7b9ae7
--- /dev/null
+++ b/sdk/go-doris-sdk/examples/gzip_example.go
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package examples
+
+import (
+       "fmt"
+       "strings"
+
+       doris "github.com/apache/doris/sdk/go-doris-sdk"
+)
+
+// GzipExample loads a small CSV batch with EnableGzip set, so the SDK
+// gzip-compresses the request body and adds compress_type=gz by itself.
+// CSV is used because gzip-compressed JSON loads need Doris 3.0.5 or later.
+// Adjust Endpoints, credentials, Database and Table for your cluster.
+func GzipExample() {
+       config := &doris.Config{
+               Endpoints:   []string{"http://127.0.0.1:8030"},
+               User:        "root",
+               Password:    "",
+               Database:    "test",
+               Table:       "student",
+               Format:      doris.DefaultCSVFormat(),
+               Retry:       doris.DefaultRetry(),
+               GroupCommit: doris.OFF,
+               EnableGzip:  true,
+       }
+
+       client, err := doris.NewLoadClient(config)
+       if err != nil {
+               fmt.Printf("Failed to create client: %v\n", err)
+               return
+       }
+
+       // Plain (uncompressed) id,name,age rows; the SDK compresses them.
+       csvData := "1001,Alice,20\n1002,Bob,22\n1003,Charlie,19"
+
+       response, err := client.Load(strings.NewReader(csvData))
+       if err != nil {
+               fmt.Printf("Load failed: %v\n", err)
+               return
+       }
+
+       fmt.Printf("Status: %s\n", response.Status)
+       if response.Status == doris.SUCCESS {
+               fmt.Printf("Loaded rows: %d\n", response.Resp.NumberLoadedRows)
+               fmt.Printf("Load bytes: %d\n", response.Resp.LoadBytes)
+       } else {
+               fmt.Printf("Message: %s\n", response.Resp.Message)
+               fmt.Printf("Error URL: %s\n", response.Resp.ErrorURL)
+       }
+}
diff --git a/sdk/go-doris-sdk/pkg/load/client/doris_load_client.go 
b/sdk/go-doris-sdk/pkg/load/client/doris_load_client.go
index 27d59e63aaf..3d25db924ec 100644
--- a/sdk/go-doris-sdk/pkg/load/client/doris_load_client.go
+++ b/sdk/go-doris-sdk/pkg/load/client/doris_load_client.go
@@ -221,6 +221,26 @@ func (c *DorisLoadClient) Load(reader io.Reader) 
(*loader.LoadResponse, error) {
                }
        }
 
+       // If gzip is enabled, compress the buffered data once before the retry 
loop.
+       // This avoids re-compressing on every retry attempt.
+       // The result is a *bytes.Reader so Go's HTTP client can replay it on 
307 redirects.
+       if c.config.EnableGzip {
+               r, err := getBodyFunc()
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get reader for gzip: 
%w", err)
+               }
+               compressed, err := loader.GzipCompress(r)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to gzip compress: %w", 
err)
+               }
+               getBodyFunc = func() (io.Reader, error) {
+                       if _, err := compressed.Seek(0, io.SeekStart); err != 
nil {
+                               return nil, fmt.Errorf("failed to seek 
compressed data: %w", err)
+                       }
+                       return compressed, nil
+               }
+       }
+
        var lastErr error
        var response *loader.LoadResponse
        startTime := time.Now()
diff --git a/sdk/go-doris-sdk/pkg/load/config/load_config.go 
b/sdk/go-doris-sdk/pkg/load/config/load_config.go
index 60f5c3b3346..b571412e854 100644
--- a/sdk/go-doris-sdk/pkg/load/config/load_config.go
+++ b/sdk/go-doris-sdk/pkg/load/config/load_config.go
@@ -106,17 +106,18 @@ type Retry struct {
 
 // Config contains all configuration for stream load operations
 type Config struct {
-       Endpoints   []string
-       User        string
-       Password    string
-       Database    string
-       Table       string
-       LabelPrefix string
-       Label       string
-       Format      Format // Can be &JSONFormat{...} or &CSVFormat{...}
-       Retry       *Retry
-       GroupCommit GroupCommitMode
-       Options     map[string]string
+       Endpoints    []string
+       User         string
+       Password     string
+       Database     string
+       Table        string
+       LabelPrefix  string
+       Label        string
+       Format       Format // Can be &JSONFormat{...} or &CSVFormat{...}
+       Retry        *Retry
+       GroupCommit  GroupCommitMode
+       EnableGzip   bool // If true, the SDK compresses the request body with 
gzip and sets compress_type=gz
+       Options      map[string]string
 }
 
 // ValidateInternal validates the configuration
@@ -141,6 +142,7 @@ func (c *Config) ValidateInternal() error {
                return fmt.Errorf("format cannot be nil")
        }
 
+
        if c.Retry != nil {
                if c.Retry.MaxRetryTimes < 0 {
                        return fmt.Errorf("maxRetryTimes cannot be negative")
diff --git a/sdk/go-doris-sdk/pkg/load/loader/request_builder.go 
b/sdk/go-doris-sdk/pkg/load/loader/request_builder.go
index bd06ac40532..2bb58206e83 100644
--- a/sdk/go-doris-sdk/pkg/load/loader/request_builder.go
+++ b/sdk/go-doris-sdk/pkg/load/loader/request_builder.go
@@ -18,6 +18,8 @@
 package load
 
 import (
+       "bytes"
+       "compress/gzip"
        "encoding/base64"
        "fmt"
        "io"
@@ -54,6 +56,22 @@ func getNode(endpoints []string) (string, error) {
        return endpointURL.Host, nil
 }
 
+// GzipCompress reads all of r, gzip-compresses it in memory, and returns the
+// compressed bytes as a *bytes.Reader.
+//
+// A *bytes.Reader is returned on purpose: http.NewRequest recognizes it and
+// sets GetBody, which the HTTP client needs to resend the body when Doris FE
+// answers with a 307 redirect to a BE. Call this once before the retry loop so
+// retries reuse the compressed bytes instead of compressing again.
+func GzipCompress(r io.Reader) (*bytes.Reader, error) {
+       var compressed bytes.Buffer
+       zw := gzip.NewWriter(&compressed)
+
+       if _, err := io.Copy(zw, r); err != nil {
+               return nil, fmt.Errorf("gzip compress: %w", err)
+       }
+       // Close flushes buffered data and writes the gzip footer; the output
+       // is incomplete until it succeeds.
+       if err := zw.Close(); err != nil {
+               return nil, fmt.Errorf("gzip close: %w", err)
+       }
+
+       return bytes.NewReader(compressed.Bytes()), nil
+}
+
 // CreateStreamLoadRequest creates an HTTP PUT request for Doris stream load
 func CreateStreamLoadRequest(cfg *config.Config, data io.Reader, attempt int) 
(*http.Request, error) {
        // Get a random endpoint host
@@ -149,6 +167,15 @@ func buildStreamLoadOptions(cfg *config.Config) 
map[string]string {
                // Don't add group_commit option
        }
 
+       // Add compress_type header if gzip is enabled.
+       // Warn if user also set compress_type manually in Options to avoid 
silent conflicts.
+       if cfg.EnableGzip {
+               if _, exists := result["compress_type"]; exists {
+                       log.Warnf("Both EnableGzip and 
Options[\"compress_type\"] are set; EnableGzip takes precedence, overriding to 
gz")
+               }
+               result["compress_type"] = "gz"
+       }
+
        return result
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to