This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a944224 perf(parquet/compress): set zstd pool encoder concurrency to 
1 (#717)
5a944224 is described below

commit 5a944224b27705e968a05062bea93b85da1e2e2f
Author: Dima Kuznetsov <[email protected]>
AuthorDate: Tue Mar 17 19:02:31 2026 +0200

    perf(parquet/compress): set zstd pool encoder concurrency to 1 (#717)
    
    The zstdEncoderPool is used exclusively by EncodeAll(), which is a
    single-shot synchronous call that uses exactly one inner block encoder.
    However, zstd.NewWriter defaults concurrent to runtime.GOMAXPROCS,
    pre-allocating that many inner block encoders — each with its own ~1 MiB
    history buffer (ensureHist). On a 10-core machine, each pooled Encoder
    allocates 10 inner encoders when only 1 is ever used by EncodeAll.
    
    With WithEncoderConcurrency(1), each pooled encoder creates a single
    inner encoder, matching actual usage. The streaming Write/Close path is
    unaffected — it does not use the pool.
    
    Benchmark results (Apple M4 Pro, arm64, 256 KiB semi-random data):
    
    BenchmarkZstdPooledEncodeAll/Default-14 11000 B/op 5250 MB/s
    BenchmarkZstdPooledEncodeAll/Concurrency1-14 810 B/op 5500 MB/s
    
    14x less memory per operation, ~5% higher throughput from reduced GC
    pressure.
    
    In a parquet write workload (1 GiB Arrow data, ZSTD level 3), this
    reduced ensureHist allocations from 22 GiB to 7 GiB and madvise kernel
    CPU from 4.6s to 2.3s (10% wall-time improvement).
    
    ### Rationale for this change
    
    High memory churn during parquet encoding
    
    ### What changes are included in this PR?
    
    Change to zstd encoder concurrency, a benchmark to reproduce results.
    
    ### Are these changes tested?
    
    Yes
    
    ### Are there any user-facing changes?
    
    No
---
 parquet/compress/compress_test.go | 68 +++++++++++++++++++++++++++++++++++++++
 parquet/compress/zstd.go          |  2 +-
 2 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/parquet/compress/compress_test.go 
b/parquet/compress/compress_test.go
index 00f13760..c60c65d7 100644
--- a/parquet/compress/compress_test.go
+++ b/parquet/compress/compress_test.go
@@ -20,9 +20,11 @@ import (
        "bytes"
        "io"
        "math/rand"
+       "sync"
        "testing"
 
        "github.com/apache/arrow-go/v18/parquet/compress"
+       "github.com/klauspost/compress/zstd"
        "github.com/stretchr/testify/assert"
 )
 
@@ -179,3 +181,69 @@ func TestUnmarshalTextError(t *testing.T) {
        err := compression.UnmarshalText([]byte("NO SUCH CODEC"))
        assert.EqualError(t, err, "not a valid CompressionCodec string")
 }
+
+// BenchmarkZstdPooledEncodeAll compares zstd EncodeAll throughput and 
allocation
+// overhead for pooled encoders created with the default concurrency 
(GOMAXPROCS
+// inner block encoders) vs concurrency=1 (single inner block encoder).
+//
+// Each inner block encoder carries a ~1 MiB history buffer allocated on first
+// use (ensureHist). With the default, a pooled encoder pre-allocates 
GOMAXPROCS
+// of these buffers even though EncodeAll only ever uses one at a time. Setting
+// concurrency=1 eliminates the wasted allocations.
+//
+// The benchmark uses semi-random data (seeded random blocks mixed with 
repeated
+// patterns) to exercise the encoder's history window realistically — matching
+// typical parquet page payloads that contain a mix of unique and repeated 
values.
+func BenchmarkZstdPooledEncodeAll(b *testing.B) {
+       // 256 KiB of semi-random data — typical parquet page size.
+       // Mix random and repeated segments so the encoder exercises its full
+       // match-finding and history-window code paths.
+       data := make([]byte, 256*1024)
+       r := rand.New(rand.NewSource(42))
+       r.Read(data)
+       // Overwrite ~25% with repeated pattern to give the encoder something 
to match.
+       pattern := []byte("parquet-page-data-pattern-0123456789abcdef")
+       for i := 0; i < len(data)/4; i += len(pattern) {
+               copy(data[i:], pattern)
+       }
+
+       for _, tc := range []struct {
+               name        string
+               concurrency int
+       }{
+               {"Default", 0},      // GOMAXPROCS inner encoders
+               {"Concurrency1", 1}, // single inner encoder
+       } {
+               b.Run(tc.name, func(b *testing.B) {
+                       pool := &sync.Pool{
+                               New: func() interface{} {
+                                       opts := []zstd.EOption{
+                                               zstd.WithZeroFrames(true),
+                                               
zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(3)),
+                                       }
+                                       if tc.concurrency > 0 {
+                                               opts = append(opts, 
zstd.WithEncoderConcurrency(tc.concurrency))
+                                       }
+                                       enc, _ := zstd.NewWriter(nil, opts...)
+                                       return enc
+                               },
+                       }
+                       codec, err := compress.GetCodec(compress.Codecs.Zstd)
+                       if err != nil {
+                               b.Fatal(err)
+                       }
+                       dst := make([]byte, 
codec.CompressBound(int64(len(data))))
+
+                       b.SetBytes(int64(len(data)))
+                       b.ReportAllocs()
+                       b.ResetTimer()
+
+                       for i := 0; i < b.N; i++ {
+                               enc := pool.Get().(*zstd.Encoder)
+                               enc.EncodeAll(data, dst[:0])
+                               enc.Reset(nil)
+                               pool.Put(enc)
+                       }
+               })
+       }
+}
diff --git a/parquet/compress/zstd.go b/parquet/compress/zstd.go
index bb60658e..674d84b8 100644
--- a/parquet/compress/zstd.go
+++ b/parquet/compress/zstd.go
@@ -64,7 +64,7 @@ func (p *zstdEncoderPool) getEncoderFromPool(level 
zstd.EncoderLevel) *zstd.Enco
                if !ok {
                        pool = &sync.Pool{
                                New: func() interface{} {
-                                       enc, _ := zstd.NewWriter(nil, 
zstd.WithZeroFrames(true), zstd.WithEncoderLevel(level))
+                                       enc, _ := zstd.NewWriter(nil, 
zstd.WithZeroFrames(true), zstd.WithEncoderLevel(level), 
zstd.WithEncoderConcurrency(1))
                                        return enc
                                },
                        }

Reply via email to