This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d471a678 Change the pulsar_client_sending_buffers_count metric to 
client level (#1408)
d471a678 is described below

commit d471a678ea54f1deaa016f09c377e5e41eb22a51
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Aug 21 11:13:17 2025 +0800

    Change the pulsar_client_sending_buffers_count metric to client level 
(#1408)
    
    #1394 introduces the `pulsar_client_sending_buffers_count` metric to track 
how many buffers are allocated for sending purposes and not yet put back to the 
pool. However, unlike other metrics, this metric is not client level, so it 
cannot be attached with `CustomMetricsLabels` in client options.
    
    When a send buffer is not put back to the pool, it means the `Release` 
method was not called for some reason. Changing this metric to client level 
could help locate which client has triggered this bug in an application that 
has many client instances from different businesses.
    
    Here is an example metric when I configured `CustomMetricsLabels: 
map[string]string{"key": "value"}` after this change:
    
    ```
    pulsar_client_sending_buffers_count{client="go",key="value"} 1
    ```
---
 pulsar/internal/batch_builder.go                | 13 +++++---
 pulsar/internal/buffer.go                       | 23 ++++++++++++--
 pulsar/internal/key_based_batch_builder.go      |  6 ++--
 pulsar/internal/key_based_batch_builder_test.go |  2 ++
 pulsar/internal/metrics.go                      | 42 ++++++++++---------------
 pulsar/producer_partition.go                    |  4 +++
 6 files changed, 56 insertions(+), 34 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index c1cd30fb..ee5c1299 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -33,7 +33,7 @@ import (
 type BatcherBuilderProvider func(
        maxMessages uint, maxBatchSize uint, maxMessageSize uint32, 
producerName string, producerID uint64,
        compressionType pb.CompressionType, level compression.Level,
-       bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
+       bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor 
crypto.Encryptor,
 ) (BatchBuilder, error)
 
 // BatchBuilder is a interface of batch builders
@@ -100,6 +100,7 @@ type batchContainer struct {
 
        compressionProvider compression.Provider
        buffersPool         BuffersPool
+       metrics             *Metrics
 
        log log.Logger
 
@@ -110,7 +111,7 @@ type batchContainer struct {
 func newBatchContainer(
        maxMessages uint, maxBatchSize uint, maxMessageSize uint32, 
producerName string, producerID uint64,
        compressionType pb.CompressionType, level compression.Level,
-       bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
+       bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor 
crypto.Encryptor,
 ) batchContainer {
 
        bc := batchContainer{
@@ -133,6 +134,7 @@ func newBatchContainer(
                callbacks:           []interface{}{},
                compressionProvider: GetCompressionProvider(compressionType, 
level),
                buffersPool:         bufferPool,
+               metrics:             metrics,
                log:                 logger,
                encryptor:           encryptor,
        }
@@ -148,12 +150,12 @@ func newBatchContainer(
 func NewBatchBuilder(
        maxMessages uint, maxBatchSize uint, maxMessageSize uint32, 
producerName string, producerID uint64,
        compressionType pb.CompressionType, level compression.Level,
-       bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
+       bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor 
crypto.Encryptor,
 ) (BatchBuilder, error) {
 
        bc := newBatchContainer(
                maxMessages, maxBatchSize, maxMessageSize, producerName, 
producerID, compressionType,
-               level, bufferPool, logger, encryptor,
+               level, bufferPool, metrics, logger, encryptor,
        )
 
        return &bc, nil
@@ -266,6 +268,9 @@ func (bc *batchContainer) Flush() *FlushBatch {
        bc.msgMetadata.UncompressedSize = &uncompressedSize
 
        buffer := bc.buffersPool.GetBuffer(int(uncompressedSize * 3 / 2))
+       bufferCount := bc.metrics.SendingBuffersCount
+       bufferCount.Inc()
+       buffer.SetReleaseCallback(func() { bufferCount.Dec() })
 
        sequenceID := uint64(0)
        var err error
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index af6676d7..8feb3de4 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -76,9 +76,15 @@ type Buffer interface {
        Resize(newSize uint32)
        ResizeIfNeeded(spaceNeeded uint32)
 
+       // Retain increases the reference count
        Retain()
+       // Release decreases the reference count and returns the buffer to the 
pool
+       // if it's associated with a buffer pool and the count reaches zero.
        Release()
+       // RefCnt returns the current reference count of the buffer.
        RefCnt() int64
+       // SetReleaseCallback sets a callback function that will be called when 
the buffer is returned to a pool.
+       SetReleaseCallback(cb func())
 
        // Clear will clear the current buffer data.
        Clear()
@@ -95,7 +101,6 @@ func NewBufferPool() BuffersPool {
 }
 
 func (p *bufferPoolImpl) GetBuffer(initSize int) Buffer {
-       sendingBuffersCount.Inc()
        b, ok := p.Get().(*buffer)
        if ok {
                b.Clear()
@@ -112,9 +117,14 @@ func (p *bufferPoolImpl) GetBuffer(initSize int) Buffer {
 }
 
 func (p *bufferPoolImpl) Put(buf Buffer) {
-       sendingBuffersCount.Dec()
        if b, ok := buf.(*buffer); ok {
+               // Get the callback before putting back to the pool because it 
might be reset after the
+               // buffer is returned to the pool and reused in GetBuffer.
+               cb := b.releaseCallback
                p.Pool.Put(b)
+               if cb != nil {
+                       cb()
+               }
        }
 }
 
@@ -126,6 +136,11 @@ type buffer struct {
 
        refCnt atomic.Int64
        pool   BuffersPool
+
+       // releaseCallback is an optional function that is called when the 
buffer is released back to the pool.
+       // It allows custom cleanup or notification logic to be executed after 
the buffer is returned.
+       // The callback is invoked in bufferPoolImpl.Put, after the buffer is 
put back into the pool.
+       releaseCallback func()
 }
 
 // NewBuffer creates and initializes a new Buffer using buf as its initial 
contents.
@@ -277,6 +292,10 @@ func (b *buffer) RefCnt() int64 {
        return b.refCnt.Load()
 }
 
+func (b *buffer) SetReleaseCallback(cb func()) {
+       b.releaseCallback = cb
+}
+
 func (b *buffer) Clear() {
        b.readerIdx = 0
        b.writerIdx = 0
diff --git a/pulsar/internal/key_based_batch_builder.go 
b/pulsar/internal/key_based_batch_builder.go
index 5dfcb0d8..d8083f08 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -86,14 +86,14 @@ func (h *keyBasedBatches) Val(key string) *batchContainer {
 func NewKeyBasedBatchBuilder(
        maxMessages uint, maxBatchSize uint, maxMessageSize uint32, 
producerName string, producerID uint64,
        compressionType pb.CompressionType, level compression.Level,
-       bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
+       bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor 
crypto.Encryptor,
 ) (BatchBuilder, error) {
 
        bb := &keyBasedBatchContainer{
                batches: newKeyBasedBatches(),
                batchContainer: newBatchContainer(
                        maxMessages, maxBatchSize, maxMessageSize, 
producerName, producerID,
-                       compressionType, level, bufferPool, logger, encryptor,
+                       compressionType, level, bufferPool, metrics, logger, 
encryptor,
                ),
                compressionType: compressionType,
                level:           level,
@@ -155,7 +155,7 @@ func (bc *keyBasedBatchContainer) Add(
                // create batchContainer for new key
                t := newBatchContainer(
                        bc.maxMessages, bc.maxBatchSize, bc.maxMessageSize, 
bc.producerName, bc.producerID,
-                       bc.compressionType, bc.level, bc.buffersPool, bc.log, 
bc.encryptor,
+                       bc.compressionType, bc.level, bc.buffersPool, 
bc.metrics, bc.log, bc.encryptor,
                )
                batchPart = &t
                bc.batches.Add(msgKey, &t)
diff --git a/pulsar/internal/key_based_batch_builder_test.go 
b/pulsar/internal/key_based_batch_builder_test.go
index 864f4777..10092327 100644
--- a/pulsar/internal/key_based_batch_builder_test.go
+++ b/pulsar/internal/key_based_batch_builder_test.go
@@ -25,6 +25,7 @@ import (
        "github.com/apache/pulsar-client-go/pulsar/internal/compression"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/apache/pulsar-client-go/pulsar/log"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "github.com/stretchr/testify/assert"
        "google.golang.org/protobuf/proto"
@@ -47,6 +48,7 @@ func TestKeyBasedBatcherOrdering(t *testing.T) {
                pb.CompressionType_NONE,
                compression.Level(0),
                &bufferPoolImpl{},
+               NewMetricsProvider(2, map[string]string{}, 
prometheus.DefaultRegisterer),
                log.NewLoggerWithLogrus(logrus.StandardLogger()),
                &mockEncryptor{},
        )
diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index f7374953..cc7a38d5 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -21,27 +21,16 @@ import (
        "github.com/prometheus/client_golang/prometheus"
 )
 
-var (
-       defaultConstLabels = map[string]string{
-               "client": "go",
-       }
-
-       sendingBuffersCount = prometheus.NewGauge(prometheus.GaugeOpts{
-               Name:        "pulsar_client_sending_buffers_count",
-               Help:        "Number of sending buffers",
-               ConstLabels: defaultConstLabels,
-       })
-)
-
 type Metrics struct {
-       metricsLevel      int
-       messagesPublished *prometheus.CounterVec
-       bytesPublished    *prometheus.CounterVec
-       messagesPending   *prometheus.GaugeVec
-       bytesPending      *prometheus.GaugeVec
-       publishErrors     *prometheus.CounterVec
-       publishLatency    *prometheus.HistogramVec
-       publishRPCLatency *prometheus.HistogramVec
+       metricsLevel        int
+       messagesPublished   *prometheus.CounterVec
+       bytesPublished      *prometheus.CounterVec
+       messagesPending     *prometheus.GaugeVec
+       bytesPending        *prometheus.GaugeVec
+       publishErrors       *prometheus.CounterVec
+       publishLatency      *prometheus.HistogramVec
+       publishRPCLatency   *prometheus.HistogramVec
+       SendingBuffersCount prometheus.Gauge
 
        messagesReceived   *prometheus.CounterVec
        bytesReceived      *prometheus.CounterVec
@@ -111,10 +100,7 @@ type LeveledMetrics struct {
 // NewMetricsProvider returns metrics registered to registerer.
 func NewMetricsProvider(metricsCardinality int, userDefinedLabels 
map[string]string,
        registerer prometheus.Registerer) *Metrics {
-       constLabels := make(map[string]string)
-       for k, v := range defaultConstLabels {
-               constLabels[k] = v
-       }
+       constLabels := map[string]string{"client": "go"}
        for k, v := range userDefinedLabels {
                constLabels[k] = v
        }
@@ -180,6 +166,12 @@ func NewMetricsProvider(metricsCardinality int, 
userDefinedLabels map[string]str
                        Buckets:     []float64{.0005, .001, .005, .01, .025, 
.05, .1, .25, .5, 1, 2.5, 5, 10},
                }, metricsLevelLabels),
 
+               SendingBuffersCount: prometheus.NewGauge(prometheus.GaugeOpts{
+                       Name:        "pulsar_client_sending_buffers_count",
+                       Help:        "Number of sending buffers",
+                       ConstLabels: constLabels,
+               }),
+
                producersOpened: 
prometheus.NewCounterVec(prometheus.CounterOpts{
                        Name:        "pulsar_client_producers_opened",
                        Help:        "Counter of producers created by the 
client",
@@ -548,7 +540,7 @@ func NewMetricsProvider(metricsCardinality int, 
userDefinedLabels map[string]str
                        metrics.RPCRequestCount = 
are.ExistingCollector.(prometheus.Counter)
                }
        }
-       _ = registerer.Register(sendingBuffersCount)
+       _ = registerer.Register(metrics.SendingBuffersCount)
        return metrics
 }
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f5df0657..1d65ec87 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -372,6 +372,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL 
string) error {
                        maxMessageSize, p.producerName, p.producerID, 
pb.CompressionType(p.options.CompressionType),
                        compression.Level(p.options.CompressionLevel),
                        buffersPool,
+                       p.client.metrics,
                        p.log,
                        p.encryptor)
                if err != nil {
@@ -798,6 +799,9 @@ func (p *partitionProducer) internalSingleSend(
        payloadBuf.Write(compressedPayload)
 
        buffer := buffersPool.GetBuffer(int(payloadBuf.ReadableBytes() * 3 / 2))
+       bufferCount := p.client.metrics.SendingBuffersCount
+       bufferCount.Inc()
+       buffer.SetReleaseCallback(func() { bufferCount.Dec() })
 
        sid := *mm.SequenceId
        var useTxn bool

Reply via email to