This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new cf832bb6 [Issue #1418] Fix: Properties not getting consistently set on 
pulsaradmin subscription message responses (#1419)
cf832bb6 is described below

commit cf832bb6faa6c1aa18b38ab0505fe81682289ec6
Author: JamesMurkin <[email protected]>
AuthorDate: Mon Sep 8 11:15:33 2025 +0100

    [Issue #1418] Fix: Properties not getting consistently set on pulsaradmin 
subscription message responses (#1419)
    
    ### Motivation
    
    The properties on messages returned by PeekMessages are not returned 
consistently, as described in #1418.
    
    This change makes sure we set all properties from headers before continuing 
on, so the returned message should consistently contain all properties.
    
    Currently the properties returned are inconsistent on repeated calls, even 
if the message returned is the same each time.
    
    ### Modifications
    
    I've made it so all headers are processed / all properties are set on every 
message, rather than exiting the loop early, which can cause inconsistent 
results.
---
 pulsaradmin/pkg/admin/subscription.go      |   6 +-
 pulsaradmin/pkg/admin/subscription_test.go | 147 +++++++++++++++++++----------
 2 files changed, 104 insertions(+), 49 deletions(-)

diff --git a/pulsaradmin/pkg/admin/subscription.go 
b/pulsaradmin/pkg/admin/subscription.go
index a1a13619..2dfa28d6 100644
--- a/pulsaradmin/pkg/admin/subscription.go
+++ b/pulsaradmin/pkg/admin/subscription.go
@@ -256,6 +256,7 @@ func handleResp(topic utils.TopicName, resp *http.Response) 
([]*utils.Message, e
        }
 
        properties := make(map[string]string)
+       isBatch := false
        for k := range resp.Header {
                switch {
                case k == PublishTimeHeader:
@@ -268,7 +269,7 @@ func handleResp(topic utils.TopicName, resp *http.Response) 
([]*utils.Message, e
                        if h != "" {
                                properties[BatchHeader] = h
                        }
-                       return getIndividualMsgsFromBatch(topic, ID, payload, 
properties)
+                       isBatch = true
                case k == PropertyHeader:
                        propJSON := resp.Header.Get(k)
                        if err := json.Unmarshal([]byte(propJSON), 
&properties); err != nil {
@@ -280,6 +281,9 @@ func handleResp(topic utils.TopicName, resp *http.Response) 
([]*utils.Message, e
                }
        }
 
+       if isBatch {
+               return getIndividualMsgsFromBatch(topic, ID, payload, 
properties)
+       }
        return []*utils.Message{utils.NewMessage(topic.String(), *ID, payload, 
properties)}, nil
 }
 
diff --git a/pulsaradmin/pkg/admin/subscription_test.go 
b/pulsaradmin/pkg/admin/subscription_test.go
index 1e4a7b21..08d2cf22 100644
--- a/pulsaradmin/pkg/admin/subscription_test.go
+++ b/pulsaradmin/pkg/admin/subscription_test.go
@@ -20,6 +20,7 @@ package admin
 import (
        "context"
        "fmt"
+       "strconv"
        "sync"
        "testing"
        "time"
@@ -144,58 +145,108 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) {
        }
 }
 
-func TestPeekMessageWithProperties(t *testing.T) {
-       randomName := newTopicName()
-       topic := "persistent://public/default/" + randomName
-       topicName, _ := utils.GetTopicName(topic)
-       subName := "test-sub"
-
-       cfg := &config.Config{}
-       admin, err := New(cfg)
-       assert.NoError(t, err)
-       assert.NotNil(t, admin)
-
-       client, err := pulsar.NewClient(pulsar.ClientOptions{
-               URL: lookupURL,
-       })
-       assert.NoError(t, err)
-       defer client.Close()
-
-       // Create a producer for non-batch messages
-       producer, err := client.CreateProducer(pulsar.ProducerOptions{
-               Topic:           topic,
-               DisableBatching: true,
-       })
-       assert.NoError(t, err)
-       defer producer.Close()
-
-       props := map[string]string{
-               "key1":        "value1",
-               "KEY2":        "VALUE2",
-               "KeY3":        "VaLuE3",
-               "details=man": "good at playing basketball",
+func TestPeekMessagesWithProperties(t *testing.T) {
+       tests := map[string]struct {
+               batched bool
+       }{
+               "non-batched": {
+                       batched: false,
+               },
+               "batched": {
+                       batched: true,
+               },
        }
 
-       _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
-               Payload:    []byte("test-message"),
-               Properties: props,
-       })
-       assert.NoError(t, err)
-
-       // Peek messages
-       messages, err := admin.Subscriptions().PeekMessages(*topicName, 
subName, 1)
-       assert.NoError(t, err)
-       assert.NotNil(t, messages)
-
-       // Verify properties of messages
-       for _, msg := range messages {
-               assert.Equal(t, "value1", msg.Properties["key1"])
-               assert.Equal(t, "VALUE2", msg.Properties["KEY2"])
-               assert.Equal(t, "VaLuE3", msg.Properties["KeY3"])
-               assert.Equal(t, "good at playing basketball", 
msg.Properties["details=man"])
+       for name, tc := range tests {
+               t.Run(name, func(t *testing.T) {
+                       ctx := context.Background()
+                       randomName := newTopicName()
+                       topic := "persistent://public/default/" + randomName
+                       topicName, _ := utils.GetTopicName(topic)
+                       subName := "test-sub"
+
+                       cfg := &config.Config{}
+                       admin, err := New(cfg)
+                       assert.NoError(t, err)
+                       assert.NotNil(t, admin)
+
+                       client, err := pulsar.NewClient(pulsar.ClientOptions{
+                               URL: lookupURL,
+                       })
+                       assert.NoError(t, err)
+                       defer client.Close()
+
+                       var producer pulsar.Producer
+                       batchSize := 5
+                       if tc.batched {
+                               producer, err = 
client.CreateProducer(pulsar.ProducerOptions{
+                                       Topic:                   topic,
+                                       DisableBatching:         false,
+                                       BatchingMaxMessages:     
uint(batchSize),
+                                       BatchingMaxPublishDelay: time.Second * 
2,
+                               })
+                               assert.NoError(t, err)
+                               defer producer.Close()
+                       } else {
+                               producer, err = 
client.CreateProducer(pulsar.ProducerOptions{
+                                       Topic:           topic,
+                                       DisableBatching: true,
+                               })
+                               assert.NoError(t, err)
+                               defer producer.Close()
+                       }
+
+                       props := map[string]string{
+                               "key1":        "value1",
+                               "KEY2":        "VALUE2",
+                               "KeY3":        "VaLuE3",
+                               "details=man": "good at playing basketball",
+                       }
+
+                       var wg sync.WaitGroup
+                       numberOfMessagesToWaitFor := 10
+                       numberOfMessagesToSend := numberOfMessagesToWaitFor
+                       if tc.batched {
+                               // If batched send one extra message to cause 
the batch to be sent immediately
+                               numberOfMessagesToSend++
+                       }
+                       wg.Add(numberOfMessagesToWaitFor)
+
+                       for i := 0; i < numberOfMessagesToSend; i++ {
+                               producer.SendAsync(ctx, &pulsar.ProducerMessage{
+                                       Payload:    []byte("test-message"),
+                                       Properties: props,
+                               }, func(_ pulsar.MessageID, _ 
*pulsar.ProducerMessage, err error) {
+                                       assert.Nil(t, err)
+                                       if i < numberOfMessagesToWaitFor {
+                                               wg.Done()
+                                       }
+                               })
+                       }
+                       wg.Wait()
+
+                       // Peek messages
+                       messages, err := 
admin.Subscriptions().PeekMessages(*topicName, subName, batchSize)
+                       assert.NoError(t, err)
+                       assert.NotNil(t, messages)
+                       assert.Len(t, messages, batchSize)
+
+                       // Verify properties of messages
+                       for _, msg := range messages {
+                               assert.Equal(t, "value1", 
msg.Properties["key1"])
+                               assert.Equal(t, "VALUE2", 
msg.Properties["KEY2"])
+                               assert.Equal(t, "VaLuE3", 
msg.Properties["KeY3"])
+                               assert.Equal(t, "good at playing basketball", 
msg.Properties["details=man"])
+                               // Standard pulsar properties, set by pulsar
+                               assert.NotEmpty(t, 
msg.Properties["publish-time"])
+                               if tc.batched {
+                                       assert.NotEmpty(t, 
msg.Properties[BatchHeader])
+                                       assert.Equal(t, 
strconv.Itoa(batchSize), msg.Properties[BatchHeader])
+                               }
+                       }
+               })
        }
 }
-
 func TestGetMessageByID(t *testing.T) {
        randomName := newTopicName()
        topic := "persistent://public/default/" + randomName

Reply via email to