This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ae2e335 fix(compute): fix data race and memory leak in concurrent 
is_in kernel (#712)
7ae2e335 is described below

commit 7ae2e33535c5181194c377f8a21629cf66b574ce
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Sun Mar 15 23:52:16 2026 +0100

    fix(compute): fix data race and memory leak in concurrent is_in kernel 
(#712)
    
    by cleaning up SetLookupState via per-invocation ctx.State instead of
    shared kernel.Data
    
    fix: apache/iceberg-go/issues/489
    
    ### Rationale for this change
    
    fix: apache/iceberg-go/issues/489 (flaky test in iceberg-go)
    
    ### What changes are included in this PR?
    
    fix data race and memory leak in concurrent is_in kernel
    
    ### Are these changes tested?
    
    yes, added a test to reproduce the memory leak (fails in 3 out of 10 runs without the fix)
    
    ### Are there any user-facing changes?
---
 arrow/compute/executor.go               |  7 ++++++-
 arrow/compute/scalar_set_lookup.go      |  1 -
 arrow/compute/scalar_set_lookup_test.go | 35 +++++++++++++++++++++++++++++++++
 3 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/arrow/compute/executor.go b/arrow/compute/executor.go
index 59f1c4ee..095a7e15 100644
--- a/arrow/compute/executor.go
+++ b/arrow/compute/executor.go
@@ -581,7 +581,12 @@ func (s *scalarExecutor) WrapResults(ctx context.Context, 
out <-chan Datum, hasC
 
 func (s *scalarExecutor) executeSpans(data chan<- Datum) (err error) {
        defer func() {
-               err = errors.Join(err, s.kernel.Cleanup())
+               // Clean up per-invocation kernel state using ctx.State rather 
than
+               // the shared kernel.Data field, which races when multiple 
goroutines
+               // execute the same kernel concurrently.
+               if sk, ok := s.kernel.(*exec.ScalarKernel); ok && sk.CleanupFn 
!= nil && s.ctx.State != nil {
+                       err = errors.Join(err, sk.CleanupFn(s.ctx.State))
+               }
        }()
 
        var (
diff --git a/arrow/compute/scalar_set_lookup.go 
b/arrow/compute/scalar_set_lookup.go
index 81971cea..84305536 100644
--- a/arrow/compute/scalar_set_lookup.go
+++ b/arrow/compute/scalar_set_lookup.go
@@ -143,7 +143,6 @@ type setLookupState interface {
 
 func execIsIn(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) 
error {
        state := ctx.State.(setLookupState)
-       ctx.Kernel.(*exec.ScalarKernel).Data = state
        in := batch.Values[0]
 
        if !arrow.TypeEqual(in.Type(), state.ValueType()) {
diff --git a/arrow/compute/scalar_set_lookup_test.go 
b/arrow/compute/scalar_set_lookup_test.go
index 770b984f..58a666bf 100644
--- a/arrow/compute/scalar_set_lookup_test.go
+++ b/arrow/compute/scalar_set_lookup_test.go
@@ -19,6 +19,7 @@ package compute_test
 import (
        "context"
        "strings"
+       "sync"
        "testing"
 
        "github.com/apache/arrow-go/v18/arrow"
@@ -597,6 +598,40 @@ func (ss *ScalarSetLookupSuite) TestIsInChunked() {
        ss.checkIsInChunked(input, valueSet, expected, 
compute.NullMatchingInconclusive)
 }
 
+// TestIsInConcurrentNoLeak verifies that concurrent is_in executions do not
+// leak BinaryMemoTable buffers. The is_in kernel stores per-invocation
+// SetLookupState in the shared kernel.Data field, which races when multiple
+// goroutines execute simultaneously — earlier states get overwritten and
+// their buffers (192 bytes each) leak. TearDownTest's AssertSize catches this.
+func (ss *ScalarSetLookupSuite) TestIsInConcurrentNoLeak() {
+       input := ss.getArr(arrow.BinaryTypes.String, `["alpha", "beta", 
"gamma", "delta"]`)
+       defer input.Release()
+
+       valueSet := ss.getArr(arrow.BinaryTypes.String, `["alpha", "gamma"]`)
+       defer valueSet.Release()
+
+       const workers = 4
+       var wg sync.WaitGroup
+       wg.Add(workers)
+       for range workers {
+               go func() {
+                       defer wg.Done()
+                       defer func() {
+                               recover() // race can cause panic in 
BinaryMemoTable.lookup
+                       }()
+
+                       result, err := compute.IsIn(ss.ctx, compute.SetOptions{
+                               ValueSet: 
compute.NewDatumWithoutOwning(valueSet),
+                       }, compute.NewDatumWithoutOwning(input))
+                       if err != nil {
+                               return
+                       }
+                       result.Release()
+               }()
+       }
+       wg.Wait()
+}
+
 func (ss *ScalarSetLookupSuite) TearDownTest() {
        ss.mem.AssertSize(ss.T(), 0)
 }

Reply via email to