This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b4b4df3  feat: fix push consumer pause data race (#1217)
b4b4df3 is described below

commit b4b4df33301eb91a105ec57b2ef1d2ee352df895
Author: WeizhongTu <[email protected]>
AuthorDate: Wed Sep 3 17:01:37 2025 +0800

    feat: fix push consumer pause data race (#1217)
---
 consumer/consumer.go      | 20 ++++++++++----------
 consumer/pull_consumer.go | 11 ++++++-----
 consumer/push_consumer.go | 16 ++++++++--------
 3 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 0edeef2..98b4447 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -25,11 +25,11 @@ import (
        "strconv"
        "strings"
        "sync"
-       "sync/atomic"
        "time"
 
        jsoniter "github.com/json-iterator/go"
        "github.com/tidwall/gjson"
+       "go.uber.org/atomic"
 
        "github.com/apache/rocketmq-client-go/v2/errors"
        "github.com/apache/rocketmq-client-go/v2/hooks"
@@ -250,8 +250,8 @@ type defaultConsumer struct {
        cType     ConsumeType
        client    internal.RMQClient
        mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue)
-       state     int32
-       pause     bool
+       state     *atomic.Int32
+       pause     *atomic.Bool
        once      sync.Once
        option    consumerOptions
        // key: primitive.MessageQueue
@@ -289,14 +289,14 @@ func (dc *defaultConsumer) start() error {
        }
 
        dc.client.Start()
-       atomic.StoreInt32(&dc.state, int32(internal.StateRunning))
+       dc.state.Store(int32(internal.StateRunning))
        dc.consumerStartTimestamp = time.Now().UnixNano() / int64(time.Millisecond)
        dc.stat = NewStatsManager()
        return nil
 }
 
 func (dc *defaultConsumer) shutdown() error {
-       atomic.StoreInt32(&dc.state, int32(internal.StateShutdown))
+       dc.state.Store(int32(internal.StateShutdown))
 
        mqs := make([]*primitive.MessageQueue, 0)
        dc.processQueueTable.Range(func(key, value interface{}) bool {
@@ -317,11 +317,11 @@ func (dc *defaultConsumer) shutdown() error {
 }
 
 func (dc *defaultConsumer) isRunning() bool {
-       return atomic.LoadInt32(&dc.state) == int32(internal.StateRunning)
+       return dc.state.Load() == int32(internal.StateRunning)
 }
 
 func (dc *defaultConsumer) isStopped() bool {
-       return atomic.LoadInt32(&dc.state) == int32(internal.StateShutdown)
+       return dc.state.Load() == int32(internal.StateShutdown)
 }
 
 func (dc *defaultConsumer) persistConsumerOffset() error {
@@ -371,7 +371,7 @@ func (dc *defaultConsumer) isSubscribeTopicNeedUpdate(topic string) bool {
 }
 
 func (dc *defaultConsumer) doBalanceIfNotPaused() {
-       if dc.pause {
+       if dc.pause.Load() {
                rlog.Info("[BALANCE-SKIP] since consumer paused", map[string]interface{}{
                        rlog.LogKeyConsumerGroup: dc.consumerGroup,
                })
@@ -483,8 +483,8 @@ func (dc *defaultConsumer) SubscriptionDataList() []*internal.SubscriptionData {
 }
 
 func (dc *defaultConsumer) makeSureStateOK() error {
-       if atomic.LoadInt32(&dc.state) != int32(internal.StateRunning) {
-               return fmt.Errorf("state not running, actually: %v", dc.state)
+       if dc.state.Load() != int32(internal.StateRunning) {
+               return fmt.Errorf("state not running, actually: %v", dc.state.Load())
        }
        return nil
 }
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 9592061..c6af4f7 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -31,6 +31,7 @@ import (
        errors2 "github.com/apache/rocketmq-client-go/v2/errors"
        "github.com/apache/rocketmq-client-go/v2/internal/remote"
        "github.com/apache/rocketmq-client-go/v2/internal/utils"
+       atomic2 "go.uber.org/atomic"
 
        "github.com/pkg/errors"
 
@@ -111,7 +112,7 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
                client:        internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
                consumerGroup: utils.WrapNamespace(defaultOpts.Namespace, defaultOpts.GroupName),
                cType:         _PullConsume,
-               state:         int32(internal.StateCreateJust),
+               state:         atomic2.NewInt32(int32(internal.StateCreateJust)),
                prCh:          make(chan PullRequest, 4),
                model:         defaultOpts.ConsumerModel,
                option:        defaultOpts,
@@ -149,8 +150,8 @@ func (pc *defaultPullConsumer) GetTopicRouteInfo(topic string) ([]*primitive.Mes
 }
 
 func (pc *defaultPullConsumer) Subscribe(topic string, selector MessageSelector) error {
-       if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
-               atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
+       if pc.state.Load() == int32(internal.StateStartFailed) ||
+               pc.state.Load() == int32(internal.StateShutdown) {
                return errors2.ErrStartTopic
        }
        if pc.SubType == Assign {
@@ -247,7 +248,7 @@ func (pc *defaultPullConsumer) Start() error {
                if err != nil {
                        return
                }
-               atomic.StoreInt32(&pc.state, int32(internal.StateRunning))
+               pc.state.Store(int32(internal.StateRunning))
                go func() {
                        for {
                                select {
@@ -837,7 +838,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
                        goto NEXT
                }
 
-               if pc.pause {
+               if pc.pause.Load() {
                        rlog.Debug(fmt.Sprintf("defaultPullConsumer [%s] of [%s] was paused, execute pull request [%s] later",
                                pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
                        sleepTime = _PullDelayTimeWhenSuspend
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index e6ec3e5..0c2a630 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -25,10 +25,10 @@ import (
        "strconv"
        "strings"
        "sync"
-       "sync/atomic"
        "time"
 
        errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+       "go.uber.org/atomic"
 
        "github.com/pkg/errors"
 
@@ -97,7 +97,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
                client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
                consumerGroup:  defaultOpts.GroupName,
                cType:          _PushConsume,
-               state:          int32(internal.StateCreateJust),
+               state:          atomic.NewInt32(int32(internal.StateCreateJust)),
                prCh:           make(chan PullRequest, 4),
                model:          defaultOpts.ConsumerModel,
                consumeOrderly: defaultOpts.ConsumeOrderly,
@@ -138,7 +138,7 @@ func (pc *pushConsumer) Start() error {
                        "messageModel":           pc.model,
                        "unitMode":               pc.unitMode,
                })
-               atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
+               pc.state.Store(int32(internal.StateStartFailed))
                err = pc.validate()
                if err != nil {
                        rlog.Error("the consumer group option validate fail", map[string]interface{}{
@@ -289,8 +289,8 @@ func (pc *pushConsumer) Shutdown() error {
 
 func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
        f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
-       if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
-               atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
+       if pc.state.Load() == int32(internal.StateStartFailed) ||
+               pc.state.Load() == int32(internal.StateShutdown) {
                return errors2.ErrStartTopic
        }
 
@@ -685,7 +685,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                        goto NEXT
                }
 
-               if pc.pause {
+               if pc.pause.Load() {
                        rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was paused, execute pull request [%s] later",
                                pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
                        sleepTime = _PullDelayTimeWhenSuspend
@@ -945,12 +945,12 @@ func (pc *pushConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLev
 }
 
 func (pc *pushConsumer) suspend() {
-       pc.pause = true
+       pc.pause.Store(true)
        rlog.Info(fmt.Sprintf("suspend consumer: %s", pc.consumerGroup), nil)
 }
 
 func (pc *pushConsumer) resume() {
-       pc.pause = false
+       pc.pause.Store(false)
        pc.doBalance()
        rlog.Info(fmt.Sprintf("resume consumer: %s", pc.consumerGroup), nil)
 }

Reply via email to