This is an automated email from the ASF dual-hosted git repository.

cserwen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 63a5a1f  [ISSUE #1034] fix: the message may be cleaned when the 
message has not been consumed
63a5a1f is described below

commit 63a5a1f7b62395b570e8a736cd67bfa12f2f37d5
Author: Humkum <[email protected]>
AuthorDate: Thu Apr 27 17:30:38 2023 +0800

    [ISSUE #1034] fix: the message may be cleaned when the message has not been 
consumed
---
 consumer/process_queue.go | 26 ++++++++++++++++++--------
 consumer/push_consumer.go | 21 ++++++++++++++++++++-
 rlog/log.go               |  5 +++++
 3 files changed, 43 insertions(+), 9 deletions(-)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index aeb66d8..120595f 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -259,22 +259,32 @@ func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) 
{
                                        "time":                   startTime,
                                        rlog.LogKeyUnderlayError: err,
                                })
+                               pq.mutex.RUnlock()
                                continue
                        }
                        if time.Now().UnixNano()/1e6-st <= 
int64(pc.option.ConsumeTimeout/time.Millisecond) {
                                pq.mutex.RUnlock()
                                return
                        }
-               }
-               pq.mutex.RUnlock()
-
-               if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
-                       rlog.Error("send message back to broker error when 
clean expired messages", map[string]interface{}{
-                               rlog.LogKeyConsumerGroup: pc.consumerGroup,
+                       rlog.Info("send expire msg back. ", 
map[string]interface{}{
+                               rlog.LogKeyTopic:       msg.Topic,
+                               rlog.LogKeyMessageId:   msg.MsgId,
+                               "startTime":            startTime,
+                               rlog.LogKeyStoreHost:   msg.StoreHost,
+                               rlog.LogKeyQueueId:     msg.Queue.QueueId,
+                               rlog.LogKeyQueueOffset: msg.QueueOffset,
                        })
-                       continue
+                       pq.mutex.RUnlock()
+                       if !pc.sendMessageBack("", msg, 
int(3+msg.ReconsumeTimes)) {
+                               rlog.Error("send message back to broker error 
when clean expired messages", map[string]interface{}{
+                                       rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
+                               })
+                               continue
+                       }
+                       pq.removeMessage(msg)
+               } else {
+                       pq.mutex.RUnlock()
                }
-               pq.removeMessage(msg)
        }
 }
 
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 85f9725..531a5b6 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -1133,8 +1133,19 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq 
*processQueue, mq *primiti
 
                        consumeRT := time.Now().Sub(beginTime)
                        if err != nil {
+                               rlog.Warning("consumeMessageCurrently error", 
map[string]interface{}{
+                                       rlog.LogKeyUnderlayError: err,
+                                       rlog.LogKeyMessages:      msgs,
+                                       rlog.LogKeyMessageQueue:  mq,
+                                       rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
+                               })
                                msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.ExceptionReturn)
                        } else if consumeRT >= pc.option.ConsumeTimeout {
+                               rlog.Warning("consumeMessageCurrently time 
out", map[string]interface{}{
+                                       rlog.LogKeyMessages:      msgs,
+                                       rlog.LogKeyMessageQueue:  mq,
+                                       rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
+                               })
                                msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.TimeoutReturn)
                        } else if result == ConsumeSuccess {
                                msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.SuccessReturn)
@@ -1262,7 +1273,15 @@ func (pc *pushConsumer) consumeMessageOrderly(pq 
*processQueue, mq *primitive.Me
                        ctx = primitive.WithOrderlyCtx(ctx, orderlyCtx)
 
                        pq.lockConsume.Lock()
-                       result, _ := pc.consumeInner(ctx, msgs)
+                       result, err := pc.consumeInner(ctx, msgs)
+                       if err != nil {
+                               rlog.Warning("consumeMessage orderly error", 
map[string]interface{}{
+                                       rlog.LogKeyUnderlayError: err,
+                                       rlog.LogKeyMessages:      msgs,
+                                       rlog.LogKeyMessageQueue:  mq.String(),
+                                       rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
+                               })
+                       }
                        pq.lockConsume.Unlock()
 
                        if result == Rollback || result == 
SuspendCurrentQueueAMoment {
diff --git a/rlog/log.go b/rlog/log.go
index e4070b7..5c99e2d 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -39,6 +39,11 @@ const (
        LogKeyValueChangedTo       = "changeTo"
        LogKeyPullRequest          = "PullRequest"
        LogKeyTimeStamp            = "timestamp"
+       LogKeyMessageId            = "msgId"
+       LogKeyStoreHost            = "storeHost"
+       LogKeyQueueId              = "queueId"
+       LogKeyQueueOffset          = "queueOffset"
+       LogKeyMessages             = "messages"
 )
 
 type Logger interface {

Reply via email to