This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 5a944224 perf(parquet/compress): set zstd pool encoder concurrency to 1 (#717)
5a944224 is described below
commit 5a944224b27705e968a05062bea93b85da1e2e2f
Author: Dima Kuznetsov <[email protected]>
AuthorDate: Tue Mar 17 19:02:31 2026 +0200
perf(parquet/compress): set zstd pool encoder concurrency to 1 (#717)
The zstdEncoderPool is used exclusively by EncodeAll(), which is a
single-shot synchronous call that uses exactly one inner block encoder.
However, zstd.NewWriter defaults concurrency to runtime.GOMAXPROCS,
pre-allocating that many inner block encoders — each with its own ~1 MiB
history buffer (ensureHist). On a 10-core machine, each pooled Encoder
allocates 10 inner encoders when only 1 is ever used by EncodeAll.
With WithEncoderConcurrency(1), each pooled encoder creates a single
inner encoder, matching actual usage. The streaming Write/Close path is
unaffected — it does not use the pool.
Benchmark results (Apple M4 Pro, arm64, 256 KiB semi-random data):
BenchmarkZstdPooledEncodeAll/Default-14 11000 B/op 5250 MB/s
BenchmarkZstdPooledEncodeAll/Concurrency1-14 810 B/op 5500 MB/s
14x less memory per operation, ~5% higher throughput from reduced GC
pressure.
In a parquet write workload (1 GiB Arrow data, ZSTD level 3), this
reduced ensureHist allocations from 22 GiB to 7 GiB and madvise kernel
CPU from 4.6s to 2.3s (10% wall-time improvement).
### Rationale for this change
High memory churn during parquet encoding
### What changes are included in this PR?
Change to zstd encoder concurrency, a benchmark to reproduce results.
### Are these changes tested?
Yes
### Are there any user-facing changes?
No
---
parquet/compress/compress_test.go | 68 +++++++++++++++++++++++++++++++++++++++
parquet/compress/zstd.go | 2 +-
2 files changed, 69 insertions(+), 1 deletion(-)
diff --git a/parquet/compress/compress_test.go b/parquet/compress/compress_test.go
index 00f13760..c60c65d7 100644
--- a/parquet/compress/compress_test.go
+++ b/parquet/compress/compress_test.go
@@ -20,9 +20,11 @@ import (
"bytes"
"io"
"math/rand"
+ "sync"
"testing"
"github.com/apache/arrow-go/v18/parquet/compress"
+ "github.com/klauspost/compress/zstd"
"github.com/stretchr/testify/assert"
)
@@ -179,3 +181,69 @@ func TestUnmarshalTextError(t *testing.T) {
err := compression.UnmarshalText([]byte("NO SUCH CODEC"))
assert.EqualError(t, err, "not a valid CompressionCodec string")
}
+
+// BenchmarkZstdPooledEncodeAll compares zstd EncodeAll throughput and allocation
+// overhead for pooled encoders created with the default concurrency (GOMAXPROCS
+// inner block encoders) vs concurrency=1 (single inner block encoder).
+//
+// Each inner block encoder carries a ~1 MiB history buffer allocated on first
+// use (ensureHist). With the default, a pooled encoder pre-allocates GOMAXPROCS
+// of these buffers even though EncodeAll only ever uses one at a time. Setting
+// concurrency=1 eliminates the wasted allocations.
+//
+// The benchmark uses semi-random data (seeded random blocks mixed with repeated
+// patterns) to exercise the encoder's history window realistically — matching
+// typical parquet page payloads that contain a mix of unique and repeated values.
+func BenchmarkZstdPooledEncodeAll(b *testing.B) {
+ // 256 KiB of semi-random data — typical parquet page size.
+ // Mix random and repeated segments so the encoder exercises its full
+ // match-finding and history-window code paths.
+ data := make([]byte, 256*1024)
+ r := rand.New(rand.NewSource(42))
+ r.Read(data)
+ // Overwrite ~25% with repeated pattern to give the encoder something to match.
+ pattern := []byte("parquet-page-data-pattern-0123456789abcdef")
+ for i := 0; i < len(data)/4; i += len(pattern) {
+ copy(data[i:], pattern)
+ }
+
+ for _, tc := range []struct {
+ name string
+ concurrency int
+ }{
+ {"Default", 0}, // GOMAXPROCS inner encoders
+ {"Concurrency1", 1}, // single inner encoder
+ } {
+ b.Run(tc.name, func(b *testing.B) {
+ pool := &sync.Pool{
+ New: func() interface{} {
+ opts := []zstd.EOption{
+ zstd.WithZeroFrames(true),
+ zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(3)),
+ }
+ if tc.concurrency > 0 {
+ opts = append(opts, zstd.WithEncoderConcurrency(tc.concurrency))
+ }
+ enc, _ := zstd.NewWriter(nil, opts...)
+ return enc
+ },
+ }
+ codec, err := compress.GetCodec(compress.Codecs.Zstd)
+ if err != nil {
+ b.Fatal(err)
+ }
+ dst := make([]byte, codec.CompressBound(int64(len(data))))
+
+ b.SetBytes(int64(len(data)))
+ b.ReportAllocs()
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ enc := pool.Get().(*zstd.Encoder)
+ enc.EncodeAll(data, dst[:0])
+ enc.Reset(nil)
+ pool.Put(enc)
+ }
+ })
+ }
+}
diff --git a/parquet/compress/zstd.go b/parquet/compress/zstd.go
index bb60658e..674d84b8 100644
--- a/parquet/compress/zstd.go
+++ b/parquet/compress/zstd.go
@@ -64,7 +64,7 @@ func (p *zstdEncoderPool) getEncoderFromPool(level zstd.EncoderLevel) *zstd.Enco
if !ok {
pool = &sync.Pool{
New: func() interface{} {
- enc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true), zstd.WithEncoderLevel(level))
+ enc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true), zstd.WithEncoderLevel(level), zstd.WithEncoderConcurrency(1))
return enc
},
}