This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 43926211 Fix missing topic in dlq producer name when using RetryEnable
option (#1412)
43926211 is described below
commit 43926211d9629d6887ef972d29c9aff0b30c2ee8
Author: zhou zhuohan <[email protected]>
AuthorDate: Fri Aug 29 22:10:23 2025 +0800
Fix missing topic in dlq producer name when using RetryEnable option (#1412)
---
pulsar/consumer_impl.go | 10 +++++++++-
pulsar/consumer_test.go | 11 +++++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 04cc9501..20227bee 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -181,7 +181,15 @@ func newConsumer(client *client, options ConsumerOptions)
(Consumer, error) {
}
}
- dlq, err := newDlqRouter(client, options.DLQ, options.Topic,
options.SubscriptionName, options.Name,
+ var sourceTopic string
+ if options.RetryEnable && len(options.Topics) == 2 && options.Topics[1]
== options.DLQ.RetryLetterTopic {
+ // When RetryEnable is true, options.Topic and RetryLetterTopic are
+ // appended to options.Topics, so the original options.Topic has to be
+ // recovered from options.Topics.
+ sourceTopic = options.Topics[0]
+ } else {
+ sourceTopic = options.Topic
+ }
+ dlq, err := newDlqRouter(client, options.DLQ, sourceTopic,
options.SubscriptionName, options.Name,
options.BackOffPolicyFunc, client.log)
if err != nil {
return nil, err
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 8a4ea3f3..b85ad018 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1965,6 +1965,7 @@ func TestRLQ(t *testing.T) {
makeHTTPCall(t, http.MethodPut, testURL, "3")
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+ consumerName := "my-consumer"
maxRedeliveries := 2
N := 100
ctx := context.Background()
@@ -2002,6 +2003,7 @@ func TestRLQ(t *testing.T) {
rlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
+ Name: consumerName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
DLQ: &DLQPolicy{
@@ -2068,6 +2070,10 @@ func TestRLQ(t *testing.T) {
assert.LessOrEqual(t,
eventTimeList[0].Add(-2*time.Millisecond), msg.EventTime())
assert.LessOrEqual(t, msg.EventTime(),
eventTimeList[N-1].Add(2*time.Millisecond))
+ // check the DLQ producer name
+ regex :=
regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", topic, subName,
consumerName))
+ assert.True(t, regex.MatchString(msg.ProducerName()))
+
assert.Nil(t, err)
dlqConsumer.Ack(msg)
dlqReceived++
@@ -2360,6 +2366,7 @@ func TestRLQMultiTopics(t *testing.T) {
topics := []string{topic01, topic02}
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+ consumerName := "my-consumer"
maxRedeliveries := 2
N := 100
ctx := context.Background()
@@ -2372,6 +2379,7 @@ func TestRLQMultiTopics(t *testing.T) {
rlqConsumer, err := client.Subscribe(ConsumerOptions{
Topics: topics,
SubscriptionName: subName,
+ Name: consumerName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
DLQ: &DLQPolicy{MaxDeliveries:
uint32(maxRedeliveries)},
@@ -2426,9 +2434,12 @@ func TestRLQMultiTopics(t *testing.T) {
// 3. Create consumer on the DLQ topic to verify the routing
dlqReceived := 0
+ // check the DLQ producer name
+ regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", "",
subName, consumerName))
for dlqReceived < 2*N {
msg, err := dlqConsumer.Receive(ctx)
assert.Nil(t, err)
+ assert.True(t, regex.MatchString(msg.ProducerName()))
dlqConsumer.Ack(msg)
dlqReceived++
}