This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ba6468a5 fix: handle reconnection logic for zero queue consumer (#1404)
ba6468a5 is described below

commit ba6468a5edf36644ccee72a19cdfc8cec6dac40e
Author: crossoverJie <[email protected]>
AuthorDate: Fri Aug 29 20:52:57 2025 +0800

    fix: handle reconnection logic for zero queue consumer (#1404)
---
 pulsar/consumer_impl.go            |   1 +
 pulsar/consumer_partition.go       |  13 ++--
 pulsar/consumer_zero_queue_test.go | 122 +++++++++++++++++++++++++++++++++++++
 3 files changed, 132 insertions(+), 4 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 5f6dac3d..04cc9501 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -473,6 +473,7 @@ func newPartitionConsumerOpts(topic, consumerName string, 
idx int, options Consu
                enableBatchIndexAck:         
options.EnableBatchIndexAcknowledgment,
                ackGroupingOptions:          options.AckGroupingOptions,
                autoReceiverQueueSize:       
options.EnableAutoScaledReceiverQueueSize,
+               enableZeroQueueConsumer:     options.EnableZeroQueueConsumer,
        }
 }
 
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 56e90a13..fcde4f3a 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -123,9 +123,10 @@ type partitionConsumerOpts struct {
        expireTimeOfIncompleteChunk time.Duration
        autoAckIncompleteChunk      bool
        // in failover mode, this callback will be called when consumer change
-       consumerEventListener ConsumerEventListener
-       enableBatchIndexAck   bool
-       ackGroupingOptions    *AckGroupingOptions
+       consumerEventListener   ConsumerEventListener
+       enableBatchIndexAck     bool
+       ackGroupingOptions      *AckGroupingOptions
+       enableZeroQueueConsumer bool
 }
 
 type ConsumerEventListener interface {
@@ -1665,7 +1666,7 @@ func (pc *partitionConsumer) dispatcher() {
 
                        pc.log.Debugf("dispatcher requesting initial 
permits=%d", initialPermits)
                        // send initial permits
-                       if err := pc.internalFlow(initialPermits); err != nil {
+                       if err := pc.internalFlow(initialPermits); err != nil 
&& !pc.options.enableZeroQueueConsumer {
                                pc.log.WithError(err).Error("unable to send 
initial permits to broker")
                        }
 
@@ -1922,6 +1923,10 @@ func (pc *partitionConsumer) 
reconnectToBroker(connectionClosed *connectionClose
                        // Successfully reconnected
                        pc.log.Info("Reconnected consumer to broker")
                        bo.Reset()
+                       if pc.options.enableZeroQueueConsumer {
+                               pc.log.Info("zeroQueueConsumer reconnect, reset 
availablePermits")
+                               pc.availablePermits.inc()
+                       }
                        return struct{}{}, nil
                }
                pc.log.WithError(err).Error("Failed to create consumer at 
reconnect")
diff --git a/pulsar/consumer_zero_queue_test.go 
b/pulsar/consumer_zero_queue_test.go
index 1538a3c4..06db433b 100644
--- a/pulsar/consumer_zero_queue_test.go
+++ b/pulsar/consumer_zero_queue_test.go
@@ -24,6 +24,12 @@ import (
        "testing"
        "time"
 
+       "github.com/docker/docker/api/types/container"
+       "github.com/docker/go-connections/nat"
+       "github.com/stretchr/testify/require"
+       "github.com/testcontainers/testcontainers-go"
+       "github.com/testcontainers/testcontainers-go/wait"
+
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsaradmin"
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
@@ -115,6 +121,122 @@ func TestNormalZeroQueueConsumer(t *testing.T) {
        err = consumer.Unsubscribe()
        assert.Nil(t, err)
 }
+func TestReconnectConsumer(t *testing.T) {
+
+       req := testcontainers.ContainerRequest{
+               Name:         "pulsar-test",
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               HostConfigModifier: func(config *container.HostConfig) {
+                       config.PortBindings = map[nat.Port][]nat.PortBinding{
+                               "6650/tcp": {{HostIP: "0.0.0.0", HostPort: 
"6659"}},
+                               "8080/tcp": {{HostIP: "0.0.0.0", HostPort: 
"8089"}},
+                       }
+               },
+               Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), 
testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+               Reuse:            true,
+       })
+       require.NoError(t, err, "Failed to start the pulsar container")
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+       client, err := NewClient(ClientOptions{
+               URL: endpoint,
+       })
+       assert.Nil(t, err)
+       adminEndpoint, err := c.PortEndpoint(context.Background(), "8080", 
"http")
+       assert.Nil(t, err)
+       admin, err := pulsaradmin.NewClient(&config.Config{
+               WebServiceURL: adminEndpoint,
+       })
+       assert.Nil(t, err)
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+       var consumer Consumer
+       require.Eventually(t, func() bool {
+               consumer, err = client.Subscribe(ConsumerOptions{
+                       Topic:                   topic,
+                       SubscriptionName:        "my-sub",
+                       EnableZeroQueueConsumer: true,
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+
+       assert.Nil(t, err)
+       _, ok := consumer.(*zeroQueueConsumer)
+       assert.True(t, ok)
+       defer consumer.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+       })
+       assert.Nil(t, err)
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "pulsar",
+                       Properties: map[string]string{
+                               "key-1": "pulsar-1",
+                       },
+               })
+               assert.Nil(t, err)
+               log.Printf("send message: %s", msg.String())
+       }
+
+       ch := make(chan struct{})
+
+       go func() {
+               time.Sleep(3 * time.Second)
+               log.Println("unloading topic")
+               // Simulate a broker restart by stopping the pulsar container
+               topicName, err := utils.GetTopicName(topic)
+               assert.Nil(t, err)
+               err = admin.Topics().Unload(*topicName)
+               assert.Nil(t, err)
+               log.Println("unloaded topic")
+               ch <- struct{}{}
+       }()
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               if i == 3 {
+                       <-ch
+               }
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               expectProperties := map[string]string{
+                       "key-1": "pulsar-1",
+               }
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+               assert.Equal(t, "pulsar", msg.Key())
+               assert.Equal(t, expectProperties, msg.Properties())
+               // ack message
+               consumer.Ack(msg)
+               log.Printf("receive message: %s", msg.ID().String())
+       }
+       err = consumer.Unsubscribe()
+       assert.Nil(t, err)
+       consumer.Close()
+       producer.Close()
+       defer c.Terminate(ctx)
+}
 
 func TestMultipleConsumer(t *testing.T) {
        client, err := NewClient(ClientOptions{

Reply via email to