Copilot commented on code in PR #1422:
URL: https://github.com/apache/pulsar-client-go/pull/1422#discussion_r2342923681
##########
pulsar/producer_test.go:
##########
@@ -2741,3 +2789,112 @@ func TestSendBufferRetainWhenConnectionStuck(t
*testing.T) {
b := conn.buffers[0]
assert.Equal(t, int64(1), b.RefCnt(), "Expected buffer to have a
reference count of 1 after sending")
}
+
+func TestSendAsyncCouldTimeoutWhileReconnecting(t *testing.T) {
+ testSendAsyncCouldTimeoutWhileReconnecting(t, false)
+ testSendAsyncCouldTimeoutWhileReconnecting(t, true)
+}
+
+func testSendAsyncCouldTimeoutWhileReconnecting(t *testing.T,
isDisableBatching bool) {
+ t.Helper()
+
+ req := testcontainers.ContainerRequest{
+ Image: getPulsarTestImage(),
+ ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+ WaitingFor: wait.ForExposedPort(),
+ Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+ }
+ c, err := testcontainers.GenericContainer(context.Background(),
testcontainers.GenericContainerRequest{
+ ContainerRequest: req,
+ Started: true,
+ })
+ require.NoError(t, err, "Failed to start the pulsar container")
+ defer func() {
+ err := c.Terminate(context.Background())
+ if err != nil {
+ t.Fatal("Failed to terminate the pulsar container", err)
+ }
+ }()
+
+ endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+ require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+ client, err := NewClient(ClientOptions{
+ URL: endpoint,
+ ConnectionTimeout: 5 * time.Second,
+ OperationTimeout: 5 * time.Second,
+ })
+ require.NoError(t, err)
+ defer client.Close()
+
+ var testProducer Producer
+ require.Eventually(t, func() bool {
+ testProducer, err = client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ Schema: NewBytesSchema(nil),
+ SendTimeout: 3 * time.Second,
+ DisableBatching: isDisableBatching,
+ BatchingMaxMessages: 5,
+ MaxPendingMessages: 10,
+ })
+ return err == nil
+ }, 30*time.Second, 1*time.Second)
+
+ numMessages := 10
+ // Send 10 messages synchronously
+ for i := 0; i < numMessages; i++ {
+ send, err := testProducer.Send(context.Background(),
&ProducerMessage{Payload: []byte("test")})
+ require.NoError(t, err)
+ require.NotNil(t, send)
+ }
+
+ // stop pulsar server
+ timeout := 10 * time.Second
+ err = c.Stop(context.Background(), &timeout)
+ require.NoError(t, err)
+
+ // Test the SendAsync could be timeout if the producer is reconnecting
+
+ finalErr := make(chan error, 1)
+ testProducer.SendAsync(context.Background(), &ProducerMessage{
+ Payload: []byte("test"),
+ }, func(_ MessageID, _ *ProducerMessage, err error) {
+ finalErr <- err
+ })
+ select {
+ case <-time.After(10 * time.Second):
+ t.Fatal("test timeout")
+ case err = <-finalErr:
+ // should get a timeout error
+ require.ErrorIs(t, err, ErrSendTimeout)
+ }
+ close(finalErr)
Review Comment:
[nitpick] The variable name 'finalErr' is reused for two different channels in
the same test function, each of which is closed separately, which could be
confusing. Consider using different variable names for the two test scenarios
to improve code clarity.
##########
pulsar/producer_partition.go:
##########
@@ -385,31 +391,20 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL
string) error {
"epoch": atomic.LoadUint64(&p.epoch),
}).Info("Connected producer")
- pendingItems := p.pendingQueue.ReadableSlice()
- viewSize := len(pendingItems)
- if viewSize > 0 {
- p.log.Infof("Resending %d pending batches", viewSize)
- lastViewItem := pendingItems[viewSize-1].(*pendingItem)
-
- // iterate at most pending items
- for i := 0; i < viewSize; i++ {
- item := p.pendingQueue.Poll()
- if item == nil {
- continue
- }
- pi := item.(*pendingItem)
- // when resending pending batches, we update the sendAt
timestamp to record the metric.
- pi.Lock()
- pi.sentAt = time.Now()
- pi.Unlock()
- pi.buffer.Retain() // Retain for writing to the
connection
- p.pendingQueue.Put(pi)
- p._getConn().WriteData(pi.ctx, pi.buffer)
+ p.pendingQueue.Lock()
+ defer p.pendingQueue.Unlock()
+ p.pendingQueue.IterateUnsafe(func(item any) {
+ pi := item.(*pendingItem)
+ // when resending pending batches, we update the sendAt
timestamp to record the metric.
+ pi.Lock()
+ pi.sentAt = time.Now()
+ pi.Unlock()
+ pi.buffer.Retain() // Retain for writing to the connection
+ p._getConn().WriteData(pi.ctx, pi.buffer)
+ })
- if pi == lastViewItem {
- break
- }
- }
+ if !p.casProducerState(producerConnecting, producerReady) &&
p.isClosingOrClosed() {
Review Comment:
The condition combines the two checks with AND, but it should return
ErrProducerClosed if EITHER the CAS fails OR the producer is closing/closed.
As written, the error is returned only when both conditions are true, so a
failed CAS on a producer that is not closing or closed is not reported.
```suggestion
if !p.casProducerState(producerConnecting, producerReady) ||
p.isClosingOrClosed() {
```
##########
pulsar/producer_test.go:
##########
@@ -2741,3 +2789,112 @@ func TestSendBufferRetainWhenConnectionStuck(t
*testing.T) {
b := conn.buffers[0]
assert.Equal(t, int64(1), b.RefCnt(), "Expected buffer to have a
reference count of 1 after sending")
}
+
+func TestSendAsyncCouldTimeoutWhileReconnecting(t *testing.T) {
+ testSendAsyncCouldTimeoutWhileReconnecting(t, false)
+ testSendAsyncCouldTimeoutWhileReconnecting(t, true)
+}
+
+func testSendAsyncCouldTimeoutWhileReconnecting(t *testing.T,
isDisableBatching bool) {
+ t.Helper()
+
+ req := testcontainers.ContainerRequest{
+ Image: getPulsarTestImage(),
+ ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+ WaitingFor: wait.ForExposedPort(),
+ Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+ }
+ c, err := testcontainers.GenericContainer(context.Background(),
testcontainers.GenericContainerRequest{
+ ContainerRequest: req,
+ Started: true,
+ })
+ require.NoError(t, err, "Failed to start the pulsar container")
+ defer func() {
+ err := c.Terminate(context.Background())
+ if err != nil {
+ t.Fatal("Failed to terminate the pulsar container", err)
+ }
+ }()
+
+ endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+ require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+ client, err := NewClient(ClientOptions{
+ URL: endpoint,
+ ConnectionTimeout: 5 * time.Second,
+ OperationTimeout: 5 * time.Second,
+ })
+ require.NoError(t, err)
+ defer client.Close()
+
+ var testProducer Producer
+ require.Eventually(t, func() bool {
+ testProducer, err = client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ Schema: NewBytesSchema(nil),
+ SendTimeout: 3 * time.Second,
+ DisableBatching: isDisableBatching,
+ BatchingMaxMessages: 5,
+ MaxPendingMessages: 10,
+ })
+ return err == nil
+ }, 30*time.Second, 1*time.Second)
+
+ numMessages := 10
+ // Send 10 messages synchronously
+ for i := 0; i < numMessages; i++ {
+ send, err := testProducer.Send(context.Background(),
&ProducerMessage{Payload: []byte("test")})
+ require.NoError(t, err)
+ require.NotNil(t, send)
+ }
+
+ // stop pulsar server
+ timeout := 10 * time.Second
+ err = c.Stop(context.Background(), &timeout)
+ require.NoError(t, err)
+
+ // Test the SendAsync could be timeout if the producer is reconnecting
+
+ finalErr := make(chan error, 1)
+ testProducer.SendAsync(context.Background(), &ProducerMessage{
+ Payload: []byte("test"),
+ }, func(_ MessageID, _ *ProducerMessage, err error) {
+ finalErr <- err
+ })
+ select {
+ case <-time.After(10 * time.Second):
+ t.Fatal("test timeout")
+ case err = <-finalErr:
+ // should get a timeout error
+ require.ErrorIs(t, err, ErrSendTimeout)
+ }
+ close(finalErr)
+
+ // Test that the SendAsync could be timeout if the pending queue is full
+
+ go func() {
+ // Send 10 messages asynchronously to make the pending queue
full
+ for i := 0; i < numMessages; i++ {
+ testProducer.SendAsync(context.Background(),
&ProducerMessage{
+ Payload: []byte("test"),
+ }, func(_ MessageID, _ *ProducerMessage, _ error) {
+ })
+ }
+ }()
+
+ time.Sleep(3 * time.Second)
+ finalErr = make(chan error, 1)
+ testProducer.SendAsync(context.Background(), &ProducerMessage{
+ Payload: []byte("test"),
+ }, func(_ MessageID, _ *ProducerMessage, err error) {
+ finalErr <- err
+ })
+ select {
+ case <-time.After(10 * time.Second):
+ t.Fatal("test timeout")
+ case err = <-finalErr:
+ // should get a timeout error
+ require.ErrorIs(t, err, ErrSendTimeout)
+ }
+ close(finalErr)
Review Comment:
[nitpick] The variable name 'finalErr' is reused for two different channels in
the same test function, each of which is closed separately, which could be
confusing. Consider using different variable names for the two test scenarios
to improve code clarity.
```suggestion
finalErr2 := make(chan error, 1)
testProducer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
finalErr2 <- err
})
select {
case <-time.After(10 * time.Second):
t.Fatal("test timeout")
case err = <-finalErr2:
// should get a timeout error
require.ErrorIs(t, err, ErrSendTimeout)
}
close(finalErr2)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]