This is an automated email from the ASF dual-hosted git repository.
crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 6aa1fb6f fix: add DLQPolicy.DeadLetterTopicProducerName (#1417)
6aa1fb6f is described below
commit 6aa1fb6fc6432b7a6e9b7790f6ee6fe5f17857f3
Author: Thomas Bousquet <[email protected]>
AuthorDate: Wed Sep 3 19:25:15 2025 -0700
fix: add DLQPolicy.DeadLetterTopicProducerName (#1417)
---
pulsar/consumer.go | 8 +++
pulsar/consumer_test.go | 174 ++++++++++++++++++++++++++++++++++++++++++++++++
pulsar/dlq_router.go | 4 +-
3 files changed, 185 insertions(+), 1 deletion(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index d611c691..80bbf01c 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -75,6 +75,14 @@ type DLQPolicy struct {
// DeadLetterTopic specifies the name of the topic where the failing
messages will be sent.
DeadLetterTopic string
+ // DeadLetterTopicProducerName specifies a name for the producer
specifically for the DLQ topic.
+ // If not assigned, the system will generate a globally unique name
which can be access with
+ // Producer.ProducerName().
+ // When specifying a name, it is up to the user to ensure that, for a
given topic, the producer name is unique
+ // across all Pulsar's clusters. Brokers will enforce that only a
single producer a given name can be publishing on
+ // a topic.
+ DeadLetterTopicProducerName string
+
// ProducerOptions is the producer options to produce messages to the
DLQ and RLQ topic
ProducerOptions ProducerOptions
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index b85ad018..c36f1f59 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1853,6 +1853,180 @@ func TestDeadLetterTopicWithInitialSubscription(t
*testing.T) {
}
+func TestWithoutDeadLetterTopicDeadLetterTopicProducerName(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := "persistent://public/default/" + newTopicName()
+ subscriptionName := "default"
+ consumerName := "my-consumer"
+
+ dlqTopic := fmt.Sprintf("%s-%s-DLQ", topic, subscriptionName)
+ rlqTopic := fmt.Sprintf("%s-%s-RLQ", topic, subscriptionName)
+
+ producerName := "producer-name"
+ RLQProducerName := "rlq-producer-name"
+ DLQProducerName := "dlq-producer-name"
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subscriptionName,
+ NackRedeliveryDelay: 1 * time.Millisecond,
+ Type: Shared,
+ DLQ: &DLQPolicy{
+ MaxDeliveries: 1,
+ RetryLetterTopic: rlqTopic,
+ DeadLetterTopic: dlqTopic,
+ DeadLetterTopicProducerName: DLQProducerName,
+ ProducerOptions: ProducerOptions{
+ Topic: rlqTopic,
+ Name: RLQProducerName,
+ },
+ },
+ Name: consumerName,
+ RetryEnable: true,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Name: producerName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ _, err = producer.Send(ctx, &ProducerMessage{
+ Payload: []byte("hello-0"),
+ })
+ assert.Nil(t, err)
+
+ // Validate the name of the original producer
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ assert.Equal(t, msg.ProducerName(), producerName)
+ consumer.ReconsumeLater(msg, 0)
+
+ // Validate the name of the RLQ producer
+ msg, err = consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ assert.Equal(t, msg.ProducerName(), RLQProducerName)
+ consumer.Nack(msg)
+
+ // Create DLQ consumer
+ dlqConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: dlqTopic,
+ SubscriptionName: subscriptionName,
+ })
+ assert.Nil(t, err)
+ defer dlqConsumer.Close()
+
+ // Validate the name of the DLQ producer
+ msg, err = dlqConsumer.Receive(ctx)
+ defer dlqConsumer.Nack(msg)
+
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ assert.Nil(t, err)
+ assert.Equal(t, msg.ProducerName(), DLQProducerName)
+}
+
+func TestWithDeadLetterTopicDeadLetterTopicProducerName(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := "persistent://public/default/" + newTopicName()
+ subscriptionName := "default"
+ consumerName := "my-consumer"
+
+ dlqTopic := fmt.Sprintf("%s-%s-DLQ", topic, subscriptionName)
+ rlqTopic := fmt.Sprintf("%s-%s-RLQ", topic, subscriptionName)
+
+ producerName := "producer-name"
+ RLQProducerName := "rlq-producer-name"
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subscriptionName,
+ NackRedeliveryDelay: 1 * time.Millisecond,
+ Type: Shared,
+ DLQ: &DLQPolicy{
+ MaxDeliveries: 1,
+ RetryLetterTopic: rlqTopic,
+ DeadLetterTopic: dlqTopic,
+ // Set no producer name for the DLQ explicitly
+ DeadLetterTopicProducerName: "",
+ ProducerOptions: ProducerOptions{
+ Topic: rlqTopic,
+ Name: RLQProducerName,
+ },
+ },
+ Name: consumerName,
+ RetryEnable: true,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Name: producerName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ _, err = producer.Send(ctx, &ProducerMessage{
+ Payload: []byte("hello-0"),
+ })
+ assert.Nil(t, err)
+
+ // Validate the name of the original producer
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ assert.Equal(t, msg.ProducerName(), producerName)
+ consumer.ReconsumeLater(msg, 0)
+
+ // Validate the name of the RLQ producer
+ msg, err = consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ assert.Equal(t, msg.ProducerName(), RLQProducerName)
+ consumer.Nack(msg)
+
+ // Create DLQ consumer
+ dlqConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: dlqTopic,
+ SubscriptionName: subscriptionName,
+ })
+ assert.Nil(t, err)
+ defer dlqConsumer.Close()
+
+ // Validate the name of the DLQ producer
+ msg, err = dlqConsumer.Receive(ctx)
+ defer dlqConsumer.Nack(msg)
+
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ regex := regexp.MustCompile(fmt.Sprintf("%s-%s-%s-[a-z]{5}-DLQ", topic,
subscriptionName, consumerName))
+ assert.True(t, regex.MatchString(msg.ProducerName()))
+}
+
func TestDLQMultiTopics(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 3fef99a3..3bacca19 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -161,8 +161,10 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.DeadLetterTopic
opt.Schema = schema
- if opt.Name == "" {
+ if r.policy.DeadLetterTopicProducerName == "" {
opt.Name = fmt.Sprintf("%s-%s-%s-%s-DLQ", r.topicName,
r.subscriptionName, r.consumerName, generateRandomName())
+ } else {
+ opt.Name = r.policy.DeadLetterTopicProducerName
}
opt.initialSubscriptionName = r.policy.InitialSubscriptionName