This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a2ff32245e7 [improve](sdk) add gzip compression support for stream
load (#61373)
a2ff32245e7 is described below
commit a2ff32245e754d7b03f613f133824765bfd0ff21
Author: wudi <[email protected]>
AuthorDate: Thu Mar 19 10:02:44 2026 +0800
[improve](sdk) add gzip compression support for stream load (#61373)
### What problem does this PR solve?
#### Summary
This PR adds gzip compression support to the Doris Go SDK for stream load.
When `EnableGzip: true` is set in the config, the SDK automatically compresses
the request body using gzip and adds the `compress_type: gz` HTTP header,
without requiring the caller to pre-compress the data.
Both CSV and JSON formats are supported. Note that JSON compression support
depends on the Doris version (Doris 3.0.5 or later).
#### Usage
```go
config := &doris.Config{
Endpoints: []string{"http://127.0.0.1:8030"},
User: "root",
Password: "password",
Database: "test_db",
Table: "users",
Format: doris.DefaultCSVFormat(),
EnableGzip: true,
}
```
#### Notes
- Compression is applied once before the retry loop, so retries do not incur
  extra compression overhead.
- The compressed data is buffered in memory. Since the SDK already buffers the
  original data internally for retry support, this does not introduce
  additional memory copies in practice.
#### Gzip Compression Benchmark
**[CSV format]**
Rows | Approx Size | Original | After gzip | Compressed By
-- | -- | -- | -- | --
100 | ~2 KB | 1590 B | 505 B | 68.2%
1,000 | ~17 KB | 16.49 KB | 4.38 KB | 73.4%
10,000 | ~176 KB | 175.67 KB | 47.74 KB | 72.8%
100,000 | ~1935 KB | 1.89 MB | 473.59 KB | 75.5%
1,000,000 | ~21272 KB | 20.78 MB | 4.74 MB | 77.2%
10,000,000 | ~232210 KB | 226.77 MB | 47.51 MB | 79.1%
**[JSON format]**
Rows | Approx Size | Original | After gzip | Compressed By
-- | -- | -- | -- | --
100 | ~4 KB | 3.70 KB | 629 B | 83.4%
1,000 | ~38 KB | 37.98 KB | 4.92 KB | 87.0%
10,000 | ~391 KB | 390.52 KB | 48.61 KB | 87.6%
100,000 | ~4083 KB | 3.99 MB | 497.33 KB | 87.8%
1,000,000 | ~42756 KB | 41.77 MB | 5.00 MB | 88.0%
10,000,000 | ~447054 KB | 436.58 MB | 49.87 MB | 88.6%
---
sdk/go-doris-sdk/README.md | 24 ++++++
sdk/go-doris-sdk/cmd/compress_bench/main.go | 85 ++++++++++++++++++++++
sdk/go-doris-sdk/cmd/examples/main.go | 8 ++
sdk/go-doris-sdk/examples/gzip_example.go | 64 ++++++++++++++++
.../pkg/load/client/doris_load_client.go | 20 +++++
sdk/go-doris-sdk/pkg/load/config/load_config.go | 24 +++---
.../pkg/load/loader/request_builder.go | 27 +++++++
7 files changed, 241 insertions(+), 11 deletions(-)
diff --git a/sdk/go-doris-sdk/README.md b/sdk/go-doris-sdk/README.md
index a69904fed60..dce9e8b6093 100644
--- a/sdk/go-doris-sdk/README.md
+++ b/sdk/go-doris-sdk/README.md
@@ -184,6 +184,29 @@ GroupCommit: doris.OFF, // Off, use traditional mode
> ⚠️ **Note**: When Group Commit is enabled, all Label configurations are
> automatically ignored and warning logs are recorded.
+### Gzip Compression
+
+```go
+config := &doris.Config{
+ Endpoints: []string{"http://127.0.0.1:8030"},
+ User: "root",
+ Password: "password",
+ Database: "test_db",
+ Table: "users",
+ Format: doris.DefaultCSVFormat(), // works with both CSV and JSON
formats
+ Retry: doris.DefaultRetry(),
+ GroupCommit: doris.OFF,
+ EnableGzip: true, // SDK compresses the body with gzip and sets
compress_type=gz header automatically
+}
+
+client, _ := doris.NewLoadClient(config)
+
+data := "1,Alice,25\n2,Bob,30\n3,Charlie,35"
+response, err := client.Load(doris.StringReader(data))
+```
+
+> **Note**: The SDK compresses the request body transparently — no need to pre-compress the data. Whether JSON compression is supported depends on the Doris version.
+
## 🔄 Concurrent Usage
### Basic Concurrency Example
@@ -330,6 +353,7 @@ go run cmd/examples/main.go single # Large batch load
(100k records)
go run cmd/examples/main.go concurrent # Concurrent load (1M records, 10
workers)
go run cmd/examples/main.go json # JSON load (50k records)
go run cmd/examples/main.go basic # Basic concurrency (5 workers)
+go run cmd/examples/main.go gzip # Gzip compressed CSV load
```
## 🛠️ Utility Tools
diff --git a/sdk/go-doris-sdk/cmd/compress_bench/main.go
b/sdk/go-doris-sdk/cmd/compress_bench/main.go
new file mode 100644
index 00000000000..61516d4d080
--- /dev/null
+++ b/sdk/go-doris-sdk/cmd/compress_bench/main.go
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Usage:
+// go run cmd/compress_bench/main.go
+//
+// Generates sample CSV and JSON data at various batch sizes,
+// compresses with gzip, and prints original/compressed sizes and ratio.
+
+package main
+
+import (
+ "bytes"
+ "compress/gzip"
+ "fmt"
+ "strings"
+)
+
+// gzipSize returns the number of bytes data occupies after being written
+// through a gzip.Writer created with gzip.NewWriter.
+func gzipSize(data string) int {
+ var buf bytes.Buffer
+ gz := gzip.NewWriter(&buf)
+ // Write and Close errors are not checked; the destination is an in-memory buffer.
+ gz.Write([]byte(data))
+ // Close must run before measuring so the complete gzip stream is in buf.
+ gz.Close()
+ return buf.Len()
+}
+
+// buildCSV returns n newline-terminated CSV rows of the form "id,User_<i>,age", where id is 1000+i and age cycles through 20..69.
+func buildCSV(n int) string {
+ var sb strings.Builder
+ for i := 0; i < n; i++ {
+ fmt.Fprintf(&sb, "%d,User_%d,%d\n", 1000+i, i, 20+i%50)
+ }
+ return sb.String()
+}
+
+// buildJSON returns n newline-terminated JSON objects (JSON Lines) with the keys id (1000+i), name ("User_<i>") and age (cycling through 20..69).
+func buildJSON(n int) string {
+ var sb strings.Builder
+ for i := 0; i < n; i++ {
+ fmt.Fprintf(&sb,
"{\"id\":%d,\"name\":\"User_%d\",\"age\":%d}\n", 1000+i, i, 20+i%50)
+ }
+ return sb.String()
+}
+
+// printResult prints one benchmark row for data: the given label, the
+// original length in bytes, the gzip-compressed length in bytes, and the
+// percentage of bytes removed by compression.
+func printResult(label string, data string) {
+ original := len(data)
+ compressed := gzipSize(data)
+ // Share of the original size that compression removed, as a percentage.
+ compressedBy := (1 - float64(compressed)/float64(original)) * 100
+ fmt.Printf(" %-40s original=%8d B after_gzip=%8d B
compressed_by=%.1f%%\n",
+ label, original, compressed, compressedBy)
+}
+
+// main prints gzip compression figures for generated CSV and JSON Lines
+// batches at each row count in sizes (100 up to 10,000,000 rows).
+func main() {
+ sizes := []int{100, 1000, 10000, 100000, 1000000, 10000000}
+
+ fmt.Println("=== Gzip Compression Benchmark ===")
+ fmt.Println()
+
+ fmt.Println("[CSV format]")
+ for _, n := range sizes {
+ // Each batch is built fully in memory as a string before it is measured.
+ data := buildCSV(n)
+ // The label shows the uncompressed size as len(data)/1024+1 KB.
+ printResult(fmt.Sprintf("%d rows (~%d KB)", n,
len(data)/1024+1), data)
+ }
+
+ fmt.Println()
+ fmt.Println("[JSON format]")
+ for _, n := range sizes {
+ data := buildJSON(n)
+ printResult(fmt.Sprintf("%d rows (~%d KB)", n,
len(data)/1024+1), data)
+ }
+}
diff --git a/sdk/go-doris-sdk/cmd/examples/main.go
b/sdk/go-doris-sdk/cmd/examples/main.go
index afb87dfa3ec..ed28e9b69b5 100644
--- a/sdk/go-doris-sdk/cmd/examples/main.go
+++ b/sdk/go-doris-sdk/cmd/examples/main.go
@@ -38,6 +38,7 @@ Available Examples:
concurrent - Production concurrent loading (1,000,000 records across 10
workers)
json - Production JSON data loading (50,000 JSON records)
basic - Basic concurrent loading demo (5 workers)
+ gzip - Gzip compressed CSV stream load demo
all - Run all examples sequentially
Examples:
@@ -45,6 +46,7 @@ Examples:
go run cmd/examples/main.go concurrent
go run cmd/examples/main.go json
go run cmd/examples/main.go basic
+ go run cmd/examples/main.go gzip
go run cmd/examples/main.go all
Description:
@@ -52,6 +54,7 @@ Description:
concurrent - Shows high-throughput concurrent loading with 10 workers
processing order data
json - Illustrates JSON Lines format loading with structured user
activity data
basic - Simple concurrent example for learning and development
+ gzip - Shows gzip-compressed CSV loading with automatic compression
by the SDK
all - Runs all examples in sequence for comprehensive testing
For more details, see examples/README.md
@@ -75,6 +78,9 @@ func runExample(name string) {
case "basic":
fmt.Println("Running Basic Concurrent Example...")
examples.RunBasicConcurrentExample()
+ case "gzip":
+ fmt.Println("Running Gzip Compression Example...")
+ examples.GzipExample()
case "all":
fmt.Println("Running All Examples...")
fmt.Println("\n" + strings.Repeat("=", 80))
@@ -86,6 +92,8 @@ func runExample(name string) {
fmt.Println("\n" + strings.Repeat("=", 80))
examples.RunBasicConcurrentExample()
fmt.Println("\n" + strings.Repeat("=", 80))
+ examples.GzipExample()
+ fmt.Println("\n" + strings.Repeat("=", 80))
fmt.Println("All examples completed!")
default:
fmt.Printf("❌ Unknown example: %s\n\n", name)
diff --git a/sdk/go-doris-sdk/examples/gzip_example.go
b/sdk/go-doris-sdk/examples/gzip_example.go
new file mode 100644
index 00000000000..8613a7b9ae7
--- /dev/null
+++ b/sdk/go-doris-sdk/examples/gzip_example.go
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package examples
+
+import (
+ "fmt"
+ "strings"
+
+ doris "github.com/apache/doris/sdk/go-doris-sdk"
+)
+
+// GzipExample creates a load client with EnableGzip set, loads three JSON
+// Lines records, and prints the outcome: the loaded row and byte counts when
+// the status is doris.SUCCESS, otherwise the response message and error URL.
+// Client-creation and load errors are printed and end the example early.
+//
+// NOTE(review): the endpoint, database and table below are hard-coded; adjust
+// them for the target environment before running. The example uses the JSON
+// format, not CSV.
+func GzipExample() {
+ config := &doris.Config{
+ Endpoints: []string{"http://10.16.10.6:48939"},
+ User: "root",
+ Password: "",
+ Database: "test",
+ Table: "student",
+ Format: doris.DefaultJSONFormat(),
+ Retry: doris.DefaultRetry(),
+ GroupCommit: doris.OFF,
+ EnableGzip: true,
+ }
+
+ client, err := doris.NewLoadClient(config)
+ if err != nil {
+ fmt.Printf("Failed to create client: %v\n", err)
+ return
+ }
+
+ // One JSON object per line; the data is handed to Load uncompressed.
+ jsonData := `{"id": 1001, "name": "Alice", "age": 20}
+{"id": 1002, "name": "Bob", "age": 22}
+{"id": 1003, "name": "Charlie", "age": 19}`
+
+ response, err := client.Load(strings.NewReader(jsonData))
+ if err != nil {
+ fmt.Printf("Load failed: %v\n", err)
+ return
+ }
+
+ fmt.Printf("Status: %s\n", response.Status)
+ if response.Status == doris.SUCCESS {
+ fmt.Printf("Loaded rows: %d\n", response.Resp.NumberLoadedRows)
+ fmt.Printf("Load bytes: %d\n", response.Resp.LoadBytes)
+ } else {
+ fmt.Printf("Message: %s\n", response.Resp.Message)
+ fmt.Printf("Error URL: %s\n", response.Resp.ErrorURL)
+ }
+}
diff --git a/sdk/go-doris-sdk/pkg/load/client/doris_load_client.go
b/sdk/go-doris-sdk/pkg/load/client/doris_load_client.go
index 27d59e63aaf..3d25db924ec 100644
--- a/sdk/go-doris-sdk/pkg/load/client/doris_load_client.go
+++ b/sdk/go-doris-sdk/pkg/load/client/doris_load_client.go
@@ -221,6 +221,26 @@ func (c *DorisLoadClient) Load(reader io.Reader)
(*loader.LoadResponse, error) {
}
}
+ // If gzip is enabled, compress the buffered data once before the retry
loop.
+ // This avoids re-compressing on every retry attempt.
+ // The result is a *bytes.Reader so Go's HTTP client can replay it on
307 redirects.
+ if c.config.EnableGzip {
+ r, err := getBodyFunc()
+ if err != nil {
+ return nil, fmt.Errorf("failed to get reader for gzip:
%w", err)
+ }
+ compressed, err := loader.GzipCompress(r)
+ if err != nil {
+ return nil, fmt.Errorf("failed to gzip compress: %w",
err)
+ }
+ getBodyFunc = func() (io.Reader, error) {
+ if _, err := compressed.Seek(0, io.SeekStart); err !=
nil {
+ return nil, fmt.Errorf("failed to seek
compressed data: %w", err)
+ }
+ return compressed, nil
+ }
+ }
+
var lastErr error
var response *loader.LoadResponse
startTime := time.Now()
diff --git a/sdk/go-doris-sdk/pkg/load/config/load_config.go
b/sdk/go-doris-sdk/pkg/load/config/load_config.go
index 60f5c3b3346..b571412e854 100644
--- a/sdk/go-doris-sdk/pkg/load/config/load_config.go
+++ b/sdk/go-doris-sdk/pkg/load/config/load_config.go
@@ -106,17 +106,18 @@ type Retry struct {
// Config contains all configuration for stream load operations
type Config struct {
- Endpoints []string
- User string
- Password string
- Database string
- Table string
- LabelPrefix string
- Label string
- Format Format // Can be &JSONFormat{...} or &CSVFormat{...}
- Retry *Retry
- GroupCommit GroupCommitMode
- Options map[string]string
+ Endpoints []string
+ User string
+ Password string
+ Database string
+ Table string
+ LabelPrefix string
+ Label string
+ Format Format // Can be &JSONFormat{...} or &CSVFormat{...}
+ Retry *Retry
+ GroupCommit GroupCommitMode
+ EnableGzip bool // If true, the SDK compresses the request body with
gzip and sets compress_type=gz
+ Options map[string]string
}
// ValidateInternal validates the configuration
@@ -141,6 +142,7 @@ func (c *Config) ValidateInternal() error {
return fmt.Errorf("format cannot be nil")
}
+
if c.Retry != nil {
if c.Retry.MaxRetryTimes < 0 {
return fmt.Errorf("maxRetryTimes cannot be negative")
diff --git a/sdk/go-doris-sdk/pkg/load/loader/request_builder.go
b/sdk/go-doris-sdk/pkg/load/loader/request_builder.go
index bd06ac40532..2bb58206e83 100644
--- a/sdk/go-doris-sdk/pkg/load/loader/request_builder.go
+++ b/sdk/go-doris-sdk/pkg/load/loader/request_builder.go
@@ -18,6 +18,8 @@
package load
import (
+ "bytes"
+ "compress/gzip"
"encoding/base64"
"fmt"
"io"
@@ -54,6 +56,22 @@ func getNode(endpoints []string) (string, error) {
return endpointURL.Host, nil
}
+// GzipCompress reads r to completion, gzip-compresses it into memory, and
+// returns a *bytes.Reader over the compressed data.
+//
+// Using *bytes.Reader ensures Go's http.NewRequest automatically sets GetBody,
+// which is required for the HTTP client to replay the body when Doris FE
+// issues a 307 redirect to BE. Callers should call this once before the retry
+// loop to avoid re-compressing on each attempt.
+//
+// It returns a wrapped error if copying from r or closing the gzip writer
+// fails; the returned reader is nil in that case.
+func GzipCompress(r io.Reader) (*bytes.Reader, error) {
+ var buf bytes.Buffer
+ gz := gzip.NewWriter(&buf)
+ if _, err := io.Copy(gz, r); err != nil {
+ return nil, fmt.Errorf("gzip compress: %w", err)
+ }
+ // Close flushes pending data and writes the gzip footer; buf is complete only after it succeeds.
+ if err := gz.Close(); err != nil {
+ return nil, fmt.Errorf("gzip close: %w", err)
+ }
+ return bytes.NewReader(buf.Bytes()), nil
+}
+
// CreateStreamLoadRequest creates an HTTP PUT request for Doris stream load
func CreateStreamLoadRequest(cfg *config.Config, data io.Reader, attempt int)
(*http.Request, error) {
// Get a random endpoint host
@@ -149,6 +167,15 @@ func buildStreamLoadOptions(cfg *config.Config)
map[string]string {
// Don't add group_commit option
}
+ // Add compress_type header if gzip is enabled.
+ // Warn if user also set compress_type manually in Options to avoid
silent conflicts.
+ if cfg.EnableGzip {
+ if _, exists := result["compress_type"]; exists {
+ log.Warnf("Both EnableGzip and
Options[\"compress_type\"] are set; EnableGzip takes precedence, overriding to
gz")
+ }
+ result["compress_type"] = "gz"
+ }
+
return result
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]