This is an automated email from the ASF dual-hosted git repository.

crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a9a33c1 [issue #1357][producer] fix: allow multiples callbacks with 
concurrent producer flushes (async publish) (#1409)
7a9a33c1 is described below

commit 7a9a33c178fb4bac6488469865a2ac07c466b554
Author: Thomas Bousquet <[email protected]>
AuthorDate: Tue Aug 26 19:01:58 2025 -0700

    [issue #1357][producer] fix: allow multiples callbacks with concurrent 
producer flushes (async publish) (#1409)
---
 .github/workflows/ci.yml                   |  8 +---
 .gitignore                                 |  1 +
 pulsar/consumer_test.go                    |  6 +--
 pulsar/producer_partition.go               | 26 +++++------
 pulsar/producer_test.go                    | 72 ++++++++++++++++++++++++++++++
 pulsaradmin/pkg/admin/subscription_test.go |  2 +-
 6 files changed, 91 insertions(+), 24 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 510f1821..b7aee020 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -50,13 +50,7 @@ jobs:
       matrix:
         go-version: [ '1.23', '1.24' ]
     steps:
-      - uses: actions/checkout@v3
-      - name: Check for Docker images
-        id: check_images
-        run: echo "::set-output name=images::$(docker images -q | wc -l)"
-      - name: Clean Docker cache if images exist
-        if: ${{ steps.check_images.outputs.images > 0 }}
-        run: docker rmi $(docker images -q) -f && df -h        
+      - uses: actions/checkout@v3      
       - uses: actions/setup-go@v3
         with:
           go-version: ${{ matrix.go-version }}
diff --git a/.gitignore b/.gitignore
index bfa04f03..91999bb1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,3 +16,4 @@ pulsar-perf
 bin
 
 vendor/
+logs/
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 80ce0d2e..8a4ea3f3 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -5230,7 +5230,7 @@ func sendMessages(t *testing.T, client Client, topic 
string, startIndex int, num
                        }
                }
        }
-       assert.Nil(t, producer.Flush())
+       assert.Nil(t, producer.FlushWithCtx(ctx))
 }
 
 func receiveMessages(t *testing.T, consumer Consumer, numMessages int) 
[]Message {
@@ -5276,10 +5276,10 @@ func TestAckResponseNotBlocked(t *testing.T) {
                        }
                })
                if i%100 == 99 {
-                       assert.Nil(t, producer.Flush())
+                       assert.Nil(t, producer.FlushWithCtx(ctx))
                }
        }
-       producer.Flush()
+       producer.FlushWithCtx(ctx)
        producer.Close()
 
        // Set a small receiver queue size to trigger ack response blocking if 
the internal `queueCh`
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 1d65ec87..9841222f 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -839,15 +839,15 @@ func (p *partitionProducer) internalSingleSend(
 
 type pendingItem struct {
        sync.Mutex
-       ctx           context.Context
-       cancel        context.CancelFunc
-       buffer        internal.Buffer
-       sequenceID    uint64
-       createdAt     time.Time
-       sentAt        time.Time
-       sendRequests  []interface{}
-       isDone        bool
-       flushCallback func(err error)
+       ctx            context.Context
+       cancel         context.CancelFunc
+       buffer         internal.Buffer
+       sequenceID     uint64
+       createdAt      time.Time
+       sentAt         time.Time
+       sendRequests   []interface{}
+       isDone         bool
+       flushCallbacks []func(err error)
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
@@ -1064,10 +1064,10 @@ func (p *partitionProducer) internalFlush(fr 
*flushRequest) {
                return
        }
 
-       pi.flushCallback = func(err error) {
+       pi.flushCallbacks = append(pi.flushCallbacks, func(err error) {
                fr.err = err
                close(fr.doneCh)
-       }
+       })
 }
 
 // clearPendingSendRequests makes sure to push forward previous sending 
requests
@@ -1749,8 +1749,8 @@ func (i *pendingItem) done(err error) {
        i.isDone = true
        // return the buffer to the pool after all callbacks have been called.
        defer i.buffer.Release()
-       if i.flushCallback != nil {
-               i.flushCallback(err)
+       for _, callback := range i.flushCallbacks {
+               callback(err)
        }
 
        if i.cancel != nil {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 9a3b99f5..02b4b677 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -449,6 +449,78 @@ func TestFlushInProducer(t *testing.T) {
        assert.Equal(t, msgCount, numOfMessages)
 }
 
+// TestConcurrentFlushInProducer validates that concurrent flushes don't 
create a deadlock
+func TestConcurrentFlushInProducer(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := "test-concurrent-flushes-in-producer"
+       subName := "subscription-name"
+       ctx := context.Background()
+
+       // set batch message number numOfMessages, and max delay 10s
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topicName,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: subName,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       expectedMsgCount := 100
+
+       var wg sync.WaitGroup
+
+       wg.Add(expectedMsgCount)
+
+       errs := make(chan error, expectedMsgCount*2)
+
+       // Each message is sent and flushed concurrently
+       for range expectedMsgCount {
+               go func() {
+                       defer wg.Done()
+                       producer.SendAsync(ctx, &ProducerMessage{
+                               Payload: []byte("anythingWorksInThatPayload"),
+                       }, func(_ MessageID, _ *ProducerMessage, e error) {
+                               errs <- e
+                       })
+
+                       errs <- producer.FlushWithCtx(ctx)
+               }()
+       }
+
+       // Wait for all concurrent async publications and flushes to complete
+       wg.Wait()
+
+       // Make sure that there were no error publishing or flushing
+       close(errs)
+       var errElementCount int
+       for e := range errs {
+               errElementCount++
+               assert.Nil(t, e)
+       }
+       assert.Equal(t, errElementCount, expectedMsgCount*2)
+
+       // Make sure all messages were processed successfully
+       var receivedMsgCount int
+       for range expectedMsgCount {
+               _, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               receivedMsgCount++
+       }
+
+       assert.Equal(t, receivedMsgCount, expectedMsgCount)
+}
+
 func TestFlushInPartitionedProducer(t *testing.T) {
        topicName := "public/default/partition-testFlushInPartitionedProducer"
 
diff --git a/pulsaradmin/pkg/admin/subscription_test.go 
b/pulsaradmin/pkg/admin/subscription_test.go
index 087a4a37..1e4a7b21 100644
--- a/pulsaradmin/pkg/admin/subscription_test.go
+++ b/pulsaradmin/pkg/admin/subscription_test.go
@@ -126,7 +126,7 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) {
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
                }, nil)
        }
-       err = producer.Flush()
+       err = producer.FlushWithCtx(ctx)
        if err != nil {
                return
        }

Reply via email to