This is an automated email from the ASF dual-hosted git repository.
cserwen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 63a5a1f [ISSUE #1034] fix: the message may be cleaned when the message has not been consumed
63a5a1f is described below
commit 63a5a1f7b62395b570e8a736cd67bfa12f2f37d5
Author: Humkum <[email protected]>
AuthorDate: Thu Apr 27 17:30:38 2023 +0800
[ISSUE #1034] fix: the message may be cleaned when the message has not been consumed
---
consumer/process_queue.go | 26 ++++++++++++++++++--------
consumer/push_consumer.go | 21 ++++++++++++++++++++-
rlog/log.go | 5 +++++
3 files changed, 43 insertions(+), 9 deletions(-)
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index aeb66d8..120595f 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -259,22 +259,32 @@ func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer)
{
"time": startTime,
rlog.LogKeyUnderlayError: err,
})
+ pq.mutex.RUnlock()
continue
}
if time.Now().UnixNano()/1e6-st <=
int64(pc.option.ConsumeTimeout/time.Millisecond) {
pq.mutex.RUnlock()
return
}
- }
- pq.mutex.RUnlock()
-
- if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
- rlog.Error("send message back to broker error when
clean expired messages", map[string]interface{}{
- rlog.LogKeyConsumerGroup: pc.consumerGroup,
+ rlog.Info("send expire msg back. ",
map[string]interface{}{
+ rlog.LogKeyTopic: msg.Topic,
+ rlog.LogKeyMessageId: msg.MsgId,
+ "startTime": startTime,
+ rlog.LogKeyStoreHost: msg.StoreHost,
+ rlog.LogKeyQueueId: msg.Queue.QueueId,
+ rlog.LogKeyQueueOffset: msg.QueueOffset,
})
- continue
+ pq.mutex.RUnlock()
+ if !pc.sendMessageBack("", msg,
int(3+msg.ReconsumeTimes)) {
+ rlog.Error("send message back to broker error
when clean expired messages", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
pc.consumerGroup,
+ })
+ continue
+ }
+ pq.removeMessage(msg)
+ } else {
+ pq.mutex.RUnlock()
}
- pq.removeMessage(msg)
}
}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 85f9725..531a5b6 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -1133,8 +1133,19 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq
*processQueue, mq *primiti
consumeRT := time.Now().Sub(beginTime)
if err != nil {
+ rlog.Warning("consumeMessageCurrently error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ rlog.LogKeyMessages: msgs,
+ rlog.LogKeyMessageQueue: mq,
+ rlog.LogKeyConsumerGroup:
pc.consumerGroup,
+ })
msgCtx.Properties[primitive.PropCtxType] =
string(primitive.ExceptionReturn)
} else if consumeRT >= pc.option.ConsumeTimeout {
+ rlog.Warning("consumeMessageCurrently time
out", map[string]interface{}{
+ rlog.LogKeyMessages: msgs,
+ rlog.LogKeyMessageQueue: mq,
+ rlog.LogKeyConsumerGroup:
pc.consumerGroup,
+ })
msgCtx.Properties[primitive.PropCtxType] =
string(primitive.TimeoutReturn)
} else if result == ConsumeSuccess {
msgCtx.Properties[primitive.PropCtxType] =
string(primitive.SuccessReturn)
@@ -1262,7 +1273,15 @@ func (pc *pushConsumer) consumeMessageOrderly(pq
*processQueue, mq *primitive.Me
ctx = primitive.WithOrderlyCtx(ctx, orderlyCtx)
pq.lockConsume.Lock()
- result, _ := pc.consumeInner(ctx, msgs)
+ result, err := pc.consumeInner(ctx, msgs)
+ if err != nil {
+ rlog.Warning("consumeMessage orderly error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ rlog.LogKeyMessages: msgs,
+ rlog.LogKeyMessageQueue: mq.String(),
+ rlog.LogKeyConsumerGroup:
pc.consumerGroup,
+ })
+ }
pq.lockConsume.Unlock()
if result == Rollback || result ==
SuspendCurrentQueueAMoment {
diff --git a/rlog/log.go b/rlog/log.go
index e4070b7..5c99e2d 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -39,6 +39,11 @@ const (
LogKeyValueChangedTo = "changeTo"
LogKeyPullRequest = "PullRequest"
LogKeyTimeStamp = "timestamp"
+ LogKeyMessageId = "msgId"
+ LogKeyStoreHost = "storeHost"
+ LogKeyQueueId = "queueId"
+ LogKeyQueueOffset = "queueOffset"
+ LogKeyMessages = "messages"
)
type Logger interface {