This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push: new ee5a175 fix PullThresholdForQueue and PullThresholdSizeForQueue update (#1140) ee5a175 is described below commit ee5a175188990c0a2fda45adb2e03264a368bf03 Author: wenxuwan <wangwenxue....@alibaba-inc.com> AuthorDate: Tue Apr 23 17:14:29 2024 +0800 fix PullThresholdForQueue and PullThresholdSizeForQueue update (#1140) * seperate interface and implement * fix panic when close tracedispatcher --- consumer/option.go | 12 ++++++------ consumer/push_consumer.go | 32 ++++++++++++++++---------------- internal/utils/compression.go | 1 + 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/consumer/option.go b/consumer/option.go index 2e08163..608917d 100644 --- a/consumer/option.go +++ b/consumer/option.go @@ -45,15 +45,15 @@ type consumerOptions struct { // Concurrently max span offset.it has no effect on sequential consumption ConsumeConcurrentlyMaxSpan int - // Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, + // Flow control threshold on queue level, each message queue will cache at most 1024 messages by default, // Consider the {PullBatchSize}, the instantaneous value may exceed the limit - PullThresholdForQueue int64 + PullThresholdForQueue atomic.Int64 - // Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default, + // Limit the cached message size on queue level, each message queue will cache at most 512 MiB messages by default, // Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit // // The size of a message only measured by message body, so it's not accurate - PullThresholdSizeForQueue int + PullThresholdSizeForQueue atomic.Int32 // Flow control threshold on topic level, default value is -1(Unlimited) // @@ -198,13 +198,13 @@ func WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option { func WithPullThresholdForQueue(pullThresholdForQueue int64) Option { return func(options *consumerOptions) { - options.PullThresholdForQueue = pullThresholdForQueue + options.PullThresholdForQueue.Store(pullThresholdForQueue) } } func WithPullThresholdSizeForQueue(pullThresholdSizeForQueue int) Option { return func(options *consumerOptions) { - options.PullThresholdSizeForQueue = pullThresholdSizeForQueue + options.PullThresholdSizeForQueue.Store(int32(pullThresholdSizeForQueue)) } } diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index c600f13..259980a 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -518,11 +518,11 @@ func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*pr if newVal == 0 { newVal = 1 } - rlog.Info("The PullThresholdForTopic is changed", map[string]interface{}{ - rlog.LogKeyValueChangedFrom: pc.option.PullThresholdForTopic, + rlog.Info("The PullThresholdForQueue is changed", map[string]interface{}{ + rlog.LogKeyValueChangedFrom: pc.option.PullThresholdForQueue.Load(), rlog.LogKeyValueChangedTo: newVal, }) - pc.option.PullThresholdForTopic = newVal + pc.option.PullThresholdForQueue.Store(int64(newVal)) } if pc.option.PullThresholdSizeForTopic != -1 { @@ -530,11 +530,11 @@ func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*pr if newVal == 0 { newVal = 1 } - rlog.Info("The PullThresholdSizeForTopic is changed", map[string]interface{}{ - rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForTopic, + rlog.Info("The PullThresholdSizeForQueue is changed", map[string]interface{}{ + rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForQueue.Load(), rlog.LogKeyValueChangedTo: newVal, }) - pc.option.PullThresholdSizeForTopic = newVal + pc.option.PullThresholdSizeForQueue.Store(int32(newVal)) } } pc.client.SendHeartbeatToAllBrokerWithLock() @@ -564,9 +564,9 @@ func (pc *pushConsumer) validate() error { } } - if pc.option.PullThresholdForQueue < 1 || pc.option.PullThresholdForQueue > 65535 { - if pc.option.PullThresholdForQueue == 0 { - pc.option.PullThresholdForQueue = 1024 + if pc.option.PullThresholdForQueue.Load() < 1 || pc.option.PullThresholdForQueue.Load() > 65535 { + if pc.option.PullThresholdForQueue.Load() == 0 { + pc.option.PullThresholdForQueue.Store(1024) } else { return errors.New("option.PullThresholdForQueue out of range [1, 65535]") } @@ -580,9 +580,9 @@ func (pc *pushConsumer) validate() error { } } - if pc.option.PullThresholdSizeForQueue < 1 || pc.option.PullThresholdSizeForQueue > 1024 { - if pc.option.PullThresholdSizeForQueue == 0 { - pc.option.PullThresholdSizeForQueue = 512 + if pc.option.PullThresholdSizeForQueue.Load() < 1 || pc.option.PullThresholdSizeForQueue.Load() > 1024 { + if pc.option.PullThresholdSizeForQueue.Load() == 0 { + pc.option.PullThresholdSizeForQueue.Store(512) } else { return errors.New("option.PullThresholdSizeForQueue out of range [1, 1024]") } @@ -693,10 +693,10 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { } cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb) - if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue { + if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue.Load() { if pc.queueFlowControlTimes%1000 == 0 { rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{ - "PullThresholdForQueue": pc.option.PullThresholdForQueue, + "PullThresholdForQueue": pc.option.PullThresholdForQueue.Load(), "minOffset": pq.Min(), "maxOffset": pq.Max(), "count": pq.cachedMsgCount, @@ -710,10 +710,10 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { goto NEXT } - if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue { + if cachedMessageSizeInMiB > int(pc.option.PullThresholdSizeForQueue.Load()) { if pc.queueFlowControlTimes%1000 == 0 { rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{ - "PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue, + "PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue.Load(), "minOffset": pq.Min(), "maxOffset": pq.Max(), "count": pq.cachedMsgCount, diff --git a/internal/utils/compression.go b/internal/utils/compression.go index 162864f..11f1791 100644 --- a/internal/utils/compression.go +++ b/internal/utils/compression.go @@ -78,6 +78,7 @@ func UnCompress(data []byte) []byte { if err != nil { return data } + defer r.Close() retData, err := ioutil.ReadAll(r) if err != nil { return data