This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ee5a175  fix PullThresholdForQueue and PullThresholdSizeForQueue update (#1140)
ee5a175 is described below

commit ee5a175188990c0a2fda45adb2e03264a368bf03
Author: wenxuwan <wangwenxue....@alibaba-inc.com>
AuthorDate: Tue Apr 23 17:14:29 2024 +0800

    fix PullThresholdForQueue and PullThresholdSizeForQueue update (#1140)
    
    * separate interface and implementation
    
    * fix panic when close tracedispatcher
---
 consumer/option.go            | 12 ++++++------
 consumer/push_consumer.go     | 32 ++++++++++++++++----------------
 internal/utils/compression.go |  1 +
 3 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/consumer/option.go b/consumer/option.go
index 2e08163..608917d 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -45,15 +45,15 @@ type consumerOptions struct {
        // Concurrently max span offset.it has no effect on sequential consumption
        ConsumeConcurrentlyMaxSpan int
 
-       // Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
+       // Flow control threshold on queue level, each message queue will cache at most 1024 messages by default,
        // Consider the {PullBatchSize}, the instantaneous value may exceed the limit
-       PullThresholdForQueue int64
+       PullThresholdForQueue atomic.Int64
 
-       // Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
+       // Limit the cached message size on queue level, each message queue will cache at most 512 MiB messages by default,
        // Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
        //
        // The size of a message only measured by message body, so it's not accurate
-       PullThresholdSizeForQueue int
+       PullThresholdSizeForQueue atomic.Int32
 
        // Flow control threshold on topic level, default value is -1(Unlimited)
        //
@@ -198,13 +198,13 @@ func WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option {
 
 func WithPullThresholdForQueue(pullThresholdForQueue int64) Option {
        return func(options *consumerOptions) {
-               options.PullThresholdForQueue = pullThresholdForQueue
+               options.PullThresholdForQueue.Store(pullThresholdForQueue)
        }
 }
 
 func WithPullThresholdSizeForQueue(pullThresholdSizeForQueue int) Option {
        return func(options *consumerOptions) {
-               options.PullThresholdSizeForQueue = pullThresholdSizeForQueue
+               options.PullThresholdSizeForQueue.Store(int32(pullThresholdSizeForQueue))
        }
 }
 
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c600f13..259980a 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -518,11 +518,11 @@ func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*pr
                        if newVal == 0 {
                                newVal = 1
                        }
-                       rlog.Info("The PullThresholdForTopic is changed", map[string]interface{}{
-                               rlog.LogKeyValueChangedFrom: pc.option.PullThresholdForTopic,
+                       rlog.Info("The PullThresholdForQueue is changed", map[string]interface{}{
+                               rlog.LogKeyValueChangedFrom: pc.option.PullThresholdForQueue.Load(),
                                rlog.LogKeyValueChangedTo:   newVal,
                        })
-                       pc.option.PullThresholdForTopic = newVal
+                       pc.option.PullThresholdForQueue.Store(int64(newVal))
                }
 
                if pc.option.PullThresholdSizeForTopic != -1 {
@@ -530,11 +530,11 @@ func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*pr
                        if newVal == 0 {
                                newVal = 1
                        }
-                       rlog.Info("The PullThresholdSizeForTopic is changed", map[string]interface{}{
-                               rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForTopic,
+                       rlog.Info("The PullThresholdSizeForQueue is changed", map[string]interface{}{
+                               rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForQueue.Load(),
                                rlog.LogKeyValueChangedTo:   newVal,
                        })
-                       pc.option.PullThresholdSizeForTopic = newVal
+                       pc.option.PullThresholdSizeForQueue.Store(int32(newVal))
                }
        }
        pc.client.SendHeartbeatToAllBrokerWithLock()
@@ -564,9 +564,9 @@ func (pc *pushConsumer) validate() error {
                }
        }
 
-       if pc.option.PullThresholdForQueue < 1 || pc.option.PullThresholdForQueue > 65535 {
-               if pc.option.PullThresholdForQueue == 0 {
-                       pc.option.PullThresholdForQueue = 1024
+       if pc.option.PullThresholdForQueue.Load() < 1 || pc.option.PullThresholdForQueue.Load() > 65535 {
+               if pc.option.PullThresholdForQueue.Load() == 0 {
+                       pc.option.PullThresholdForQueue.Store(1024)
                } else {
                        return errors.New("option.PullThresholdForQueue out of range [1, 65535]")
                }
@@ -580,9 +580,9 @@ func (pc *pushConsumer) validate() error {
                }
        }
 
-       if pc.option.PullThresholdSizeForQueue < 1 || pc.option.PullThresholdSizeForQueue > 1024 {
-               if pc.option.PullThresholdSizeForQueue == 0 {
-                       pc.option.PullThresholdSizeForQueue = 512
+       if pc.option.PullThresholdSizeForQueue.Load() < 1 || pc.option.PullThresholdSizeForQueue.Load() > 1024 {
+               if pc.option.PullThresholdSizeForQueue.Load() == 0 {
+                       pc.option.PullThresholdSizeForQueue.Store(512)
                } else {
                        return errors.New("option.PullThresholdSizeForQueue out of range [1, 1024]")
                }
@@ -693,10 +693,10 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                }
 
                cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb)
-               if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue {
+               if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue.Load() {
                        if pc.queueFlowControlTimes%1000 == 0 {
                                rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
-                                       "PullThresholdForQueue": pc.option.PullThresholdForQueue,
+                                       "PullThresholdForQueue": pc.option.PullThresholdForQueue.Load(),
                                        "minOffset":             pq.Min(),
                                        "maxOffset":             pq.Max(),
                                        "count":                 pq.cachedMsgCount,
@@ -710,10 +710,10 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        goto NEXT
                }
 
-               if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
+               if cachedMessageSizeInMiB > int(pc.option.PullThresholdSizeForQueue.Load()) {
                        if pc.queueFlowControlTimes%1000 == 0 {
                                rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
-                                       "PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue,
+                                       "PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue.Load(),
                                        "minOffset":                 pq.Min(),
                                        "maxOffset":                 pq.Max(),
                                        "count":                     pq.cachedMsgCount,
diff --git a/internal/utils/compression.go b/internal/utils/compression.go
index 162864f..11f1791 100644
--- a/internal/utils/compression.go
+++ b/internal/utils/compression.go
@@ -78,6 +78,7 @@ func UnCompress(data []byte) []byte {
        if err != nil {
                return data
        }
+       defer r.Close()
        retData, err := ioutil.ReadAll(r)
        if err != nil {
                return data

Reply via email to