This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3493a47  feat: support consumer consume tps option (#1101)
3493a47 is described below

commit 3493a4783ab39d810cffc37757b5b005cfa57bd9
Author: WeizhongTu <tuweizh...@163.com>
AuthorDate: Mon Oct 16 14:36:08 2023 +0800

    feat: support consumer consume tps option (#1101)
    
    * feat: support consumer consume tps option
    
    * feat: support consumer consume tps option
---
 consumer/consumer.go      |  8 ++++++++
 consumer/option.go        | 28 ++++++++++++++++++++--------
 consumer/pull_consumer.go |  4 ++--
 consumer/push_consumer.go | 12 ++++++------
 4 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index acce804..4ed4940 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -414,6 +414,14 @@ func (dc *defaultConsumer) doBalance() {
                                return (mqAll[i].QueueId - mqAll[j].QueueId) < 0
                        })
                        allocateResult := dc.allocate(dc.consumerGroup, 
dc.client.ClientID(), mqAll, cidAll)
+
+                       // Principle of flow control: pull TPS = 
1000ms/PullInterval * BatchSize * len(allocateResult)
+                       if consumeTPS := dc.option.ConsumeTPS.Load(); 
consumeTPS > 0 && len(allocateResult) > 0 {
+                               pullBatchSize := dc.option.PullBatchSize.Load()
+                               pullTimesPerSecond := float64(consumeTPS) / 
float64(pullBatchSize*int32(len(allocateResult)))
+                               
dc.option.PullInterval.Store(time.Duration(float64(time.Second) / 
pullTimesPerSecond))
+                       }
+
                        changed := dc.updateProcessQueueTable(topic, 
allocateResult)
                        if changed {
                                dc.mqChanged(topic, mqAll, allocateResult)
diff --git a/consumer/option.go b/consumer/option.go
index 3f7e6ca..ac7dd93 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -21,6 +21,8 @@ import (
        "strings"
        "time"
 
+       "go.uber.org/atomic"
+
        "github.com/apache/rocketmq-client-go/v2/hooks"
        "github.com/apache/rocketmq-client-go/v2/internal"
        "github.com/apache/rocketmq-client-go/v2/primitive"
@@ -55,8 +57,8 @@ type consumerOptions struct {
 
        // Flow control threshold on topic level, default value is -1(Unlimited)
        //
-       // The value of {@code pullThresholdForQueue} will be overwrote and 
calculated based on
-       // {@code pullThresholdForTopic} if it is't unlimited
+       // The value of {@code pullThresholdForQueue} will be overwritten and 
calculated based on
+       // {@code pullThresholdForTopic} if it isn't unlimited
        //
        // For example, if the value of pullThresholdForTopic is 1000 and 10 
message queues are assigned to this consumer,
        // then pullThresholdForQueue will be set to 100
@@ -64,21 +66,24 @@ type consumerOptions struct {
 
        // Limit the cached message size on topic level, default value is -1 
MiB(Unlimited)
        //
-       // The value of {@code pullThresholdSizeForQueue} will be overwrote and 
calculated based on
-       // {@code pullThresholdSizeForTopic} if it is't unlimited
+       // The value of {@code pullThresholdSizeForQueue} will be overwritten 
and calculated based on
+       // {@code pullThresholdSizeForTopic} if it isn't unlimited
        //
        // For example, if the value of pullThresholdSizeForTopic is 1000 MiB 
and 10 message queues are
        // assigned to this consumer, then pullThresholdSizeForQueue will be 
set to 100 MiB
        PullThresholdSizeForTopic int
 
        // Message pull Interval
-       PullInterval time.Duration
+       PullInterval atomic.Duration
+
+       // Message consumer tps
+       ConsumeTPS atomic.Int32
 
        // Batch consumption size
        ConsumeMessageBatchMaxSize int
 
        // Batch pull size
-       PullBatchSize int32
+       PullBatchSize atomic.Int32
 
        // Whether update subscription relationship when every pull
        PostSubscriptionWhenPull bool
@@ -283,7 +288,7 @@ func WithStrategy(strategy AllocateStrategy) Option {
 
 func WithPullBatchSize(batchSize int32) Option {
        return func(options *consumerOptions) {
-               options.PullBatchSize = batchSize
+               options.PullBatchSize.Store(batchSize)
        }
 }
 
@@ -307,7 +312,14 @@ func WithSuspendCurrentQueueTimeMillis(suspendT 
time.Duration) Option {
 
 func WithPullInterval(interval time.Duration) Option {
        return func(options *consumerOptions) {
-               options.PullInterval = interval
+               options.PullInterval.Store(interval)
+       }
+}
+
+// WithConsumeTPS set single-machine consumption tps
+func WithConsumeTPS(tps int32) Option {
+       return func(options *consumerOptions) {
+               options.ConsumeTPS.Store(tps)
        }
 }
 
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 8af88f1..bd05cbf 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -700,7 +700,7 @@ func (pc *defaultPullConsumer) pullMessage(request 
*PullRequest) {
                        time.Sleep(sleepTime)
                }
                // reset time
-               sleepTime = pc.option.PullInterval
+               sleepTime = pc.option.PullInterval.Load()
                pq.lastPullTime.Store(time.Now())
                err := pc.makeSureStateOK()
                if err != nil {
@@ -736,7 +736,7 @@ func (pc *defaultPullConsumer) pullMessage(request 
*PullRequest) {
                        Topic:                request.mq.Topic,
                        QueueId:              int32(request.mq.QueueId),
                        QueueOffset:          request.nextOffset,
-                       MaxMsgNums:           pc.option.PullBatchSize,
+                       MaxMsgNums:           pc.option.PullBatchSize.Load(),
                        SysFlag:              sysFlag,
                        CommitOffset:         0,
                        SubExpression:        sd.SubString,
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 2799841..ab105b3 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -596,7 +596,7 @@ func (pc *pushConsumer) validate() error {
                }
        }
 
-       if pc.option.PullInterval < 0 || pc.option.PullInterval > 
65535*time.Millisecond {
+       if interval := pc.option.PullInterval.Load(); interval < 0 || interval 
> 65535*time.Millisecond {
                return errors.New("option.PullInterval out of range [0, 65535]")
        }
 
@@ -608,9 +608,9 @@ func (pc *pushConsumer) validate() error {
                }
        }
 
-       if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
-               if pc.option.PullBatchSize == 0 {
-                       pc.option.PullBatchSize = 32
+       if pullBatchSize := pc.option.PullBatchSize.Load(); pullBatchSize < 1 
|| pullBatchSize > 1024 {
+               if pullBatchSize == 0 {
+                       pc.option.PullBatchSize.Store(32)
                } else {
                        return errors.New("option.PullBatchSize out of range 
[1, 1024]")
                }
@@ -674,7 +674,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        time.Sleep(sleepTime)
                }
                // reset time
-               sleepTime = pc.option.PullInterval
+               sleepTime = pc.option.PullInterval.Load()
                pq.lastPullTime.Store(time.Now())
                err := pc.makeSureStateOK()
                if err != nil {
@@ -813,7 +813,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        Topic:                request.mq.Topic,
                        QueueId:              int32(request.mq.QueueId),
                        QueueOffset:          request.nextOffset,
-                       MaxMsgNums:           pc.option.PullBatchSize,
+                       MaxMsgNums:           pc.option.PullBatchSize.Load(),
                        SysFlag:              sysFlag,
                        CommitOffset:         commitOffsetValue,
                        SubExpression:        subExpression,

Reply via email to