This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b12eccc2 [Fix] DLQ Messages will lose system properties when sent from 
reconsumeLater() (#1392)
b12eccc2 is described below

commit b12eccc2c60606ff208ad44087b6b913840d617d
Author: zhou zhuohan <[email protected]>
AuthorDate: Thu Aug 7 17:56:05 2025 +0800

    [Fix] DLQ Messages will lose system properties when sent from 
reconsumeLater() (#1392)
    
    Fixes https://github.com/apache/pulsar-client-go/issues/1388
    Master Issue: https://github.com/apache/pulsar-client-go/pull/907, 
https://github.com/apache/pulsar/pull/23182
    
    ### Motivation
    Since pulsar sdk does not support `AckTimeout()` method, we can only use 
`Nack()` or `ReconsumeLater()` to trigger dlq policy and send messages to dlq. 
We can find system properties(e.g. `REAL_TOPIC` and `ORIGIN_MESSAGE_ID`) in 
these dlq messages.
    
    Refer to issue https://github.com/apache/pulsar-client-go/issues/1388, 
before pr https://github.com/apache/pulsar-client-go/pull/907 
`dlq_router#run()` will directly copy properties to create a new dlq message 
when receiving a message from `dlq.Chan()`. But we only define message 
properties in ReconsumeLater's `ReconsumeLaterWithCustomProperties()` and not 
in Nack's `dispatcher()` before send to this `dlq.Chan()`, which lead to only 
dlq messages created by `ReconsumeLater()` can have  [...]
    
    The above pr replaced this behavior with just adding system properties as 
soon as creating dlq messages in `dlq_router#run()`, which use `message.topic` 
as `REAL_TOPIC` property value. But messages sent from `ReconsumeLater() 
ReconsumeLaterWithCustomProperties()` do not contain related `message.topic` 
fields, in case it will override defined `REAL_TOPIC` to empty string.
    
    ### Modifications
    - Add `REAL_TOPIC`, `ORIGIN_MESSAGE_IDY_TIME`, `ORIGIN_MESSAGE_ID` system 
properties both in ReconsumeLater's `ReconsumeLaterWithCustomProperties()` and 
Nack's `dispatcher()` functions.
    - Add `Key`, `OrderingKey`, `EventTime` message fields both in 
`ReconsumeLater() ReconsumeLaterWithCustomProperties()` and `dlq_router run()` 
functions to keep consistent with [Java 
realization](https://github.com/apache/pulsar/pull/23182), so that dlq messages 
can preserve source message info.
    - Remove adding system properties strategy in `dlq_router#run()`.
---
 pulsar/consumer_impl.go      | 12 ++++---
 pulsar/consumer_partition.go | 20 +++++++++++
 pulsar/consumer_test.go      | 83 +++++++++++++++++++++++++++++++++++++++++---
 pulsar/dlq_router.go         | 12 +------
 pulsar/message.go            |  2 +-
 5 files changed, 108 insertions(+), 21 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index c8b28b1f..5f6dac3d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -632,17 +632,21 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg 
Message, customPropert
        } else {
                props[SysPropertyRealTopic] = msg.Topic()
                props[SysPropertyOriginMessageID] = msgID.messageID.String()
+               props[PropertyOriginMessageID] = msgID.messageID.String()
        }
        props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
        props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)
 
        consumerMsg := ConsumerMessage{
                Consumer: c,
+               // Copy msgID so that dlq/rlq router can ack this msg after 
successfully sent to new topic
                Message: &message{
-                       payLoad:    msg.Payload(),
-                       properties: props,
-                       msgID:      msgID,
-                       eventTime:  msg.EventTime(),
+                       payLoad:     msg.Payload(),
+                       key:         msg.Key(),
+                       orderingKey: msg.OrderingKey(),
+                       properties:  props,
+                       eventTime:   msg.EventTime(),
+                       msgID:       msgID,
                },
        }
        if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 2a2f4eb0..41341ede 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1606,6 +1606,26 @@ func (pc *partitionConsumer) dispatcher() {
                        if !pc.isSeeking.Load() {
                                if pc.dlq.shouldSendToDlq(&nextMessage) {
                                        // pass the message to the DLQ router
+                                       // we need to create a new 
ConsumerMessage and add dlq related metadata properties
+                                       properties := make(map[string]string)
+                                       properties[SysPropertyRealTopic] = 
messages[0].Topic()
+                                       properties[SysPropertyOriginMessageID] 
= messages[0].msgID.String()
+                                       properties[PropertyOriginMessageID] = 
messages[0].msgID.String()
+                                       for key, value := range 
messages[0].properties {
+                                               properties[key] = value
+                                       }
+                                       nextMessage = ConsumerMessage{
+                                               Consumer: pc.parentConsumer,
+                                               // Copy msgID so that dlq/rlq 
router can ack this msg after successfully sent to new topic
+                                               Message: &message{
+                                                       payLoad:     
messages[0].Payload(),
+                                                       key:         
messages[0].Key(),
+                                                       orderingKey: 
messages[0].OrderingKey(),
+                                                       properties:  properties,
+                                                       eventTime:   
messages[0].EventTime(),
+                                                       msgID:       
messages[0].msgID,
+                                               },
+                                       }
                                        pc.metrics.DlqCounter.Inc()
                                        messageCh = pc.dlq.Chan()
                                } else {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 4f67c363..c151c984 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1580,12 +1580,26 @@ func DLQWithProducerOptions(t *testing.T, prodOpt 
*ProducerOptions) {
        defer producer.Close()
 
        // send 10 messages
+       eventTimeList := make([]time.Time, 10)
+       msgIDList := make([]string, 10)
+       msgKeyList := make([]string, 10)
        for i := 0; i < 10; i++ {
-               if _, err := producer.Send(ctx, &ProducerMessage{
-                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
-               }); err != nil {
+               eventTime := time.Now()
+               eventTimeList[i] = eventTime
+               msgKeyList[i] = fmt.Sprintf("key-%d", i)
+               msgID, err := producer.Send(ctx, &ProducerMessage{
+                       Payload:     []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:         fmt.Sprintf("key-%d", i),
+                       OrderingKey: fmt.Sprintf("key-%d", i),
+                       EventTime:   eventTime,
+                       Properties: map[string]string{
+                               "key": fmt.Sprintf("key-%d", i),
+                       },
+               })
+               if err != nil {
                        log.Fatal(err)
                }
+               msgIDList[i] = msgID.String()
        }
 
        // receive 10 messages and only ack half-of-them
@@ -1624,10 +1638,27 @@ func DLQWithProducerOptions(t *testing.T, prodOpt 
*ProducerOptions) {
                assert.True(t, regex.MatchString(msg.ProducerName()))
 
                // check original messageId
+               assert.NotEmpty(t, msg.Properties()[SysPropertyOriginMessageID])
+               assert.Equal(t, msgIDList[expectedMsgIdx], 
msg.Properties()[SysPropertyOriginMessageID])
                assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
+               assert.Equal(t, msgIDList[expectedMsgIdx], 
msg.Properties()[PropertyOriginMessageID])
 
                // check original topic
-               assert.NotEmpty(t, msg.Properties()[SysPropertyRealTopic])
+               assert.Contains(t, msg.Properties()[SysPropertyRealTopic], 
topic)
+
+               // check original key
+               assert.NotEmpty(t, msg.Key())
+               assert.Equal(t, msgKeyList[expectedMsgIdx], msg.Key())
+               assert.NotEmpty(t, msg.OrderingKey())
+               assert.Equal(t, msgKeyList[expectedMsgIdx], msg.OrderingKey())
+               assert.NotEmpty(t, msg.Properties()["key"])
+               assert.Equal(t, msg.Key(), msg.Properties()["key"])
+
+               //      check original event time
+               //      Broker will ignore event time microsecond(us) level 
precision,
+               //      so that we need to check eventTime precision in 
millisecond level
+               assert.NotEqual(t, 0, msg.EventTime())
+               assert.True(t, 
eventTimeList[expectedMsgIdx].Sub(msg.EventTime()).Abs() < 2*time.Millisecond)
        }
 
        // No more messages on the DLQ
@@ -1855,9 +1886,24 @@ func TestRLQ(t *testing.T) {
        assert.Nil(t, err)
        defer producer.Close()
 
+       eventTimeList := make([]time.Time, N)
+       msgIDList := make([]string, N)
+       msgKeyList := make([]string, N)
        for i := 0; i < N; i++ {
-               _, err = producer.Send(ctx, &ProducerMessage{Payload: 
[]byte(fmt.Sprintf("MESSAGE_%d", i))})
+               eventTime := time.Now()
+               eventTimeList[i] = eventTime
+               msgKeyList[i] = fmt.Sprintf("key-%d", i)
+               msgID, err := producer.Send(ctx, &ProducerMessage{
+                       Payload:     []byte(fmt.Sprintf("MESSAGE_%d", i)),
+                       Key:         fmt.Sprintf("key-%d", i),
+                       OrderingKey: fmt.Sprintf("key-%d", i),
+                       EventTime:   eventTime,
+                       Properties: map[string]string{
+                               "key": fmt.Sprintf("key-%d", i),
+                       },
+               })
                assert.Nil(t, err)
+               msgIDList[i] = msgID.String()
        }
 
        // 2. Create consumer on the Retry Topic to reconsume N messages 
(maxRedeliveries+1) times
@@ -1903,6 +1949,33 @@ func TestRLQ(t *testing.T) {
        dlqReceived := 0
        for dlqReceived < N {
                msg, err := dlqConsumer.Receive(ctx)
+               //      check original messageId
+               //      we create a topic with three partitions,
+               //      so that messages maybe not be received as the same 
order as we produced
+               assert.NotEmpty(t, msg.Properties()[SysPropertyOriginMessageID])
+               assert.Contains(t, msgIDList, 
msg.Properties()[SysPropertyOriginMessageID])
+               assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
+               assert.Contains(t, msgIDList, 
msg.Properties()[PropertyOriginMessageID])
+
+               // check original topic
+               assert.Contains(t, msg.Properties()[SysPropertyRealTopic], 
topic)
+
+               // check original key
+               assert.NotEmpty(t, msg.Key())
+               assert.Contains(t, msgKeyList, msg.Key())
+               assert.NotEmpty(t, msg.OrderingKey())
+               assert.Contains(t, msgKeyList, msg.OrderingKey())
+               assert.NotEmpty(t, msg.Properties()["key"])
+               assert.Equal(t, msg.Key(), msg.Properties()["key"])
+
+               // check original event time
+               assert.NotEqual(t, 0, msg.EventTime())
+               //      check original event time
+               //      Broker will ignore event time microsecond(us) level 
precision,
+               //      so that we need to check eventTime precision in 
millisecond level
+               assert.LessOrEqual(t, 
eventTimeList[0].Add(-2*time.Millisecond), msg.EventTime())
+               assert.LessOrEqual(t, msg.EventTime(), 
eventTimeList[N-1].Add(2*time.Millisecond))
+
                assert.Nil(t, err)
                dlqConsumer.Ack(msg)
                dlqReceived++
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index c65f01aa..3fef99a3 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -111,21 +111,11 @@ func (r *dlqRouter) run() {
                        producer := 
r.getProducer(cm.Consumer.(*consumer).options.Schema)
                        msg := cm.Message.(*message)
                        msgID := msg.ID()
-
-                       // properties associated with original message
-                       properties := msg.Properties()
-
-                       // include orinal message id in string format in 
properties
-                       properties[PropertyOriginMessageID] = msgID.String()
-
-                       // include original topic name of the message in 
properties
-                       properties[SysPropertyRealTopic] = msg.Topic()
-
                        producer.SendAsync(context.Background(), 
&ProducerMessage{
                                Payload:             msg.Payload(),
                                Key:                 msg.Key(),
                                OrderingKey:         msg.OrderingKey(),
-                               Properties:          properties,
+                               Properties:          msg.Properties(),
                                EventTime:           msg.EventTime(),
                                ReplicationClusters: msg.replicationClusters,
                        }, func(_ MessageID, _ *ProducerMessage, err error) {
diff --git a/pulsar/message.go b/pulsar/message.go
index f965555e..2cc04722 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -40,7 +40,7 @@ type ProducerMessage struct {
 
        // EventTime set the event time for a given message
        // By default, messages don't have an event time associated, while the 
publish
-       // time will be be always present.
+       // time will be always present.
        // Set the event time to a non-zero timestamp to explicitly declare the 
time
        // that the event "happened", as opposed to when the message is being 
published.
        EventTime time.Time

Reply via email to