This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8afd69f  [ISSUE #927] fix processQueue remove offset (#928)
8afd69f is described below

commit 8afd69f8b0f0fa0cdad877536a6c216ee722eec7
Author: 0daypwn <[email protected]>
AuthorDate: Tue Nov 1 16:55:29 2022 +0800

    [ISSUE #927] fix processQueue remove offset (#928)
    
    Co-authored-by: wuxb02 <[email protected]>
---
 consumer/process_queue.go | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 7bcfa2f..49dae7f 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -99,16 +99,7 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
        if pq.IsDroppd() {
                return
        }
-       if !pq.order {
-               select {
-               case <-pq.closeChan:
-                       return
-               case pq.msgCh <- messages:
-               }
-       }
-
        pq.mutex.Lock()
-
        validMessageCount := 0
        for idx := range messages {
                msg := messages[idx]
@@ -126,9 +117,15 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
 
                pq.cachedMsgSize.Add(int64(len(msg.Body)))
        }
-
        pq.cachedMsgCount.Add(int64(validMessageCount))
        pq.mutex.Unlock()
+       if !pq.order {
+               select {
+               case <-pq.closeChan:
+                       return
+               case pq.msgCh <- messages:
+               }
+       }
 
        if pq.cachedMsgCount.Load() > 0 && !pq.consuming {
                pq.consuming = true

Reply via email to