This is an automated email from the ASF dual-hosted git repository.
crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 7a9a33c1 [issue #1357][producer] fix: allow multiples callbacks with
concurrent producer flushes (async publish) (#1409)
7a9a33c1 is described below
commit 7a9a33c178fb4bac6488469865a2ac07c466b554
Author: Thomas Bousquet <[email protected]>
AuthorDate: Tue Aug 26 19:01:58 2025 -0700
[issue #1357][producer] fix: allow multiples callbacks with concurrent
producer flushes (async publish) (#1409)
---
.github/workflows/ci.yml | 8 +---
.gitignore | 1 +
pulsar/consumer_test.go | 6 +--
pulsar/producer_partition.go | 26 +++++------
pulsar/producer_test.go | 72 ++++++++++++++++++++++++++++++
pulsaradmin/pkg/admin/subscription_test.go | 2 +-
6 files changed, 91 insertions(+), 24 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 510f1821..b7aee020 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -50,13 +50,7 @@ jobs:
matrix:
go-version: [ '1.23', '1.24' ]
steps:
- - uses: actions/checkout@v3
- - name: Check for Docker images
- id: check_images
- run: echo "::set-output name=images::$(docker images -q | wc -l)"
- - name: Clean Docker cache if images exist
- if: ${{ steps.check_images.outputs.images > 0 }}
- run: docker rmi $(docker images -q) -f && df -h
+ - uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}
diff --git a/.gitignore b/.gitignore
index bfa04f03..91999bb1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,3 +16,4 @@ pulsar-perf
bin
vendor/
+logs/
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 80ce0d2e..8a4ea3f3 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -5230,7 +5230,7 @@ func sendMessages(t *testing.T, client Client, topic
string, startIndex int, num
}
}
}
- assert.Nil(t, producer.Flush())
+ assert.Nil(t, producer.FlushWithCtx(ctx))
}
func receiveMessages(t *testing.T, consumer Consumer, numMessages int)
[]Message {
@@ -5276,10 +5276,10 @@ func TestAckResponseNotBlocked(t *testing.T) {
}
})
if i%100 == 99 {
- assert.Nil(t, producer.Flush())
+ assert.Nil(t, producer.FlushWithCtx(ctx))
}
}
- producer.Flush()
+ producer.FlushWithCtx(ctx)
producer.Close()
// Set a small receiver queue size to trigger ack response blocking if
the internal `queueCh`
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 1d65ec87..9841222f 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -839,15 +839,15 @@ func (p *partitionProducer) internalSingleSend(
type pendingItem struct {
sync.Mutex
- ctx context.Context
- cancel context.CancelFunc
- buffer internal.Buffer
- sequenceID uint64
- createdAt time.Time
- sentAt time.Time
- sendRequests []interface{}
- isDone bool
- flushCallback func(err error)
+ ctx context.Context
+ cancel context.CancelFunc
+ buffer internal.Buffer
+ sequenceID uint64
+ createdAt time.Time
+ sentAt time.Time
+ sendRequests []interface{}
+ isDone bool
+ flushCallbacks []func(err error)
}
func (p *partitionProducer) internalFlushCurrentBatch() {
@@ -1064,10 +1064,10 @@ func (p *partitionProducer) internalFlush(fr
*flushRequest) {
return
}
- pi.flushCallback = func(err error) {
+ pi.flushCallbacks = append(pi.flushCallbacks, func(err error) {
fr.err = err
close(fr.doneCh)
- }
+ })
}
// clearPendingSendRequests makes sure to push forward previous sending
requests
@@ -1749,8 +1749,8 @@ func (i *pendingItem) done(err error) {
i.isDone = true
// return the buffer to the pool after all callbacks have been called.
defer i.buffer.Release()
- if i.flushCallback != nil {
- i.flushCallback(err)
+ for _, callback := range i.flushCallbacks {
+ callback(err)
}
if i.cancel != nil {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 9a3b99f5..02b4b677 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -449,6 +449,78 @@ func TestFlushInProducer(t *testing.T) {
assert.Equal(t, msgCount, numOfMessages)
}
+// TestConcurrentFlushInProducer validates that concurrent flushes don't create a deadlock
+func TestConcurrentFlushInProducer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := "test-concurrent-flushes-in-producer"
+ subName := "subscription-name"
+ ctx := context.Background()
+
+ // keep batching enabled (DisableBatching: false) with the default batch settings
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: subName,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ expectedMsgCount := 100
+
+ var wg sync.WaitGroup
+
+ wg.Add(expectedMsgCount)
+
+ errs := make(chan error, expectedMsgCount*2)
+
+ // Each message is sent and flushed concurrently
+ for range expectedMsgCount {
+ go func() {
+ defer wg.Done()
+ producer.SendAsync(ctx, &ProducerMessage{
+ Payload: []byte("anythingWorksInThatPayload"),
+ }, func(_ MessageID, _ *ProducerMessage, e error) {
+ errs <- e
+ })
+
+ errs <- producer.FlushWithCtx(ctx)
+ }()
+ }
+
+ // Wait for all concurrent async publications and flushes to complete
+ wg.Wait()
+
+ // Make sure that there were no errors publishing or flushing
+ close(errs)
+ var errElementCount int
+ for e := range errs {
+ errElementCount++
+ assert.Nil(t, e)
+ }
+ assert.Equal(t, errElementCount, expectedMsgCount*2)
+
+ // Make sure all messages were processed successfully
+ var receivedMsgCount int
+ for range expectedMsgCount {
+ _, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ receivedMsgCount++
+ }
+
+ assert.Equal(t, receivedMsgCount, expectedMsgCount)
+}
+
func TestFlushInPartitionedProducer(t *testing.T) {
topicName := "public/default/partition-testFlushInPartitionedProducer"
diff --git a/pulsaradmin/pkg/admin/subscription_test.go
b/pulsaradmin/pkg/admin/subscription_test.go
index 087a4a37..1e4a7b21 100644
--- a/pulsaradmin/pkg/admin/subscription_test.go
+++ b/pulsaradmin/pkg/admin/subscription_test.go
@@ -126,7 +126,7 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) {
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}, nil)
}
- err = producer.Flush()
+ err = producer.FlushWithCtx(ctx)
if err != nil {
return
}