This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new 3493a47  feat: support consumer consume tps option (#1101)
3493a47 is described below

commit 3493a4783ab39d810cffc37757b5b005cfa57bd9
Author:     WeizhongTu <tuweizh...@163.com>
AuthorDate: Mon Oct 16 14:36:08 2023 +0800

    feat: support consumer consume tps option (#1101)

    * feat: support consumer consume tps option

    * feat: support consumer consume tps option
---
 consumer/consumer.go      |  8 ++++++++
 consumer/option.go        | 28 ++++++++++++++++++++--------
 consumer/pull_consumer.go |  4 ++--
 consumer/push_consumer.go | 12 ++++++------
 4 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index acce804..4ed4940 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -414,6 +414,14 @@ func (dc *defaultConsumer) doBalance() {
 			return (mqAll[i].QueueId - mqAll[j].QueueId) < 0
 		})
 		allocateResult := dc.allocate(dc.consumerGroup, dc.client.ClientID(), mqAll, cidAll)
+
+		// Principle of flow control: pull TPS = 1000ms/PullInterval * BatchSize * len(allocateResult)
+		if consumeTPS := dc.option.ConsumeTPS.Load(); consumeTPS > 0 && len(allocateResult) > 0 {
+			pullBatchSize := dc.option.PullBatchSize.Load()
+			pullTimesPerSecond := float64(consumeTPS) / float64(pullBatchSize*int32(len(allocateResult)))
+			dc.option.PullInterval.Store(time.Duration(float64(time.Second) / pullTimesPerSecond))
+		}
+
 		changed := dc.updateProcessQueueTable(topic, allocateResult)
 		if changed {
 			dc.mqChanged(topic, mqAll, allocateResult)
diff --git a/consumer/option.go b/consumer/option.go
index 3f7e6ca..ac7dd93 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -21,6 +21,8 @@ import (
 	"strings"
 	"time"
 
+	"go.uber.org/atomic"
+
 	"github.com/apache/rocketmq-client-go/v2/hooks"
 	"github.com/apache/rocketmq-client-go/v2/internal"
 	"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -55,8 +57,8 @@ type consumerOptions struct {
 
 	// Flow control threshold on topic level, default value is -1(Unlimited)
 	//
-	// The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
-	// {@code pullThresholdForTopic} if it is't unlimited
+	// The value of {@code pullThresholdForQueue} will be overwritten and calculated based on
+	// {@code pullThresholdForTopic} if it isn't unlimited
 	//
 	// For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
 	// then pullThresholdForQueue will be set to 100
@@ -64,21 +66,24 @@ type consumerOptions struct {
 
 	// Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
 	//
-	// The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
-	// {@code pullThresholdSizeForTopic} if it is't unlimited
+	// The value of {@code pullThresholdSizeForQueue} will be overwritten and calculated based on
+	// {@code pullThresholdSizeForTopic} if it isn't unlimited
 	//
 	// For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
 	// assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
 	PullThresholdSizeForTopic int
 
 	// Message pull Interval
-	PullInterval time.Duration
+	PullInterval atomic.Duration
+
+	// Message consumer tps
+	ConsumeTPS atomic.Int32
 
 	// Batch consumption size
 	ConsumeMessageBatchMaxSize int
 
 	// Batch pull size
-	PullBatchSize int32
+	PullBatchSize atomic.Int32
 
 	// Whether update subscription relationship when every pull
 	PostSubscriptionWhenPull bool
@@ -283,7 +288,7 @@ func WithStrategy(strategy AllocateStrategy) Option {
 
 func WithPullBatchSize(batchSize int32) Option {
 	return func(options *consumerOptions) {
-		options.PullBatchSize = batchSize
+		options.PullBatchSize.Store(batchSize)
 	}
 }
 
@@ -307,7 +312,14 @@ func WithSuspendCurrentQueueTimeMillis(suspendT time.Duration) Option {
 
 func WithPullInterval(interval time.Duration) Option {
 	return func(options *consumerOptions) {
-		options.PullInterval = interval
+		options.PullInterval.Store(interval)
+	}
+}
+
+// WithConsumeTPS set single-machine consumption tps
+func WithConsumeTPS(tps int32) Option {
+	return func(options *consumerOptions) {
+		options.ConsumeTPS.Store(tps)
 	}
 }
 
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 8af88f1..bd05cbf 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -700,7 +700,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
 			time.Sleep(sleepTime)
 		}
 		// reset time
-		sleepTime = pc.option.PullInterval
+		sleepTime = pc.option.PullInterval.Load()
 		pq.lastPullTime.Store(time.Now())
 		err := pc.makeSureStateOK()
 		if err != nil {
@@ -736,7 +736,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
 			Topic: request.mq.Topic,
 			QueueId: int32(request.mq.QueueId),
 			QueueOffset: request.nextOffset,
-			MaxMsgNums: pc.option.PullBatchSize,
+			MaxMsgNums: pc.option.PullBatchSize.Load(),
 			SysFlag: sysFlag,
 			CommitOffset: 0,
 			SubExpression: sd.SubString,
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 2799841..ab105b3 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -596,7 +596,7 @@ func (pc *pushConsumer) validate() error {
 		}
 	}
 
-	if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535*time.Millisecond {
+	if interval := pc.option.PullInterval.Load(); interval < 0 || interval > 65535*time.Millisecond {
 		return errors.New("option.PullInterval out of range [0, 65535]")
 	}
 
@@ -608,9 +608,9 @@ func (pc *pushConsumer) validate() error {
 		}
 	}
 
-	if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
-		if pc.option.PullBatchSize == 0 {
-			pc.option.PullBatchSize = 32
+	if pullBatchSize := pc.option.PullBatchSize.Load(); pullBatchSize < 1 || pullBatchSize > 1024 {
+		if pullBatchSize == 0 {
+			pc.option.PullBatchSize.Store(32)
 		} else {
 			return errors.New("option.PullBatchSize out of range [1, 1024]")
 		}
@@ -674,7 +674,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			time.Sleep(sleepTime)
 		}
 		// reset time
-		sleepTime = pc.option.PullInterval
+		sleepTime = pc.option.PullInterval.Load()
 		pq.lastPullTime.Store(time.Now())
 		err := pc.makeSureStateOK()
 		if err != nil {
@@ -813,7 +813,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			Topic: request.mq.Topic,
 			QueueId: int32(request.mq.QueueId),
 			QueueOffset: request.nextOffset,
-			MaxMsgNums: pc.option.PullBatchSize,
+			MaxMsgNums: pc.option.PullBatchSize.Load(),
 			SysFlag: sysFlag,
 			CommitOffset: commitOffsetValue,
 			SubExpression: subExpression,
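
For reference, a minimal sketch of how the new WithConsumeTPS option might be combined with the existing consumer options; this is not part of the commit, and the group name, topic, and nameserver address are placeholders:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	// Push consumer with the new per-instance consume TPS limit.
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
		consumer.WithPullBatchSize(32),
		consumer.WithConsumeTPS(500), // limit this instance to roughly 500 messages/second
	)
	if err != nil {
		panic(err)
	}

	err = c.Subscribe("TopicTest", consumer.MessageSelector{},
		func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
			for _, msg := range msgs {
				fmt.Printf("consume message: %s\n", msg.Body)
			}
			return consumer.ConsumeSuccess, nil
		})
	if err != nil {
		panic(err)
	}

	if err := c.Start(); err != nil {
		panic(err)
	}
	defer c.Shutdown()

	time.Sleep(time.Minute)
}

Following the flow-control comment in doBalance (pull TPS = 1000ms/PullInterval * BatchSize * len(allocateResult)): with ConsumeTPS=500, PullBatchSize=32, and, say, 4 allocated queues, pullTimesPerSecond = 500/(32*4) ≈ 3.9, so PullInterval is stored as roughly 256ms, which caps pull throughput at about 500 messages per second on that instance.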