This is an automated email from the ASF dual-hosted git repository.

crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aa1fb6f fix: add DLQPolicy.DeadLetterTopicProducerName (#1417)
6aa1fb6f is described below

commit 6aa1fb6fc6432b7a6e9b7790f6ee6fe5f17857f3
Author: Thomas Bousquet <[email protected]>
AuthorDate: Wed Sep 3 19:25:15 2025 -0700

    fix: add DLQPolicy.DeadLetterTopicProducerName (#1417)
---
 pulsar/consumer.go      |   8 +++
 pulsar/consumer_test.go | 174 ++++++++++++++++++++++++++++++++++++++++++++++++
 pulsar/dlq_router.go    |   4 +-
 3 files changed, 185 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index d611c691..80bbf01c 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -75,6 +75,14 @@ type DLQPolicy struct {
        // DeadLetterTopic specifies the name of the topic where the failing 
messages will be sent.
        DeadLetterTopic string
 
+       // DeadLetterTopicProducerName specifies a name for the producer 
specifically for the DLQ topic.
+       // If not assigned, the system will generate a globally unique name 
which can be access with
+       // Producer.ProducerName().
+       // When specifying a name, it is up to the user to ensure that, for a 
given topic, the producer name is unique
+       // across all Pulsar's clusters. Brokers will enforce that only a 
single producer a given name can be publishing on
+       // a topic.
+       DeadLetterTopicProducerName string
+
        // ProducerOptions is the producer options to produce messages to the 
DLQ and RLQ topic
        ProducerOptions ProducerOptions
 
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index b85ad018..c36f1f59 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1853,6 +1853,180 @@ func TestDeadLetterTopicWithInitialSubscription(t 
*testing.T) {
 
 }
 
+func TestWithoutDeadLetterTopicDeadLetterTopicProducerName(t *testing.T) {
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := "persistent://public/default/" + newTopicName()
+       subscriptionName := "default"
+       consumerName := "my-consumer"
+
+       dlqTopic := fmt.Sprintf("%s-%s-DLQ", topic, subscriptionName)
+       rlqTopic := fmt.Sprintf("%s-%s-RLQ", topic, subscriptionName)
+
+       producerName := "producer-name"
+       RLQProducerName := "rlq-producer-name"
+       DLQProducerName := "dlq-producer-name"
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:               topic,
+               SubscriptionName:    subscriptionName,
+               NackRedeliveryDelay: 1 * time.Millisecond,
+               Type:                Shared,
+               DLQ: &DLQPolicy{
+                       MaxDeliveries:               1,
+                       RetryLetterTopic:            rlqTopic,
+                       DeadLetterTopic:             dlqTopic,
+                       DeadLetterTopicProducerName: DLQProducerName,
+                       ProducerOptions: ProducerOptions{
+                               Topic: rlqTopic,
+                               Name:  RLQProducerName,
+                       },
+               },
+               Name:        consumerName,
+               RetryEnable: true,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+               Name:  producerName,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       _, err = producer.Send(ctx, &ProducerMessage{
+               Payload: []byte("hello-0"),
+       })
+       assert.Nil(t, err)
+
+       // Validate the name of the original producer
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, err)
+       assert.NotNil(t, msg)
+       assert.Equal(t, msg.ProducerName(), producerName)
+       consumer.ReconsumeLater(msg, 0)
+
+       // Validate the name of the RLQ producer
+       msg, err = consumer.Receive(ctx)
+       assert.Nil(t, err)
+       assert.NotNil(t, msg)
+       assert.Equal(t, msg.ProducerName(), RLQProducerName)
+       consumer.Nack(msg)
+
+       // Create DLQ consumer
+       dlqConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            dlqTopic,
+               SubscriptionName: subscriptionName,
+       })
+       assert.Nil(t, err)
+       defer dlqConsumer.Close()
+
+       // Validate the name of the DLQ producer
+       msg, err = dlqConsumer.Receive(ctx)
+       defer dlqConsumer.Nack(msg)
+
+       assert.Nil(t, err)
+       assert.NotNil(t, msg)
+       assert.Nil(t, err)
+       assert.Equal(t, msg.ProducerName(), DLQProducerName)
+}
+
+func TestWithDeadLetterTopicDeadLetterTopicProducerName(t *testing.T) {
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := "persistent://public/default/" + newTopicName()
+       subscriptionName := "default"
+       consumerName := "my-consumer"
+
+       dlqTopic := fmt.Sprintf("%s-%s-DLQ", topic, subscriptionName)
+       rlqTopic := fmt.Sprintf("%s-%s-RLQ", topic, subscriptionName)
+
+       producerName := "producer-name"
+       RLQProducerName := "rlq-producer-name"
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:               topic,
+               SubscriptionName:    subscriptionName,
+               NackRedeliveryDelay: 1 * time.Millisecond,
+               Type:                Shared,
+               DLQ: &DLQPolicy{
+                       MaxDeliveries:    1,
+                       RetryLetterTopic: rlqTopic,
+                       DeadLetterTopic:  dlqTopic,
+                       // Set no producer name for the DLQ explicitly
+                       DeadLetterTopicProducerName: "",
+                       ProducerOptions: ProducerOptions{
+                               Topic: rlqTopic,
+                               Name:  RLQProducerName,
+                       },
+               },
+               Name:        consumerName,
+               RetryEnable: true,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+               Name:  producerName,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       _, err = producer.Send(ctx, &ProducerMessage{
+               Payload: []byte("hello-0"),
+       })
+       assert.Nil(t, err)
+
+       // Validate the name of the original producer
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, err)
+       assert.NotNil(t, msg)
+       assert.Equal(t, msg.ProducerName(), producerName)
+       consumer.ReconsumeLater(msg, 0)
+
+       // Validate the name of the RLQ producer
+       msg, err = consumer.Receive(ctx)
+       assert.Nil(t, err)
+       assert.NotNil(t, msg)
+       assert.Equal(t, msg.ProducerName(), RLQProducerName)
+       consumer.Nack(msg)
+
+       // Create DLQ consumer
+       dlqConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            dlqTopic,
+               SubscriptionName: subscriptionName,
+       })
+       assert.Nil(t, err)
+       defer dlqConsumer.Close()
+
+       // Validate the name of the DLQ producer
+       msg, err = dlqConsumer.Receive(ctx)
+       defer dlqConsumer.Nack(msg)
+
+       assert.Nil(t, err)
+       assert.NotNil(t, msg)
+       regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", topic, 
subscriptionName, consumerName))
+       assert.True(t, regex.MatchString(msg.ProducerName()))
+}
+
 func TestDLQMultiTopics(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 3fef99a3..3bacca19 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -161,8 +161,10 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
                opt := r.policy.ProducerOptions
                opt.Topic = r.policy.DeadLetterTopic
                opt.Schema = schema
-               if opt.Name == "" {
+               if r.policy.DeadLetterTopicProducerName == "" {
                        opt.Name = fmt.Sprintf("%s-%s-%s-%s-DLQ", r.topicName, 
r.subscriptionName, r.consumerName, generateRandomName())
+               } else {
+                       opt.Name = r.policy.DeadLetterTopicProducerName
                }
                opt.initialSubscriptionName = r.policy.InitialSubscriptionName
 

Reply via email to