This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d471a678 Change the pulsar_client_sending_buffers_count metric to
client level (#1408)
d471a678 is described below
commit d471a678ea54f1deaa016f09c377e5e41eb22a51
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Aug 21 11:13:17 2025 +0800
Change the pulsar_client_sending_buffers_count metric to client level
(#1408)
#1394 introduced the `pulsar_client_sending_buffers_count` metric to track
how many buffers have been allocated for sending and not yet put back to the pool.
However, unlike the other metrics, this metric is not client level, so the
`CustomMetricsLabels` from the client options cannot be attached to it.
When a send buffer is not put back to the pool, it means the `Release`
method was not called for some reason. Changing this metric to client level
could help locate which client triggered this bug in an application that
has many client instances from different businesses.
Here is an example of the metric after this change, when I configured
`CustomMetricsLabels: map[string]string{"key": "value"}`:
```
pulsar_client_sending_buffers_count{client="go",key="value"} 1
```
---
pulsar/internal/batch_builder.go | 13 +++++---
pulsar/internal/buffer.go | 23 ++++++++++++--
pulsar/internal/key_based_batch_builder.go | 6 ++--
pulsar/internal/key_based_batch_builder_test.go | 2 ++
pulsar/internal/metrics.go | 42 ++++++++++---------------
pulsar/producer_partition.go | 4 +++
6 files changed, 56 insertions(+), 34 deletions(-)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index c1cd30fb..ee5c1299 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -33,7 +33,7 @@ import (
type BatcherBuilderProvider func(
maxMessages uint, maxBatchSize uint, maxMessageSize uint32,
producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
- bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
+ bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor
crypto.Encryptor,
) (BatchBuilder, error)
// BatchBuilder is a interface of batch builders
@@ -100,6 +100,7 @@ type batchContainer struct {
compressionProvider compression.Provider
buffersPool BuffersPool
+ metrics *Metrics
log log.Logger
@@ -110,7 +111,7 @@ type batchContainer struct {
func newBatchContainer(
maxMessages uint, maxBatchSize uint, maxMessageSize uint32,
producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
- bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
+ bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor
crypto.Encryptor,
) batchContainer {
bc := batchContainer{
@@ -133,6 +134,7 @@ func newBatchContainer(
callbacks: []interface{}{},
compressionProvider: GetCompressionProvider(compressionType,
level),
buffersPool: bufferPool,
+ metrics: metrics,
log: logger,
encryptor: encryptor,
}
@@ -148,12 +150,12 @@ func newBatchContainer(
func NewBatchBuilder(
maxMessages uint, maxBatchSize uint, maxMessageSize uint32,
producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
- bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
+ bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor
crypto.Encryptor,
) (BatchBuilder, error) {
bc := newBatchContainer(
maxMessages, maxBatchSize, maxMessageSize, producerName,
producerID, compressionType,
- level, bufferPool, logger, encryptor,
+ level, bufferPool, metrics, logger, encryptor,
)
return &bc, nil
@@ -266,6 +268,9 @@ func (bc *batchContainer) Flush() *FlushBatch {
bc.msgMetadata.UncompressedSize = &uncompressedSize
buffer := bc.buffersPool.GetBuffer(int(uncompressedSize * 3 / 2))
+ bufferCount := bc.metrics.SendingBuffersCount
+ bufferCount.Inc()
+ buffer.SetReleaseCallback(func() { bufferCount.Dec() })
sequenceID := uint64(0)
var err error
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index af6676d7..8feb3de4 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -76,9 +76,15 @@ type Buffer interface {
Resize(newSize uint32)
ResizeIfNeeded(spaceNeeded uint32)
+ // Retain increases the reference count
Retain()
+ // Release decreases the reference count and returns the buffer to the
pool
+ // if it's associated with a buffer pool and the count reaches zero.
Release()
+ // RefCnt returns the current reference count of the buffer.
RefCnt() int64
+ // SetReleaseCallback sets a callback function that will be called when
the buffer is returned to a pool.
+ SetReleaseCallback(cb func())
// Clear will clear the current buffer data.
Clear()
@@ -95,7 +101,6 @@ func NewBufferPool() BuffersPool {
}
func (p *bufferPoolImpl) GetBuffer(initSize int) Buffer {
- sendingBuffersCount.Inc()
b, ok := p.Get().(*buffer)
if ok {
b.Clear()
@@ -112,9 +117,14 @@ func (p *bufferPoolImpl) GetBuffer(initSize int) Buffer {
}
func (p *bufferPoolImpl) Put(buf Buffer) {
- sendingBuffersCount.Dec()
if b, ok := buf.(*buffer); ok {
+ // Get the callback before putting back to the pool because it
might be reset after the
+ // buffer is returned to the pool and reused in GetBuffer.
+ cb := b.releaseCallback
p.Pool.Put(b)
+ if cb != nil {
+ cb()
+ }
}
}
@@ -126,6 +136,11 @@ type buffer struct {
refCnt atomic.Int64
pool BuffersPool
+
+ // releaseCallback is an optional function that is called when the
buffer is released back to the pool.
+ // It allows custom cleanup or notification logic to be executed after
the buffer is returned.
+ // The callback is invoked in bufferPoolImpl.Put, after the buffer is
put back into the pool.
+ releaseCallback func()
}
// NewBuffer creates and initializes a new Buffer using buf as its initial
contents.
@@ -277,6 +292,10 @@ func (b *buffer) RefCnt() int64 {
return b.refCnt.Load()
}
+func (b *buffer) SetReleaseCallback(cb func()) {
+ b.releaseCallback = cb
+}
+
func (b *buffer) Clear() {
b.readerIdx = 0
b.writerIdx = 0
diff --git a/pulsar/internal/key_based_batch_builder.go
b/pulsar/internal/key_based_batch_builder.go
index 5dfcb0d8..d8083f08 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -86,14 +86,14 @@ func (h *keyBasedBatches) Val(key string) *batchContainer {
func NewKeyBasedBatchBuilder(
maxMessages uint, maxBatchSize uint, maxMessageSize uint32,
producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
- bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
+ bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor
crypto.Encryptor,
) (BatchBuilder, error) {
bb := &keyBasedBatchContainer{
batches: newKeyBasedBatches(),
batchContainer: newBatchContainer(
maxMessages, maxBatchSize, maxMessageSize,
producerName, producerID,
- compressionType, level, bufferPool, logger, encryptor,
+ compressionType, level, bufferPool, metrics, logger,
encryptor,
),
compressionType: compressionType,
level: level,
@@ -155,7 +155,7 @@ func (bc *keyBasedBatchContainer) Add(
// create batchContainer for new key
t := newBatchContainer(
bc.maxMessages, bc.maxBatchSize, bc.maxMessageSize,
bc.producerName, bc.producerID,
- bc.compressionType, bc.level, bc.buffersPool, bc.log,
bc.encryptor,
+ bc.compressionType, bc.level, bc.buffersPool,
bc.metrics, bc.log, bc.encryptor,
)
batchPart = &t
bc.batches.Add(msgKey, &t)
diff --git a/pulsar/internal/key_based_batch_builder_test.go
b/pulsar/internal/key_based_batch_builder_test.go
index 864f4777..10092327 100644
--- a/pulsar/internal/key_based_batch_builder_test.go
+++ b/pulsar/internal/key_based_batch_builder_test.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
@@ -47,6 +48,7 @@ func TestKeyBasedBatcherOrdering(t *testing.T) {
pb.CompressionType_NONE,
compression.Level(0),
&bufferPoolImpl{},
+ NewMetricsProvider(2, map[string]string{},
prometheus.DefaultRegisterer),
log.NewLoggerWithLogrus(logrus.StandardLogger()),
&mockEncryptor{},
)
diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index f7374953..cc7a38d5 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -21,27 +21,16 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
-var (
- defaultConstLabels = map[string]string{
- "client": "go",
- }
-
- sendingBuffersCount = prometheus.NewGauge(prometheus.GaugeOpts{
- Name: "pulsar_client_sending_buffers_count",
- Help: "Number of sending buffers",
- ConstLabels: defaultConstLabels,
- })
-)
-
type Metrics struct {
- metricsLevel int
- messagesPublished *prometheus.CounterVec
- bytesPublished *prometheus.CounterVec
- messagesPending *prometheus.GaugeVec
- bytesPending *prometheus.GaugeVec
- publishErrors *prometheus.CounterVec
- publishLatency *prometheus.HistogramVec
- publishRPCLatency *prometheus.HistogramVec
+ metricsLevel int
+ messagesPublished *prometheus.CounterVec
+ bytesPublished *prometheus.CounterVec
+ messagesPending *prometheus.GaugeVec
+ bytesPending *prometheus.GaugeVec
+ publishErrors *prometheus.CounterVec
+ publishLatency *prometheus.HistogramVec
+ publishRPCLatency *prometheus.HistogramVec
+ SendingBuffersCount prometheus.Gauge
messagesReceived *prometheus.CounterVec
bytesReceived *prometheus.CounterVec
@@ -111,10 +100,7 @@ type LeveledMetrics struct {
// NewMetricsProvider returns metrics registered to registerer.
func NewMetricsProvider(metricsCardinality int, userDefinedLabels
map[string]string,
registerer prometheus.Registerer) *Metrics {
- constLabels := make(map[string]string)
- for k, v := range defaultConstLabels {
- constLabels[k] = v
- }
+ constLabels := map[string]string{"client": "go"}
for k, v := range userDefinedLabels {
constLabels[k] = v
}
@@ -180,6 +166,12 @@ func NewMetricsProvider(metricsCardinality int,
userDefinedLabels map[string]str
Buckets: []float64{.0005, .001, .005, .01, .025,
.05, .1, .25, .5, 1, 2.5, 5, 10},
}, metricsLevelLabels),
+ SendingBuffersCount: prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: "pulsar_client_sending_buffers_count",
+ Help: "Number of sending buffers",
+ ConstLabels: constLabels,
+ }),
+
producersOpened:
prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_producers_opened",
Help: "Counter of producers created by the
client",
@@ -548,7 +540,7 @@ func NewMetricsProvider(metricsCardinality int,
userDefinedLabels map[string]str
metrics.RPCRequestCount =
are.ExistingCollector.(prometheus.Counter)
}
}
- _ = registerer.Register(sendingBuffersCount)
+ _ = registerer.Register(metrics.SendingBuffersCount)
return metrics
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f5df0657..1d65ec87 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -372,6 +372,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL
string) error {
maxMessageSize, p.producerName, p.producerID,
pb.CompressionType(p.options.CompressionType),
compression.Level(p.options.CompressionLevel),
buffersPool,
+ p.client.metrics,
p.log,
p.encryptor)
if err != nil {
@@ -798,6 +799,9 @@ func (p *partitionProducer) internalSingleSend(
payloadBuf.Write(compressedPayload)
buffer := buffersPool.GetBuffer(int(payloadBuf.ReadableBytes() * 3 / 2))
+ bufferCount := p.client.metrics.SendingBuffersCount
+ bufferCount.Inc()
+ buffer.SetReleaseCallback(func() { bufferCount.Dec() })
sid := *mm.SequenceId
var useTxn bool