This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ba6468a5 fix: handle reconnection logic for zero queue consumer (#1404)
ba6468a5 is described below
commit ba6468a5edf36644ccee72a19cdfc8cec6dac40e
Author: crossoverJie <[email protected]>
AuthorDate: Fri Aug 29 20:52:57 2025 +0800
fix: handle reconnection logic for zero queue consumer (#1404)
---
pulsar/consumer_impl.go | 1 +
pulsar/consumer_partition.go | 13 ++--
pulsar/consumer_zero_queue_test.go | 122 +++++++++++++++++++++++++++++++++++++
3 files changed, 132 insertions(+), 4 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 5f6dac3d..04cc9501 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -473,6 +473,7 @@ func newPartitionConsumerOpts(topic, consumerName string,
idx int, options Consu
enableBatchIndexAck:
options.EnableBatchIndexAcknowledgment,
ackGroupingOptions: options.AckGroupingOptions,
autoReceiverQueueSize:
options.EnableAutoScaledReceiverQueueSize,
+ enableZeroQueueConsumer: options.EnableZeroQueueConsumer,
}
}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 56e90a13..fcde4f3a 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -123,9 +123,10 @@ type partitionConsumerOpts struct {
expireTimeOfIncompleteChunk time.Duration
autoAckIncompleteChunk bool
// in failover mode, this callback will be called when consumer change
- consumerEventListener ConsumerEventListener
- enableBatchIndexAck bool
- ackGroupingOptions *AckGroupingOptions
+ consumerEventListener ConsumerEventListener
+ enableBatchIndexAck bool
+ ackGroupingOptions *AckGroupingOptions
+ enableZeroQueueConsumer bool
}
type ConsumerEventListener interface {
@@ -1665,7 +1666,7 @@ func (pc *partitionConsumer) dispatcher() {
pc.log.Debugf("dispatcher requesting initial
permits=%d", initialPermits)
// send initial permits
- if err := pc.internalFlow(initialPermits); err != nil {
+ if err := pc.internalFlow(initialPermits); err != nil
&& !pc.options.enableZeroQueueConsumer {
pc.log.WithError(err).Error("unable to send
initial permits to broker")
}
@@ -1922,6 +1923,10 @@ func (pc *partitionConsumer)
reconnectToBroker(connectionClosed *connectionClose
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
bo.Reset()
+ if pc.options.enableZeroQueueConsumer {
+ pc.log.Info("zeroQueueConsumer reconnect, reset
availablePermits")
+ pc.availablePermits.inc()
+ }
return struct{}{}, nil
}
pc.log.WithError(err).Error("Failed to create consumer at
reconnect")
diff --git a/pulsar/consumer_zero_queue_test.go
b/pulsar/consumer_zero_queue_test.go
index 1538a3c4..06db433b 100644
--- a/pulsar/consumer_zero_queue_test.go
+++ b/pulsar/consumer_zero_queue_test.go
@@ -24,6 +24,12 @@ import (
"testing"
"time"
+ "github.com/docker/docker/api/types/container"
+ "github.com/docker/go-connections/nat"
+ "github.com/stretchr/testify/require"
+ "github.com/testcontainers/testcontainers-go"
+ "github.com/testcontainers/testcontainers-go/wait"
+
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
@@ -115,6 +121,122 @@ func TestNormalZeroQueueConsumer(t *testing.T) {
err = consumer.Unsubscribe()
assert.Nil(t, err)
}
+func TestReconnectConsumer(t *testing.T) {
+
+ req := testcontainers.ContainerRequest{
+ Name: "pulsar-test",
+ Image: getPulsarTestImage(),
+ ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+ WaitingFor: wait.ForExposedPort(),
+ HostConfigModifier: func(config *container.HostConfig) {
+ config.PortBindings = map[nat.Port][]nat.PortBinding{
+ "6650/tcp": {{HostIP: "0.0.0.0", HostPort:
"6659"}},
+ "8080/tcp": {{HostIP: "0.0.0.0", HostPort:
"8089"}},
+ }
+ },
+ Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+ }
+ c, err := testcontainers.GenericContainer(context.Background(),
testcontainers.GenericContainerRequest{
+ ContainerRequest: req,
+ Started: true,
+ Reuse: true,
+ })
+ require.NoError(t, err, "Failed to start the pulsar container")
+ endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+ require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+ client, err := NewClient(ClientOptions{
+ URL: endpoint,
+ })
+ assert.Nil(t, err)
+ adminEndpoint, err := c.PortEndpoint(context.Background(), "8080",
"http")
+ assert.Nil(t, err)
+ admin, err := pulsaradmin.NewClient(&config.Config{
+ WebServiceURL: adminEndpoint,
+ })
+ assert.Nil(t, err)
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ ctx := context.Background()
+ var consumer Consumer
+ require.Eventually(t, func() bool {
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ EnableZeroQueueConsumer: true,
+ })
+ return err == nil
+ }, 30*time.Second, 1*time.Second)
+
+ assert.Nil(t, err)
+ _, ok := consumer.(*zeroQueueConsumer)
+ assert.True(t, ok)
+ defer consumer.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "pulsar",
+ Properties: map[string]string{
+ "key-1": "pulsar-1",
+ },
+ })
+ assert.Nil(t, err)
+ log.Printf("send message: %s", msg.String())
+ }
+
+ ch := make(chan struct{})
+
+ go func() {
+ time.Sleep(3 * time.Second)
+ log.Println("unloading topic")
+ // Simulate a broker restart by stopping the pulsar container
+ topicName, err := utils.GetTopicName(topic)
+ assert.Nil(t, err)
+ err = admin.Topics().Unload(*topicName)
+ assert.Nil(t, err)
+ log.Println("unloaded topic")
+ ch <- struct{}{}
+ }()
+
+ // receive 10 messages
+ for i := 0; i < 10; i++ {
+ if i == 3 {
+ <-ch
+ }
+ msg, err := consumer.Receive(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ expectProperties := map[string]string{
+ "key-1": "pulsar-1",
+ }
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ assert.Equal(t, "pulsar", msg.Key())
+ assert.Equal(t, expectProperties, msg.Properties())
+ // ack message
+ consumer.Ack(msg)
+ log.Printf("receive message: %s", msg.ID().String())
+ }
+ err = consumer.Unsubscribe()
+ assert.Nil(t, err)
+ consumer.Close()
+ producer.Close()
+ defer c.Terminate(ctx)
+}
func TestMultipleConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{