This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new b12eccc2 [Fix] DLQ Messages will lose system properties when sent from
reconsumeLater() (#1392)
b12eccc2 is described below
commit b12eccc2c60606ff208ad44087b6b913840d617d
Author: zhou zhuohan <[email protected]>
AuthorDate: Thu Aug 7 17:56:05 2025 +0800
[Fix] DLQ Messages will lose system properties when sent from
reconsumeLater() (#1392)
Fixes https://github.com/apache/pulsar-client-go/issues/1388
Master Issue: https://github.com/apache/pulsar-client-go/pull/907,
https://github.com/apache/pulsar/pull/23182
### Motivation
Since the Pulsar Go SDK does not support the `AckTimeout()` method, we can only use
`Nack()` or `ReconsumeLater()` to trigger the DLQ policy and send messages to the DLQ.
We can find system properties (e.g. `REAL_TOPIC` and `ORIGIN_MESSAGE_ID`) in
these DLQ messages.
As described in issue https://github.com/apache/pulsar-client-go/issues/1388,
before PR https://github.com/apache/pulsar-client-go/pull/907,
`dlq_router#run()` directly copied the properties to create a new DLQ message
when receiving a message from `dlq.Chan()`. However, message properties were only
defined in ReconsumeLater's `ReconsumeLaterWithCustomProperties()` and not
in Nack's `dispatcher()` before sending to `dlq.Chan()`, so only
DLQ messages created by `ReconsumeLater()` could have [...]
The above PR replaced this behavior by simply adding the system properties
when creating DLQ messages in `dlq_router#run()`, using `message.topic`
as the `REAL_TOPIC` property value. However, messages sent from `ReconsumeLater()`/
`ReconsumeLaterWithCustomProperties()` do not set the `message.topic`
field, so the already-defined `REAL_TOPIC` is overwritten with an empty string.
### Modifications
- Add the `REAL_TOPIC`, `ORIGIN_MESSAGE_IDY_TIME`, and `ORIGIN_MESSAGE_ID` system
properties in both ReconsumeLater's `ReconsumeLaterWithCustomProperties()` and
Nack's `dispatcher()` functions.
- Add the `Key`, `OrderingKey`, and `EventTime` message fields in both
`ReconsumeLater()`/`ReconsumeLaterWithCustomProperties()` and `dlq_router#run()`
to be consistent with the [Java
implementation](https://github.com/apache/pulsar/pull/23182), so that DLQ messages
preserve the source message's information.
- Remove the logic that adds system properties in `dlq_router#run()`.
---
pulsar/consumer_impl.go | 12 ++++---
pulsar/consumer_partition.go | 20 +++++++++++
pulsar/consumer_test.go | 83 +++++++++++++++++++++++++++++++++++++++++---
pulsar/dlq_router.go | 12 +------
pulsar/message.go | 2 +-
5 files changed, 108 insertions(+), 21 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index c8b28b1f..5f6dac3d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -632,17 +632,21 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg
Message, customPropert
} else {
props[SysPropertyRealTopic] = msg.Topic()
props[SysPropertyOriginMessageID] = msgID.messageID.String()
+ props[PropertyOriginMessageID] = msgID.messageID.String()
}
props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)
consumerMsg := ConsumerMessage{
Consumer: c,
+ // Copy msgID so that dlq/rlq router can ack this msg after
successfully sent to new topic
Message: &message{
- payLoad: msg.Payload(),
- properties: props,
- msgID: msgID,
- eventTime: msg.EventTime(),
+ payLoad: msg.Payload(),
+ key: msg.Key(),
+ orderingKey: msg.OrderingKey(),
+ properties: props,
+ eventTime: msg.EventTime(),
+ msgID: msgID,
},
}
if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 2a2f4eb0..41341ede 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1606,6 +1606,26 @@ func (pc *partitionConsumer) dispatcher() {
if !pc.isSeeking.Load() {
if pc.dlq.shouldSendToDlq(&nextMessage) {
// pass the message to the DLQ router
+ // we need to create a new
ConsumerMessage and add dlq related metadata properties
+ properties := make(map[string]string)
+ properties[SysPropertyRealTopic] =
messages[0].Topic()
+ properties[SysPropertyOriginMessageID]
= messages[0].msgID.String()
+ properties[PropertyOriginMessageID] =
messages[0].msgID.String()
+ for key, value := range
messages[0].properties {
+ properties[key] = value
+ }
+ nextMessage = ConsumerMessage{
+ Consumer: pc.parentConsumer,
+ // Copy msgID so that dlq/rlq
router can ack this msg after successfully sent to new topic
+ Message: &message{
+ payLoad:
messages[0].Payload(),
+ key:
messages[0].Key(),
+ orderingKey:
messages[0].OrderingKey(),
+ properties: properties,
+ eventTime:
messages[0].EventTime(),
+ msgID:
messages[0].msgID,
+ },
+ }
pc.metrics.DlqCounter.Inc()
messageCh = pc.dlq.Chan()
} else {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 4f67c363..c151c984 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1580,12 +1580,26 @@ func DLQWithProducerOptions(t *testing.T, prodOpt
*ProducerOptions) {
defer producer.Close()
// send 10 messages
+ eventTimeList := make([]time.Time, 10)
+ msgIDList := make([]string, 10)
+ msgKeyList := make([]string, 10)
for i := 0; i < 10; i++ {
- if _, err := producer.Send(ctx, &ProducerMessage{
- Payload: []byte(fmt.Sprintf("hello-%d", i)),
- }); err != nil {
+ eventTime := time.Now()
+ eventTimeList[i] = eventTime
+ msgKeyList[i] = fmt.Sprintf("key-%d", i)
+ msgID, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: fmt.Sprintf("key-%d", i),
+ OrderingKey: fmt.Sprintf("key-%d", i),
+ EventTime: eventTime,
+ Properties: map[string]string{
+ "key": fmt.Sprintf("key-%d", i),
+ },
+ })
+ if err != nil {
log.Fatal(err)
}
+ msgIDList[i] = msgID.String()
}
// receive 10 messages and only ack half-of-them
@@ -1624,10 +1638,27 @@ func DLQWithProducerOptions(t *testing.T, prodOpt
*ProducerOptions) {
assert.True(t, regex.MatchString(msg.ProducerName()))
// check original messageId
+ assert.NotEmpty(t, msg.Properties()[SysPropertyOriginMessageID])
+ assert.Equal(t, msgIDList[expectedMsgIdx],
msg.Properties()[SysPropertyOriginMessageID])
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
+ assert.Equal(t, msgIDList[expectedMsgIdx],
msg.Properties()[PropertyOriginMessageID])
// check original topic
- assert.NotEmpty(t, msg.Properties()[SysPropertyRealTopic])
+ assert.Contains(t, msg.Properties()[SysPropertyRealTopic],
topic)
+
+ // check original key
+ assert.NotEmpty(t, msg.Key())
+ assert.Equal(t, msgKeyList[expectedMsgIdx], msg.Key())
+ assert.NotEmpty(t, msg.OrderingKey())
+ assert.Equal(t, msgKeyList[expectedMsgIdx], msg.OrderingKey())
+ assert.NotEmpty(t, msg.Properties()["key"])
+ assert.Equal(t, msg.Key(), msg.Properties()["key"])
+
+ // check original event time
+ // Broker will ignore event time microsecond(us) level
precision,
+ // so that we need to check eventTime precision in
millisecond level
+ assert.NotEqual(t, 0, msg.EventTime())
+ assert.True(t,
eventTimeList[expectedMsgIdx].Sub(msg.EventTime()).Abs() < 2*time.Millisecond)
}
// No more messages on the DLQ
@@ -1855,9 +1886,24 @@ func TestRLQ(t *testing.T) {
assert.Nil(t, err)
defer producer.Close()
+ eventTimeList := make([]time.Time, N)
+ msgIDList := make([]string, N)
+ msgKeyList := make([]string, N)
for i := 0; i < N; i++ {
- _, err = producer.Send(ctx, &ProducerMessage{Payload:
[]byte(fmt.Sprintf("MESSAGE_%d", i))})
+ eventTime := time.Now()
+ eventTimeList[i] = eventTime
+ msgKeyList[i] = fmt.Sprintf("key-%d", i)
+ msgID, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("MESSAGE_%d", i)),
+ Key: fmt.Sprintf("key-%d", i),
+ OrderingKey: fmt.Sprintf("key-%d", i),
+ EventTime: eventTime,
+ Properties: map[string]string{
+ "key": fmt.Sprintf("key-%d", i),
+ },
+ })
assert.Nil(t, err)
+ msgIDList[i] = msgID.String()
}
// 2. Create consumer on the Retry Topic to reconsume N messages
(maxRedeliveries+1) times
@@ -1903,6 +1949,33 @@ func TestRLQ(t *testing.T) {
dlqReceived := 0
for dlqReceived < N {
msg, err := dlqConsumer.Receive(ctx)
+ // check original messageId
+ // we create a topic with three partitions,
+ // so that messages maybe not be received as the same
order as we produced
+ assert.NotEmpty(t, msg.Properties()[SysPropertyOriginMessageID])
+ assert.Contains(t, msgIDList,
msg.Properties()[SysPropertyOriginMessageID])
+ assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
+ assert.Contains(t, msgIDList,
msg.Properties()[PropertyOriginMessageID])
+
+ // check original topic
+ assert.Contains(t, msg.Properties()[SysPropertyRealTopic],
topic)
+
+ // check original key
+ assert.NotEmpty(t, msg.Key())
+ assert.Contains(t, msgKeyList, msg.Key())
+ assert.NotEmpty(t, msg.OrderingKey())
+ assert.Contains(t, msgKeyList, msg.OrderingKey())
+ assert.NotEmpty(t, msg.Properties()["key"])
+ assert.Equal(t, msg.Key(), msg.Properties()["key"])
+
+ // check original event time
+ assert.NotEqual(t, 0, msg.EventTime())
+ // check original event time
+ // Broker will ignore event time microsecond(us) level
precision,
+ // so that we need to check eventTime precision in
millisecond level
+ assert.LessOrEqual(t,
eventTimeList[0].Add(-2*time.Millisecond), msg.EventTime())
+ assert.LessOrEqual(t, msg.EventTime(),
eventTimeList[N-1].Add(2*time.Millisecond))
+
assert.Nil(t, err)
dlqConsumer.Ack(msg)
dlqReceived++
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index c65f01aa..3fef99a3 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -111,21 +111,11 @@ func (r *dlqRouter) run() {
producer :=
r.getProducer(cm.Consumer.(*consumer).options.Schema)
msg := cm.Message.(*message)
msgID := msg.ID()
-
- // properties associated with original message
- properties := msg.Properties()
-
- // include orinal message id in string format in
properties
- properties[PropertyOriginMessageID] = msgID.String()
-
- // include original topic name of the message in
properties
- properties[SysPropertyRealTopic] = msg.Topic()
-
producer.SendAsync(context.Background(),
&ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
OrderingKey: msg.OrderingKey(),
- Properties: properties,
+ Properties: msg.Properties(),
EventTime: msg.EventTime(),
ReplicationClusters: msg.replicationClusters,
}, func(_ MessageID, _ *ProducerMessage, err error) {
diff --git a/pulsar/message.go b/pulsar/message.go
index f965555e..2cc04722 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -40,7 +40,7 @@ type ProducerMessage struct {
// EventTime set the event time for a given message
// By default, messages don't have an event time associated, while the
publish
- // time will be be always present.
+ // time will be always present.
// Set the event time to a non-zero timestamp to explicitly declare the
time
// that the event "happened", as opposed to when the message is being
published.
EventTime time.Time