This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e19ee6  Send retry as a normal message when sendMessageBack fails (#1107)
0e19ee6 is described below

commit 0e19ee654819bda396a08d950c883f9008b8222b
Author: Liu Shengzhong <szliu0...@gmail.com>
AuthorDate: Thu Apr 11 23:59:05 2024 +0800

    Send retry as a normal message when sendMessageBack fails (#1107)
---
 consumer/consumer.go      | 96 +++++++++++++++++++++++++++++++++++++++++++++++
 consumer/pull_consumer.go |  8 +++-
 consumer/push_consumer.go |  7 ++--
 3 files changed, 106 insertions(+), 5 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 4ed4940..bbb5415 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -20,6 +20,7 @@ package consumer
 import (
        "context"
        "fmt"
+       "math/rand"
        "sort"
        "strconv"
        "strings"
@@ -57,6 +58,9 @@ const (
 
        // Offset persistent interval for consumer
        _PersistConsumerOffsetInterval = 5 * time.Second
+
+       // Timeout for sending message to retry topic
+       _SendMessageBackAsNormalTimeout = 3 * time.Second
 )
 
 type ConsumeType string
@@ -66,6 +70,8 @@ const (
        _PushConsume = ConsumeType("CONSUME_PASSIVELY")
 
        _SubAll = "*"
+
+       _ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER"
 )
 
 // Message model defines the way how messages are delivered to each consumer 
clients.
@@ -1037,6 +1043,96 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq 
*primitive.MessageQueue, t
        return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
 }
 
+func (dc *defaultConsumer) sendMessageBackAsNormal(msg *primitive.MessageExt, 
maxReconsumeTimes int32) bool {
+       retryTopic := internal.GetRetryTopic(dc.consumerGroup)
+       normalMsg := &primitive.Message{
+               Topic: retryTopic,
+               Body:  msg.Body,
+               Flag:  msg.Flag,
+       }
+       normalMsg.WithProperties(msg.GetProperties())
+       originMsgId := msg.GetProperty(primitive.PropertyOriginMessageId)
+       if len(originMsgId) == 0 {
+               originMsgId = msg.MsgId
+       }
+       normalMsg.WithProperty(primitive.PropertyOriginMessageId, originMsgId)
+       normalMsg.WithProperty(primitive.PropertyRetryTopic, msg.Topic)
+       normalMsg.RemoveProperty(primitive.PropertyTransactionPrepared)
+       normalMsg.WithDelayTimeLevel(int(3 + msg.ReconsumeTimes))
+
+       mq, err := dc.findPublishMessageQueue(retryTopic)
+       if err != nil {
+               rlog.Warning("sendMessageBackAsNormal find publish message 
queue error", map[string]interface{}{
+                       rlog.LogKeyTopic:         retryTopic,
+                       rlog.LogKeyMessageId:     msg.MsgId,
+                       rlog.LogKeyUnderlayError: err.Error(),
+               })
+               return false
+       }
+
+       brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
+       if len(brokerAddr) == 0 {
+               rlog.Warning("sendMessageBackAsNormal cannot find broker 
address", map[string]interface{}{
+                       rlog.LogKeyMessageId:     msg.MsgId,
+                       rlog.LogKeyBroker:        mq.BrokerName,
+                       rlog.LogKeyUnderlayError: err.Error(),
+               })
+               return false
+       }
+
+       request := buildSendToRetryRequest(mq, normalMsg, msg.ReconsumeTimes+1, 
maxReconsumeTimes)
+       resp, err := dc.client.InvokeSync(context.Background(), brokerAddr, 
request, _SendMessageBackAsNormalTimeout)
+       if err != nil {
+               rlog.Warning("sendMessageBackAsNormal failed to invoke", 
map[string]interface{}{
+                       rlog.LogKeyTopic:         retryTopic,
+                       rlog.LogKeyMessageId:     msg.MsgId,
+                       rlog.LogKeyBroker:        brokerAddr,
+                       rlog.LogKeyUnderlayError: err.Error(),
+               })
+               return false
+       }
+       if resp.Code != internal.ResSuccess {
+               rlog.Warning("sendMessageBackAsNormal failed to send", 
map[string]interface{}{
+                       rlog.LogKeyTopic:         retryTopic,
+                       rlog.LogKeyMessageId:     msg.MsgId,
+                       rlog.LogKeyBroker:        brokerAddr,
+                       rlog.LogKeyUnderlayError: fmt.Errorf("CODE: %d, DESC: 
%s", resp.Code, resp.Remark),
+               })
+               return false
+       }
+
+       return true
+}
+
+// findPublishMessageQueue fetches the publish queues of topic from the name server and
+// returns one of them chosen at random. It returns the lookup error unchanged, or a
+// "no writable queues" error when the lookup yields no queue.
+func (dc *defaultConsumer) findPublishMessageQueue(topic string) (*primitive.MessageQueue, error) {
+       candidates, err := dc.client.GetNameSrv().FetchPublishMessageQueues(topic)
+       count := len(candidates)
+       switch {
+       case err != nil:
+               return nil, err
+       case count <= 0:
+               return nil, fmt.Errorf("no writable queues")
+       }
+
+       picked := candidates[rand.Intn(count)]
+       return picked, nil
+}
+
+// buildSendToRetryRequest wraps msg in an internal.ReqSendMessage command addressed to
+// the topic and queue id of mq. The header uses _ClientInnerProducerGroup as producer
+// group, the current time in milliseconds as born timestamp, msg's flag and marshalled
+// properties, and the given reconsumeTimes and maxReconsumeTimes. msg.Body is the payload.
+func buildSendToRetryRequest(mq *primitive.MessageQueue, msg *primitive.Message, reconsumeTimes, maxReconsumeTimes int32) *remote.RemotingCommand {
+       // Nanoseconds since the epoch, scaled down to milliseconds.
+       bornMillis := time.Now().UnixNano() / int64(time.Millisecond)
+       props := msg.MarshallProperties()
+
+       header := &internal.SendMessageRequestHeader{
+               ProducerGroup:     _ClientInnerProducerGroup,
+               Topic:             mq.Topic,
+               QueueId:           mq.QueueId,
+               BornTimestamp:     bornMillis,
+               Flag:              msg.Flag,
+               Properties:        props,
+               ReconsumeTimes:    int(reconsumeTimes),
+               MaxReconsumeTimes: int(maxReconsumeTimes),
+       }
+       return remote.NewRemotingCommand(internal.ReqSendMessage, header, msg.Body)
+}
+
 func buildSubscriptionData(topic string, selector MessageSelector) 
*internal.SubscriptionData {
        subData := &internal.SubscriptionData{
                Topic:     topic,
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index bd05cbf..b26c829 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -628,8 +628,12 @@ func (pc *defaultPullConsumer) sendMessageBack(brokerName 
string, msg *primitive
        } else {
                brokerAddr = msg.StoreHost
        }
-       _, err := pc.client.InvokeSync(context.Background(), brokerAddr, 
pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
-       return err == nil
+       resp, err := pc.client.InvokeSync(context.Background(), brokerAddr, 
pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
+       if err != nil || resp.Code != internal.ResSuccess {
+               // send back as a normal message
+               return pc.defaultConsumer.sendMessageBackAsNormal(msg, 
pc.getMaxReconsumeTimes())
+       }
+       return true
 }
 
 func (pc *defaultPullConsumer) buildSendBackRequest(msg *primitive.MessageExt, 
delayLevel int) *remote.RemotingCommand {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index ab105b3..c600f13 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -921,9 +921,10 @@ func (pc *pushConsumer) sendMessageBack(brokerName string, 
msg *primitive.Messag
        } else {
                brokerAddr = msg.StoreHost
        }
-       _, err := pc.client.InvokeSync(context.Background(), brokerAddr, 
pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
-       if err != nil {
-               return false
+       resp, err := pc.client.InvokeSync(context.Background(), brokerAddr, 
pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
+       if err != nil || resp.Code != internal.ResSuccess {
+               // send back as a normal message
+               return pc.defaultConsumer.sendMessageBackAsNormal(msg, 
pc.getMaxReconsumeTimes())
        }
        return true
 }

Reply via email to