Copilot commented on code in PR #1409:
URL: https://github.com/apache/pulsar-client-go/pull/1409#discussion_r2299631445


##########
pulsar/producer_test.go:
##########
@@ -449,6 +449,78 @@ func TestFlushInProducer(t *testing.T) {
        assert.Equal(t, msgCount, numOfMessages)
 }
 
+// TestConcurrentFlushInProducer validates that concurrent flushes don't 
create a deadlock
+func TestConcurrentFlushInProducer(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := "test-concurrent-flushes-in-producer"
+       subName := "subscription-name"
+       ctx := context.Background()
+
+       // set batch message number numOfMessages, and max delay 10s
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topicName,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: subName,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       expectedMsgCount := 100
+
+       var wg sync.WaitGroup
+
+       wg.Add(expectedMsgCount)
+
+       errs := make(chan error, expectedMsgCount*2)
+
+       // Each message in sent and flushed concurrently
+       for range expectedMsgCount {

Review Comment:
   Using 'range expectedMsgCount' without an index variable is a Go 1.22+ 
feature. Consider using 'for i := 0; i < expectedMsgCount; i++' for better 
compatibility with older Go versions.
   ```suggestion
        for i := 0; i < expectedMsgCount; i++ {
   ```



##########
pulsar/producer_test.go:
##########
@@ -449,6 +449,78 @@ func TestFlushInProducer(t *testing.T) {
        assert.Equal(t, msgCount, numOfMessages)
 }
 
+// TestConcurrentFlushInProducer validates that concurrent flushes don't 
create a deadlock
+func TestConcurrentFlushInProducer(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := "test-concurrent-flushes-in-producer"
+       subName := "subscription-name"
+       ctx := context.Background()
+
+       // set batch message number numOfMessages, and max delay 10s
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topicName,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: subName,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       expectedMsgCount := 100
+
+       var wg sync.WaitGroup
+
+       wg.Add(expectedMsgCount)
+
+       errs := make(chan error, expectedMsgCount*2)
+
+       // Each message in sent and flushed concurrently
+       for range expectedMsgCount {
+               go func() {
+                       defer wg.Done()
+                       producer.SendAsync(ctx, &ProducerMessage{
+                               Payload: []byte("anythingWorksInThatPayload"),
+                       }, func(_ MessageID, _ *ProducerMessage, e error) {
+                               errs <- e
+                       })
+
+                       errs <- producer.FlushWithCtx(ctx)
+               }()
+       }
+
+       // Wait for all concurrent async publications and flushes to complete
+       wg.Wait()
+
+       // Make sure that there were no error publishing or flushing
+       close(errs)
+       var errElementCount int
+       for e := range errs {
+               errElementCount++
+               assert.Nil(t, e)
+       }
+       assert.Equal(t, errElementCount, expectedMsgCount*2)
+
+       // Make sure all messages were processed successfully
+       var receivedMsgCount int
+       for range expectedMsgCount {

Review Comment:
   Using 'range expectedMsgCount' without an index variable is a Go 1.22+ 
feature. Consider using 'for i := 0; i < expectedMsgCount; i++' for better 
compatibility with older Go versions.
   ```suggestion
        for i := 0; i < expectedMsgCount; i++ {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to