This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new cf832bb6 [Issue #1418] Fix: Properties not getting consistently set on
pulsaradmin subscription message responses (#1419)
cf832bb6 is described below
commit cf832bb6faa6c1aa18b38ab0505fe81682289ec6
Author: JamesMurkin <[email protected]>
AuthorDate: Mon Sep 8 11:15:33 2025 +0100
[Issue #1418] Fix: Properties not getting consistently set on pulsaradmin
subscription message responses (#1419)
### Motivation
The properties on messages returned by PeekMessages are not returned
consistently, as described in #1418.
This change makes sure we set all properties from headers before continuing
on, so the returned message should consistently contain all properties.
Currently the properties returned are inconsistent across repeated calls,
even if the message returned is the same each time.
### Modifications
I've made it so all headers are processed and all properties are set on
every message, rather than exiting the loop early, which can cause
inconsistent results.
---
pulsaradmin/pkg/admin/subscription.go | 6 +-
pulsaradmin/pkg/admin/subscription_test.go | 147 +++++++++++++++++++----------
2 files changed, 104 insertions(+), 49 deletions(-)
diff --git a/pulsaradmin/pkg/admin/subscription.go
b/pulsaradmin/pkg/admin/subscription.go
index a1a13619..2dfa28d6 100644
--- a/pulsaradmin/pkg/admin/subscription.go
+++ b/pulsaradmin/pkg/admin/subscription.go
@@ -256,6 +256,7 @@ func handleResp(topic utils.TopicName, resp *http.Response)
([]*utils.Message, e
}
properties := make(map[string]string)
+ isBatch := false
for k := range resp.Header {
switch {
case k == PublishTimeHeader:
@@ -268,7 +269,7 @@ func handleResp(topic utils.TopicName, resp *http.Response)
([]*utils.Message, e
if h != "" {
properties[BatchHeader] = h
}
- return getIndividualMsgsFromBatch(topic, ID, payload,
properties)
+ isBatch = true
case k == PropertyHeader:
propJSON := resp.Header.Get(k)
if err := json.Unmarshal([]byte(propJSON),
&properties); err != nil {
@@ -280,6 +281,9 @@ func handleResp(topic utils.TopicName, resp *http.Response)
([]*utils.Message, e
}
}
+ if isBatch {
+ return getIndividualMsgsFromBatch(topic, ID, payload,
properties)
+ }
return []*utils.Message{utils.NewMessage(topic.String(), *ID, payload,
properties)}, nil
}
diff --git a/pulsaradmin/pkg/admin/subscription_test.go
b/pulsaradmin/pkg/admin/subscription_test.go
index 1e4a7b21..08d2cf22 100644
--- a/pulsaradmin/pkg/admin/subscription_test.go
+++ b/pulsaradmin/pkg/admin/subscription_test.go
@@ -20,6 +20,7 @@ package admin
import (
"context"
"fmt"
+ "strconv"
"sync"
"testing"
"time"
@@ -144,58 +145,108 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) {
}
}
-func TestPeekMessageWithProperties(t *testing.T) {
- randomName := newTopicName()
- topic := "persistent://public/default/" + randomName
- topicName, _ := utils.GetTopicName(topic)
- subName := "test-sub"
-
- cfg := &config.Config{}
- admin, err := New(cfg)
- assert.NoError(t, err)
- assert.NotNil(t, admin)
-
- client, err := pulsar.NewClient(pulsar.ClientOptions{
- URL: lookupURL,
- })
- assert.NoError(t, err)
- defer client.Close()
-
- // Create a producer for non-batch messages
- producer, err := client.CreateProducer(pulsar.ProducerOptions{
- Topic: topic,
- DisableBatching: true,
- })
- assert.NoError(t, err)
- defer producer.Close()
-
- props := map[string]string{
- "key1": "value1",
- "KEY2": "VALUE2",
- "KeY3": "VaLuE3",
- "details=man": "good at playing basketball",
+func TestPeekMessagesWithProperties(t *testing.T) {
+ tests := map[string]struct {
+ batched bool
+ }{
+ "non-batched": {
+ batched: false,
+ },
+ "batched": {
+ batched: true,
+ },
}
- _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
- Payload: []byte("test-message"),
- Properties: props,
- })
- assert.NoError(t, err)
-
- // Peek messages
- messages, err := admin.Subscriptions().PeekMessages(*topicName,
subName, 1)
- assert.NoError(t, err)
- assert.NotNil(t, messages)
-
- // Verify properties of messages
- for _, msg := range messages {
- assert.Equal(t, "value1", msg.Properties["key1"])
- assert.Equal(t, "VALUE2", msg.Properties["KEY2"])
- assert.Equal(t, "VaLuE3", msg.Properties["KeY3"])
- assert.Equal(t, "good at playing basketball",
msg.Properties["details=man"])
+ for name, tc := range tests {
+ t.Run(name, func(t *testing.T) {
+ ctx := context.Background()
+ randomName := newTopicName()
+ topic := "persistent://public/default/" + randomName
+ topicName, _ := utils.GetTopicName(topic)
+ subName := "test-sub"
+
+ cfg := &config.Config{}
+ admin, err := New(cfg)
+ assert.NoError(t, err)
+ assert.NotNil(t, admin)
+
+ client, err := pulsar.NewClient(pulsar.ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ var producer pulsar.Producer
+ batchSize := 5
+ if tc.batched {
+ producer, err =
client.CreateProducer(pulsar.ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ BatchingMaxMessages:
uint(batchSize),
+ BatchingMaxPublishDelay: time.Second *
2,
+ })
+ assert.NoError(t, err)
+ defer producer.Close()
+ } else {
+ producer, err =
client.CreateProducer(pulsar.ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.NoError(t, err)
+ defer producer.Close()
+ }
+
+ props := map[string]string{
+ "key1": "value1",
+ "KEY2": "VALUE2",
+ "KeY3": "VaLuE3",
+ "details=man": "good at playing basketball",
+ }
+
+ var wg sync.WaitGroup
+ numberOfMessagesToWaitFor := 10
+ numberOfMessagesToSend := numberOfMessagesToWaitFor
+ if tc.batched {
+ // If batched send one extra message to cause
the batch to be sent immediately
+ numberOfMessagesToSend++
+ }
+ wg.Add(numberOfMessagesToWaitFor)
+
+ for i := 0; i < numberOfMessagesToSend; i++ {
+ producer.SendAsync(ctx, &pulsar.ProducerMessage{
+ Payload: []byte("test-message"),
+ Properties: props,
+ }, func(_ pulsar.MessageID, _
*pulsar.ProducerMessage, err error) {
+ assert.Nil(t, err)
+ if i < numberOfMessagesToWaitFor {
+ wg.Done()
+ }
+ })
+ }
+ wg.Wait()
+
+ // Peek messages
+ messages, err :=
admin.Subscriptions().PeekMessages(*topicName, subName, batchSize)
+ assert.NoError(t, err)
+ assert.NotNil(t, messages)
+ assert.Len(t, messages, batchSize)
+
+ // Verify properties of messages
+ for _, msg := range messages {
+ assert.Equal(t, "value1",
msg.Properties["key1"])
+ assert.Equal(t, "VALUE2",
msg.Properties["KEY2"])
+ assert.Equal(t, "VaLuE3",
msg.Properties["KeY3"])
+ assert.Equal(t, "good at playing basketball",
msg.Properties["details=man"])
+ // Standard pulsar properties, set by pulsar
+ assert.NotEmpty(t,
msg.Properties["publish-time"])
+ if tc.batched {
+ assert.NotEmpty(t,
msg.Properties[BatchHeader])
+ assert.Equal(t,
strconv.Itoa(batchSize), msg.Properties[BatchHeader])
+ }
+ }
+ })
}
}
-
func TestGetMessageByID(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName