This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 43926211 Fix missing topic in dlq producer name when using RetryEnable option (#1412)
43926211 is described below

commit 43926211d9629d6887ef972d29c9aff0b30c2ee8
Author: zhou zhuohan <[email protected]>
AuthorDate: Fri Aug 29 22:10:23 2025 +0800

    Fix missing topic in dlq producer name when using RetryEnable option (#1412)
---
 pulsar/consumer_impl.go | 10 +++++++++-
 pulsar/consumer_test.go | 11 +++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 04cc9501..20227bee 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -181,7 +181,15 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                }
        }
 
-       dlq, err := newDlqRouter(client, options.DLQ, options.Topic, 
options.SubscriptionName, options.Name,
+       var sourceTopic string
+       if options.RetryEnable && len(options.Topics) == 2 && options.Topics[1] 
== options.DLQ.RetryLetterTopic {
+               //      when RetryEnable=true, options.Topic and RetryLetterTopic will be appended to the options.Topics
+               //      we need to try to find previous options.Topic from options.Topics
+               sourceTopic = options.Topics[0]
+       } else {
+               sourceTopic = options.Topic
+       }
+       dlq, err := newDlqRouter(client, options.DLQ, sourceTopic, 
options.SubscriptionName, options.Name,
                options.BackOffPolicyFunc, client.log)
        if err != nil {
                return nil, err
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 8a4ea3f3..b85ad018 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1965,6 +1965,7 @@ func TestRLQ(t *testing.T) {
        makeHTTPCall(t, http.MethodPut, testURL, "3")
 
        subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+       consumerName := "my-consumer"
        maxRedeliveries := 2
        N := 100
        ctx := context.Background()
@@ -2002,6 +2003,7 @@ func TestRLQ(t *testing.T) {
        rlqConsumer, err := client.Subscribe(ConsumerOptions{
                Topic:                       topic,
                SubscriptionName:            subName,
+               Name:                        consumerName,
                Type:                        Shared,
                SubscriptionInitialPosition: SubscriptionPositionEarliest,
                DLQ: &DLQPolicy{
@@ -2068,6 +2070,10 @@ func TestRLQ(t *testing.T) {
                assert.LessOrEqual(t, 
eventTimeList[0].Add(-2*time.Millisecond), msg.EventTime())
                assert.LessOrEqual(t, msg.EventTime(), 
eventTimeList[N-1].Add(2*time.Millisecond))
 
+               // check dlq produceName
+               regex := 
regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", topic, subName, 
consumerName))
+               assert.True(t, regex.MatchString(msg.ProducerName()))
+
                assert.Nil(t, err)
                dlqConsumer.Ack(msg)
                dlqReceived++
@@ -2360,6 +2366,7 @@ func TestRLQMultiTopics(t *testing.T) {
        topics := []string{topic01, topic02}
 
        subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+       consumerName := "my-consumer"
        maxRedeliveries := 2
        N := 100
        ctx := context.Background()
@@ -2372,6 +2379,7 @@ func TestRLQMultiTopics(t *testing.T) {
        rlqConsumer, err := client.Subscribe(ConsumerOptions{
                Topics:                      topics,
                SubscriptionName:            subName,
+               Name:                        consumerName,
                Type:                        Shared,
                SubscriptionInitialPosition: SubscriptionPositionEarliest,
                DLQ:                         &DLQPolicy{MaxDeliveries: 
uint32(maxRedeliveries)},
@@ -2426,9 +2434,12 @@ func TestRLQMultiTopics(t *testing.T) {
 
        // 3. Create consumer on the DLQ topic to verify the routing
        dlqReceived := 0
+       // check dlq produceName
+       regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", "", 
subName, consumerName))
        for dlqReceived < 2*N {
                msg, err := dlqConsumer.Receive(ctx)
                assert.Nil(t, err)
+               assert.True(t, regex.MatchString(msg.ProducerName()))
                dlqConsumer.Ack(msg)
                dlqReceived++
        }

Reply via email to