This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 0e19ee6 Send retry as a normal message when sendMessageBack fails (#1107) 0e19ee6 is described below commit 0e19ee654819bda396a08d950c883f9008b8222b Author: Liu Shengzhong <szliu0...@gmail.com> AuthorDate: Thu Apr 11 23:59:05 2024 +0800 Send retry as a normal message when sendMessageBack fails (#1107) --- consumer/consumer.go | 96 +++++++++++++++++++++++++++++++++++++++++++++++ consumer/pull_consumer.go | 8 +++- consumer/push_consumer.go | 7 ++-- 3 files changed, 106 insertions(+), 5 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 4ed4940..bbb5415 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -20,6 +20,7 @@ package consumer import ( "context" "fmt" + "math/rand" "sort" "strconv" "strings" @@ -57,6 +58,9 @@ const ( // Offset persistent interval for consumer _PersistConsumerOffsetInterval = 5 * time.Second + + // Timeout for sending message to retry topic + _SendMessageBackAsNormalTimeout = 3 * time.Second ) type ConsumeType string @@ -66,6 +70,8 @@ const ( _PushConsume = ConsumeType("CONSUME_PASSIVELY") _SubAll = "*" + + _ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER" ) // Message model defines the way how messages are delivered to each consumer clients. @@ -1037,6 +1043,96 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, t return strconv.ParseInt(response.ExtFields["offset"], 10, 64) } +func (dc *defaultConsumer) sendMessageBackAsNormal(msg *primitive.MessageExt, maxReconsumeTimes int32) bool { + retryTopic := internal.GetRetryTopic(dc.consumerGroup) + normalMsg := &primitive.Message{ + Topic: retryTopic, + Body: msg.Body, + Flag: msg.Flag, + } + normalMsg.WithProperties(msg.GetProperties()) + originMsgId := msg.GetProperty(primitive.PropertyOriginMessageId) + if len(originMsgId) == 0 { + originMsgId = msg.MsgId + } + normalMsg.WithProperty(primitive.PropertyOriginMessageId, originMsgId) + normalMsg.WithProperty(primitive.PropertyRetryTopic, msg.Topic) + normalMsg.RemoveProperty(primitive.PropertyTransactionPrepared) + normalMsg.WithDelayTimeLevel(int(3 + msg.ReconsumeTimes)) + + mq, err := dc.findPublishMessageQueue(retryTopic) + if err != nil { + rlog.Warning("sendMessageBackAsNormal find publish message queue error", map[string]interface{}{ + rlog.LogKeyTopic: retryTopic, + rlog.LogKeyMessageId: msg.MsgId, + rlog.LogKeyUnderlayError: err.Error(), + }) + return false + } + + brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName) + if len(brokerAddr) == 0 { + rlog.Warning("sendMessageBackAsNormal cannot find broker address", map[string]interface{}{ + rlog.LogKeyMessageId: msg.MsgId, + rlog.LogKeyBroker: mq.BrokerName, + rlog.LogKeyUnderlayError: err.Error(), + }) + return false + } + + request := buildSendToRetryRequest(mq, normalMsg, msg.ReconsumeTimes+1, maxReconsumeTimes) + resp, err := dc.client.InvokeSync(context.Background(), brokerAddr, request, _SendMessageBackAsNormalTimeout) + if err != nil { + rlog.Warning("sendMessageBackAsNormal failed to invoke", map[string]interface{}{ + rlog.LogKeyTopic: retryTopic, + rlog.LogKeyMessageId: msg.MsgId, + rlog.LogKeyBroker: brokerAddr, + rlog.LogKeyUnderlayError: err.Error(), + }) + return false + } + if resp.Code != internal.ResSuccess { + rlog.Warning("sendMessageBackAsNormal failed to send", map[string]interface{}{ + rlog.LogKeyTopic: retryTopic, + rlog.LogKeyMessageId: msg.MsgId, + rlog.LogKeyBroker: brokerAddr, + rlog.LogKeyUnderlayError: fmt.Errorf("CODE: %d, DESC: %s", resp.Code, resp.Remark), + }) + return false + } + + return true +} + +func (dc *defaultConsumer) findPublishMessageQueue(topic string) (*primitive.MessageQueue, error) { + mqs, err := dc.client.GetNameSrv().FetchPublishMessageQueues(topic) + if err != nil { + return nil, err + } + + if len(mqs) <= 0 { + return nil, fmt.Errorf("no writable queues") + } + + return mqs[rand.Intn(len(mqs))], nil +} + +func buildSendToRetryRequest(mq *primitive.MessageQueue, msg *primitive.Message, reconsumeTimes, + maxReconsumeTimes int32) *remote.RemotingCommand { + req := &internal.SendMessageRequestHeader{ + ProducerGroup: _ClientInnerProducerGroup, + Topic: mq.Topic, + QueueId: mq.QueueId, + BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond), + Flag: msg.Flag, + Properties: msg.MarshallProperties(), + ReconsumeTimes: int(reconsumeTimes), + MaxReconsumeTimes: int(maxReconsumeTimes), + } + + return remote.NewRemotingCommand(internal.ReqSendMessage, req, msg.Body) +} + func buildSubscriptionData(topic string, selector MessageSelector) *internal.SubscriptionData { subData := &internal.SubscriptionData{ Topic: topic, diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index bd05cbf..b26c829 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -628,8 +628,12 @@ func (pc *defaultPullConsumer) sendMessageBack(brokerName string, msg *primitive } else { brokerAddr = msg.StoreHost } - _, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second) - return err == nil + resp, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second) + if err != nil || resp.Code != internal.ResSuccess { + // send back as a normal message + return pc.defaultConsumer.sendMessageBackAsNormal(msg, pc.getMaxReconsumeTimes()) + } + return true } func (pc *defaultPullConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLevel int) *remote.RemotingCommand { diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index ab105b3..c600f13 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -921,9 +921,10 @@ func (pc *pushConsumer) sendMessageBack(brokerName string, msg *primitive.Messag } else { brokerAddr = msg.StoreHost } - _, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second) - if err != nil { - return false + resp, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second) + if err != nil || resp.Code != internal.ResSuccess { + // send back as a normal message + return pc.defaultConsumer.sendMessageBackAsNormal(msg, pc.getMaxReconsumeTimes()) } return true }