This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 96f00c4 [ISSUE #953] fix limiter with goroutine cover (#952)
96f00c4 is described below
commit 96f00c44c834011a5e439457873e7544e585ea05
Author: maqingxiang <[email protected]>
AuthorDate: Wed Nov 2 10:08:35 2022 +0800
[ISSUE #953] fix limiter with goroutine cover (#952)
* fix limiter with goroutine cover
* fix limiter with goroutine cover
Co-authored-by: 鲁扬 <[email protected]>
---
consumer/push_consumer.go | 13 ++++---------
1 file changed, 4 insertions(+), 9 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 8f9c69c..da7ba92 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -1046,10 +1046,8 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti
limiter := pc.option.Limiter
limiterOn := limiter != nil
- if !limiterOn {
- if _, ok := pc.crCh[mq.Topic]; !ok {
- pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
- }
+ if _, ok := pc.crCh[mq.Topic]; !ok {
+ pc.crCh[mq.Topic] = make(chan struct{}, pc.defaultConsumer.option.ConsumeGoroutineNums)
}
for count := 0; count < len(msgs); count++ {
@@ -1065,9 +1063,8 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti
if limiterOn {
limiter(utils.WithoutNamespace(mq.Topic))
- } else {
- pc.crCh[mq.Topic] <- struct{}{}
}
+ pc.crCh[mq.Topic] <- struct{}{}
go primitive.WithRecover(func() {
defer func() {
@@ -1077,9 +1074,7 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
}
- if !limiterOn {
- <-pc.crCh[mq.Topic]
- }
+ <-pc.crCh[mq.Topic]
}()
RETRY:
if pq.IsDroppd() {