This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 7ae2e335 fix(compute): fix data race and memory leak in concurrent
is_in kernel (#712)
7ae2e335 is described below
commit 7ae2e33535c5181194c377f8a21629cf66b574ce
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Sun Mar 15 23:52:16 2026 +0100
fix(compute): fix data race and memory leak in concurrent is_in kernel
(#712)
by cleaning up SetLookupState via per-invocation ctx.State instead of
shared kernel.Data
fix: apache/iceberg-go/issues/489
### Rationale for this change
fix: apache/iceberg-go/issues/489 (flaky test in iceberg-go)
### What changes are included in this PR?
fix data race and memory leak in concurrent is_in kernel
### Are these changes tested?
yes, added a test to reproduce the memory leak (it fails in 3 out of 10 runs)
### Are there any user-facing changes?
---
arrow/compute/executor.go | 7 ++++++-
arrow/compute/scalar_set_lookup.go | 1 -
arrow/compute/scalar_set_lookup_test.go | 35 +++++++++++++++++++++++++++++++++
3 files changed, 41 insertions(+), 2 deletions(-)
diff --git a/arrow/compute/executor.go b/arrow/compute/executor.go
index 59f1c4ee..095a7e15 100644
--- a/arrow/compute/executor.go
+++ b/arrow/compute/executor.go
@@ -581,7 +581,12 @@ func (s *scalarExecutor) WrapResults(ctx context.Context,
out <-chan Datum, hasC
func (s *scalarExecutor) executeSpans(data chan<- Datum) (err error) {
defer func() {
- err = errors.Join(err, s.kernel.Cleanup())
+ // Clean up per-invocation kernel state using ctx.State rather
than
+ // the shared kernel.Data field, which races when multiple
goroutines
+ // execute the same kernel concurrently.
+ if sk, ok := s.kernel.(*exec.ScalarKernel); ok && sk.CleanupFn
!= nil && s.ctx.State != nil {
+ err = errors.Join(err, sk.CleanupFn(s.ctx.State))
+ }
}()
var (
diff --git a/arrow/compute/scalar_set_lookup.go
b/arrow/compute/scalar_set_lookup.go
index 81971cea..84305536 100644
--- a/arrow/compute/scalar_set_lookup.go
+++ b/arrow/compute/scalar_set_lookup.go
@@ -143,7 +143,6 @@ type setLookupState interface {
func execIsIn(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult)
error {
state := ctx.State.(setLookupState)
- ctx.Kernel.(*exec.ScalarKernel).Data = state
in := batch.Values[0]
if !arrow.TypeEqual(in.Type(), state.ValueType()) {
diff --git a/arrow/compute/scalar_set_lookup_test.go
b/arrow/compute/scalar_set_lookup_test.go
index 770b984f..58a666bf 100644
--- a/arrow/compute/scalar_set_lookup_test.go
+++ b/arrow/compute/scalar_set_lookup_test.go
@@ -19,6 +19,7 @@ package compute_test
import (
"context"
"strings"
+ "sync"
"testing"
"github.com/apache/arrow-go/v18/arrow"
@@ -597,6 +598,40 @@ func (ss *ScalarSetLookupSuite) TestIsInChunked() {
ss.checkIsInChunked(input, valueSet, expected,
compute.NullMatchingInconclusive)
}
+// TestIsInConcurrentNoLeak verifies that concurrent is_in executions do not
+// leak BinaryMemoTable buffers. The is_in kernel stores per-invocation
+// SetLookupState in the shared kernel.Data field, which races when multiple
+// goroutines execute simultaneously — earlier states get overwritten and
+// their buffers (192 bytes each) leak. TearDownTest's AssertSize catches this.
+func (ss *ScalarSetLookupSuite) TestIsInConcurrentNoLeak() {
+ input := ss.getArr(arrow.BinaryTypes.String, `["alpha", "beta",
"gamma", "delta"]`)
+ defer input.Release()
+
+ valueSet := ss.getArr(arrow.BinaryTypes.String, `["alpha", "gamma"]`)
+ defer valueSet.Release()
+
+ const workers = 4
+ var wg sync.WaitGroup
+ wg.Add(workers)
+ for range workers {
+ go func() {
+ defer wg.Done()
+ defer func() {
+ recover() // race can cause panic in
BinaryMemoTable.lookup
+ }()
+
+ result, err := compute.IsIn(ss.ctx, compute.SetOptions{
+ ValueSet:
compute.NewDatumWithoutOwning(valueSet),
+ }, compute.NewDatumWithoutOwning(input))
+ if err != nil {
+ return
+ }
+ result.Release()
+ }()
+ }
+ wg.Wait()
+}
+
func (ss *ScalarSetLookupSuite) TearDownTest() {
ss.mem.AssertSize(ss.T(), 0)
}