This is an automated email from the ASF dual-hosted git repository.
cserwen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 533de03 fix: select one message queue of different broker when retry to send (#1014)
533de03 is described below
commit 533de03048e19f4f925bd59a3d9cba6c5cba26b6
Author: cserwen <[email protected]>
AuthorDate: Wed Jun 28 15:34:34 2023 +0800
fix: select one message queue of different broker when retry to send (#1014)
Co-authored-by: dengzhiwen1 <[email protected]>
---
internal/response.go | 2 ++
producer/producer.go | 55 ++++++++++++++++++++++++++++++++++++++++-------
producer/producer_test.go | 12 +++++++++--
producer/selector.go | 36 ++++++++++++++++++++++++-------
producer/selector_test.go | 37 +++++++++++++++++++++++++------
5 files changed, 118 insertions(+), 24 deletions(-)
diff --git a/internal/response.go b/internal/response.go
index d1c7e18..8a815ba 100644
--- a/internal/response.go
+++ b/internal/response.go
@@ -23,6 +23,8 @@ const (
ResFlushDiskTimeout = int16(10)
ResSlaveNotAvailable = int16(11)
ResFlushSlaveTimeout = int16(12)
+ ResServiceNotAvailable = int16(14)
+ ResNoPermission = int16(16)
ResTopicNotExist = int16(17)
ResPullNotFound = int16(19)
ResPullRetryImmediately = int16(20)
diff --git a/producer/producer.go b/producer/producer.go
index ebd5e33..d8ca54b 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -155,6 +155,21 @@ func MarshalMessageBatch(msgs ...*primitive.Message)
[]byte {
return buffer.Bytes()
}
+func needRetryCode(code int16) bool {
+ switch code {
+ case internal.ResTopicNotExist:
+ return true
+ case internal.ResServiceNotAvailable:
+ return true
+ case internal.ResError:
+ return true
+ case internal.ResNoPermission:
+ return true
+ default:
+ return false
+ }
+}
+
func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl
time.Duration) (string, error) {
correlationId := uuid.New().String()
requestClientId := p.client.ClientID()
@@ -301,6 +316,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg
*primitive.Message,
var (
err error
+ mq *primitive.MessageQueue
)
var (
@@ -308,12 +324,23 @@ func (p *defaultProducer) sendSync(ctx context.Context,
msg *primitive.Message,
ok bool
)
for retryCount := 0; retryCount < retryTime; retryCount++ {
- mq := p.selectMessageQueue(msg)
+ var lastBrokerName string
+ if mq != nil {
+ lastBrokerName = mq.BrokerName
+ }
+ mq := p.selectMessageQueue(msg, lastBrokerName)
if mq == nil {
err = fmt.Errorf("the topic=%s route info not found",
msg.Topic)
continue
}
+ if lastBrokerName != "" {
+ rlog.Warning("start retrying to send, ",
map[string]interface{}{
+ "lastBroker": lastBrokerName,
+ "newBroker": mq.BrokerName,
+ })
+ }
+
addr :=
p.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
if addr == "" {
return fmt.Errorf("topic=%s route info not found",
mq.Topic)
@@ -333,6 +360,10 @@ func (p *defaultProducer) sendSync(ctx context.Context,
msg *primitive.Message,
err = _err
continue
}
+
+ if needRetryCode(res.Code) && retryCount < retryTime-1 {
+ continue
+ }
return p.client.ProcessSendResponse(mq.BrokerName, res, resp,
msg)
}
return err
@@ -359,7 +390,7 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f
func(context.Context,
func (p *defaultProducer) sendAsync(ctx context.Context, msg
*primitive.Message, h func(context.Context, *primitive.SendResult, error))
error {
- mq := p.selectMessageQueue(msg)
+ mq := p.selectMessageQueue(msg, "")
if mq == nil {
return errors.Errorf("the topic=%s route info not found",
msg.Topic)
}
@@ -416,8 +447,13 @@ func (p *defaultProducer) sendOneWay(ctx context.Context,
msg *primitive.Message
retryTime := 1 + p.options.RetryTimes
var err error
+ var mq *primitive.MessageQueue
for retryCount := 0; retryCount < retryTime; retryCount++ {
- mq := p.selectMessageQueue(msg)
+ var lastBrokerName string
+ if mq != nil {
+ lastBrokerName = mq.BrokerName
+ }
+ mq = p.selectMessageQueue(msg, lastBrokerName)
if mq == nil {
err = fmt.Errorf("the topic=%s route info not found",
msg.Topic)
continue
@@ -554,13 +590,16 @@ func (p *defaultProducer) tryToFindTopicPublishInfo(topic
string) *internal.Topi
return result
}
-func (p *defaultProducer) selectMessageQueue(msg *primitive.Message)
*primitive.MessageQueue {
- topic := msg.Topic
- result := p.tryToFindTopicPublishInfo(topic)
- if result == nil {
+func (p *defaultProducer) selectMessageQueue(msg *primitive.Message,
lastBrokerName string) *primitive.MessageQueue {
+ result := p.tryToFindTopicPublishInfo(msg.Topic)
+ if result == nil || len(result.MqList) == 0 {
+ rlog.Warning("topic route info is nil or empty",
map[string]interface{}{
+ rlog.LogKeyTopic: msg.Topic,
+ "result": result,
+ })
return nil
}
- return p.options.Selector.Select(msg, result.MqList)
+ return p.options.Selector.Select(msg, result.MqList, lastBrokerName)
}
func (p *defaultProducer) PublishTopicList() []string {
diff --git a/producer/producer_test.go b/producer/producer_test.go
index e1d72dd..cbe7426 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -149,7 +149,11 @@ func TestSync(t *testing.T) {
mockB4Send(p)
- client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil, nil)
+ cmd := &remote.RemotingCommand{
+ Code: internal.ResSuccess,
+ }
+
+ client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return(cmd, nil)
client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any()).Do(
func(brokerName string, cmd *remote.RemotingCommand, resp
*primitive.SendResult, msgs ...*primitive.Message) {
resp.Status = expectedResp.Status
@@ -309,7 +313,11 @@ func TestSyncWithNamespace(t *testing.T) {
mockB4Send(p)
- client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil, nil)
+ cmd := &remote.RemotingCommand{
+ Code: internal.ResSuccess,
+ }
+
+ client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return(cmd, nil)
client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any()).Do(
func(brokerName string, cmd *remote.RemotingCommand, resp
*primitive.SendResult, msgs ...*primitive.Message) {
resp.Status = expectedResp.Status
diff --git a/producer/selector.go b/producer/selector.go
index 74f5bad..c9da49a 100644
--- a/producer/selector.go
+++ b/producer/selector.go
@@ -27,7 +27,7 @@ import (
)
type QueueSelector interface {
- Select(*primitive.Message, []*primitive.MessageQueue)
*primitive.MessageQueue
+ Select(msg *primitive.Message, mqs []*primitive.MessageQueue,
lastBrokerName string) *primitive.MessageQueue
}
// manualQueueSelector use the queue manually set in the provided Message's
QueueID field as the queue to send.
@@ -37,7 +37,7 @@ func NewManualQueueSelector() QueueSelector {
return new(manualQueueSelector)
}
-func (manualQueueSelector) Select(message *primitive.Message, queues
[]*primitive.MessageQueue) *primitive.MessageQueue {
+func (manualQueueSelector) Select(message *primitive.Message, queues
[]*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue {
return message.Queue
}
@@ -53,7 +53,7 @@ func NewRandomQueueSelector() QueueSelector {
return s
}
-func (r *randomQueueSelector) Select(message *primitive.Message, queues
[]*primitive.MessageQueue) *primitive.MessageQueue {
+func (r *randomQueueSelector) Select(message *primitive.Message, queues
[]*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue {
r.mux.Lock()
i := r.rander.Intn(len(queues))
r.mux.Unlock()
@@ -74,11 +74,32 @@ func NewRoundRobinQueueSelector() QueueSelector {
return s
}
-func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues
[]*primitive.MessageQueue) *primitive.MessageQueue {
+func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues
[]*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue {
t := message.Topic
- var idx *uint32
r.Lock()
+ defer r.Unlock()
+ if lastBrokerName != "" {
+ for i := 0; i < len(queues); i++ {
+ idx, exist := r.indexer[t]
+ if !exist {
+ var v uint32 = 0
+ idx = &v
+ r.indexer[t] = idx
+ }
+ *idx++
+ qIndex := *idx % uint32(len(queues))
+ if queues[qIndex].BrokerName != lastBrokerName {
+ return queues[qIndex]
+ }
+ }
+ }
+ return r.selectOneMessageQueue(t, queues)
+}
+
+func (r *roundRobinQueueSelector) selectOneMessageQueue(t string, queues
[]*primitive.MessageQueue) *primitive.MessageQueue {
+ var idx *uint32
+
idx, exist := r.indexer[t]
if !exist {
var v uint32 = 0
@@ -86,7 +107,6 @@ func (r *roundRobinQueueSelector) Select(message
*primitive.Message, queues []*p
r.indexer[t] = idx
}
*idx++
- r.Unlock()
qIndex := *idx % uint32(len(queues))
return queues[qIndex]
@@ -103,10 +123,10 @@ func NewHashQueueSelector() QueueSelector {
}
// hashQueueSelector choose the queue by hash if message having sharding key,
otherwise choose queue by random instead.
-func (h *hashQueueSelector) Select(message *primitive.Message, queues
[]*primitive.MessageQueue) *primitive.MessageQueue {
+func (h *hashQueueSelector) Select(message *primitive.Message, queues
[]*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue {
key := message.GetShardingKey()
if len(key) == 0 {
- return h.random.Select(message, queues)
+ return h.random.Select(message, queues, lastBrokerName)
}
hasher := fnv.New32a()
diff --git a/producer/selector_test.go b/producer/selector_test.go
index 72dc469..e0b4384 100644
--- a/producer/selector_test.go
+++ b/producer/selector_test.go
@@ -26,7 +26,7 @@ import (
)
func TestRoundRobin(t *testing.T) {
- queues := make([]*primitive.MessageQueue, 10)
+ queues := make([]*primitive.MessageQueue, 0)
for i := 0; i < 10; i++ {
queues = append(queues, &primitive.MessageQueue{
QueueId: i,
@@ -41,18 +41,43 @@ func TestRoundRobin(t *testing.T) {
Topic: "rr",
}
for i := 0; i < 100; i++ {
- q := s.Select(m, queues)
+ q := s.Select(m, queues, "")
expected := (i + 1) % len(queues)
assert.Equal(t, queues[expected], q, "i: %d", i)
- qrr := s.Select(mrr, queues)
+ qrr := s.Select(mrr, queues, "")
expected = (i + 1) % len(queues)
assert.Equal(t, queues[expected], qrr, "i: %d", i)
}
}
+func TestRoundRobinRetry(t *testing.T) {
+ queues := make([]*primitive.MessageQueue, 0)
+ brokerA := "brokerA"
+ brokerB := "brokerB"
+ for i := 0; i < 5; i++ {
+ queues = append(queues, &primitive.MessageQueue{
+ QueueId: i,
+ BrokerName: brokerA,
+ })
+ queues = append(queues, &primitive.MessageQueue{
+ QueueId: i,
+ BrokerName: brokerB,
+ })
+ }
+ s := NewRoundRobinQueueSelector()
+
+ m := &primitive.Message{
+ Topic: "test",
+ }
+ for i := 0; i < 100; i++ {
+ q := s.Select(m, queues, brokerA)
+ assert.Equal(t, brokerB, q.BrokerName)
+ }
+}
+
func TestHashQueueSelector(t *testing.T) {
- queues := make([]*primitive.MessageQueue, 10)
+ queues := make([]*primitive.MessageQueue, 0)
for i := 0; i < 10; i++ {
queues = append(queues, &primitive.MessageQueue{
QueueId: i,
@@ -66,13 +91,13 @@ func TestHashQueueSelector(t *testing.T) {
Body: []byte("one message"),
}
m1.WithShardingKey("same_key")
- q1 := s.Select(m1, queues)
+ q1 := s.Select(m1, queues, "")
m2 := &primitive.Message{
Topic: "test",
Body: []byte("another message"),
}
m2.WithShardingKey("same_key")
- q2 := s.Select(m2, queues)
+ q2 := s.Select(m2, queues, "")
assert.Equal(t, *q1, *q2)
}