This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 96f00c4  [ISSUE #953] fix limiter with goroutine cover (#952)
96f00c4 is described below

commit 96f00c44c834011a5e439457873e7544e585ea05
Author: maqingxiang <[email protected]>
AuthorDate: Wed Nov 2 10:08:35 2022 +0800

    [ISSUE #953] fix limiter with goroutine cover (#952)
    
    * fix limiter with goroutine cover
    
    * fix limiter with goroutine cover
    
    Co-authored-by: 鲁扬 <[email protected]>
---
 consumer/push_consumer.go | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 8f9c69c..da7ba92 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -1046,10 +1046,8 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq 
*processQueue, mq *primiti
 
        limiter := pc.option.Limiter
        limiterOn := limiter != nil
-       if !limiterOn {
-               if _, ok := pc.crCh[mq.Topic]; !ok {
-                       pc.crCh[mq.Topic] = make(chan struct{}, 
pc.defaultConsumer.option.ConsumeGoroutineNums)
-               }
+       if _, ok := pc.crCh[mq.Topic]; !ok {
+               pc.crCh[mq.Topic] = make(chan struct{}, 
pc.defaultConsumer.option.ConsumeGoroutineNums)
        }
 
        for count := 0; count < len(msgs); count++ {
@@ -1065,9 +1063,8 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq 
*processQueue, mq *primiti
 
                if limiterOn {
                        limiter(utils.WithoutNamespace(mq.Topic))
-               } else {
-                       pc.crCh[mq.Topic] <- struct{}{}
                }
+               pc.crCh[mq.Topic] <- struct{}{}
 
                go primitive.WithRecover(func() {
                        defer func() {
@@ -1077,9 +1074,7 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq 
*processQueue, mq *primiti
                                                rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
                                        })
                                }
-                               if !limiterOn {
-                                       <-pc.crCh[mq.Topic]
-                               }
+                               <-pc.crCh[mq.Topic]
                        }()
                RETRY:
                        if pq.IsDroppd() {

Reply via email to