This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 80d9c07e [ISSUE #679] Fix judgment of topic route equality and optimize loadBalancer (#680) 80d9c07e is described below commit 80d9c07e3df703936467bcfc6a25f5c6a9a519a5 Author: Liu Shengzhong <szliu0...@gmail.com> AuthorDate: Mon Feb 26 10:36:57 2024 +0800 [ISSUE #679] Fix judgment of topic route equality and optimize loadBalancer (#680) * fix(golang): correct judgment of topic route equality and optimize load balancer * fix(golang): add tests for routeEqual * refactor --- golang/client.go | 46 ++++++++++++++++++++++++++++++++++++---------- golang/client_test.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ golang/loadBalancer.go | 16 ++++++++++++++++ 3 files changed, 98 insertions(+), 10 deletions(-) diff --git a/golang/client.go b/golang/client.go index 45e4b549..71c48198 100644 --- a/golang/client.go +++ b/golang/client.go @@ -24,7 +24,6 @@ import ( "encoding/hex" "errors" "fmt" - "reflect" "sync" "time" @@ -32,6 +31,7 @@ import ( "github.com/apache/rocketmq-clients/golang/v5/pkg/ticker" "github.com/apache/rocketmq-clients/golang/v5/pkg/utils" v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2" + "github.com/golang/protobuf/proto" "github.com/google/uuid" "go.uber.org/atomic" "go.uber.org/zap" @@ -504,27 +504,40 @@ func (cli *defaultClient) startUp() error { f := func() { cli.router.Range(func(k, v interface{}) bool { topic := k.(string) - oldRoute := v newRoute, err := cli.queryRoute(context.TODO(), topic, cli.opts.timeout) if err != nil { cli.log.Errorf("scheduled queryRoute err=%v", err) } - if newRoute == nil && oldRoute != nil { + if newRoute == nil && v != nil { cli.log.Info("newRoute is nil, but oldRoute is not. do not update") return true } - if !reflect.DeepEqual(newRoute, oldRoute) { + var oldRoute []*v2.MessageQueue + if v != nil { + oldRoute = v.([]*v2.MessageQueue) + } + if !routeEqual(oldRoute, newRoute) { cli.router.Store(k, newRoute) switch impl := cli.clientImpl.(type) { case *defaultProducer: - plb, err := NewPublishingLoadBalancer(newRoute) - if err == nil { - impl.publishingRouteDataResultCache.Store(topic, plb) + existing, ok := impl.publishingRouteDataResultCache.Load(topic) + if !ok { + plb, err := NewPublishingLoadBalancer(newRoute) + if err == nil { + impl.publishingRouteDataResultCache.Store(topic, plb) + } + } else { + impl.publishingRouteDataResultCache.Store(topic, existing.(PublishingLoadBalancer).CopyAndUpdate(newRoute)) } case *defaultSimpleConsumer: - slb, err := NewSubscriptionLoadBalancer(newRoute) - if err == nil { - impl.subTopicRouteDataResultCache.Store(topic, slb) + existing, ok := impl.subTopicRouteDataResultCache.Load(topic) + if !ok { + slb, err := NewSubscriptionLoadBalancer(newRoute) + if err == nil { + impl.subTopicRouteDataResultCache.Store(topic, slb) + } + } else { + impl.subTopicRouteDataResultCache.Store(topic, existing.(SubscriptionLoadBalancer).CopyAndUpdate(newRoute)) } } } @@ -534,6 +547,19 @@ func (cli *defaultClient) startUp() error { ticker.Tick(f, time.Second*30, cli.done) return nil } + +func routeEqual(old, new []*v2.MessageQueue) bool { + if len(old) != len(new) { + return false + } + for i := 0; i < len(old); i++ { + if !proto.Equal(old[i], new[i]) { + return false + } + } + return true +} + func (cli *defaultClient) notifyClientTermination() { cli.log.Info("start notifyClientTermination") ctx := cli.Sign(context.Background()) diff --git a/golang/client_test.go b/golang/client_test.go index b46d2d73..4549bdfe 100644 --- a/golang/client_test.go +++ b/golang/client_test.go @@ -20,6 +20,7 @@ package golang import ( "context" "fmt" + "reflect" "testing" "time" @@ -293,3 +294,48 @@ func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) { assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message) assert.Equal(t, "Failed to recover, err=EOF", commandExecutionLog[1].Message) } + +func Test_routeEqual(t *testing.T) { + oldMq := &v2.MessageQueue{ + Topic: &v2.Resource{ + Name: "topic-test", + }, + Id: 0, + Permission: v2.Permission_READ_WRITE, + Broker: &v2.Broker{ + Name: "broker-test", + Id: 0, + Endpoints: fakeEndpoints(), + }, + AcceptMessageTypes: []v2.MessageType{ + v2.MessageType_NORMAL, + }, + } + newMq := &v2.MessageQueue{ + Topic: &v2.Resource{ + Name: "topic-test", + }, + Id: 0, + Permission: v2.Permission_READ_WRITE, + Broker: &v2.Broker{ + Name: "broker-test", + Id: 0, + Endpoints: fakeEndpoints(), + }, + AcceptMessageTypes: []v2.MessageType{ + v2.MessageType_NORMAL, + }, + } + + newMq.ProtoReflect() // message internal field value will be changed + + oldRoute := []*v2.MessageQueue{oldMq} + newRoute := []*v2.MessageQueue{newMq} + + assert.Equal(t, false, reflect.DeepEqual(oldRoute, newRoute)) + assert.Equal(t, true, routeEqual(oldRoute, newRoute)) + assert.Equal(t, true, routeEqual(nil, nil)) + assert.Equal(t, false, routeEqual(nil, newRoute)) + assert.Equal(t, false, routeEqual(oldRoute, nil)) + assert.Equal(t, true, routeEqual(nil, []*v2.MessageQueue{})) +} diff --git a/golang/loadBalancer.go b/golang/loadBalancer.go index db9cbd44..da2dd647 100644 --- a/golang/loadBalancer.go +++ b/golang/loadBalancer.go @@ -31,6 +31,7 @@ import ( type PublishingLoadBalancer interface { TakeMessageQueueByMessageGroup(messageGroup *string) ([]*v2.MessageQueue, error) TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue, error) + CopyAndUpdate([]*v2.MessageQueue) PublishingLoadBalancer } type publishingLoadBalancer struct { @@ -119,8 +120,16 @@ func (plb *publishingLoadBalancer) TakeMessageQueues(excluded sync.Map, count in return candidates, nil } +func (plb *publishingLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) PublishingLoadBalancer { + return &publishingLoadBalancer{ + messageQueues: messageQueues, + index: plb.index, + } +} + type SubscriptionLoadBalancer interface { TakeMessageQueue() (*v2.MessageQueue, error) + CopyAndUpdate([]*v2.MessageQueue) SubscriptionLoadBalancer } type subscriptionLoadBalancer struct { @@ -147,3 +156,10 @@ func (slb *subscriptionLoadBalancer) TakeMessageQueue() (*v2.MessageQueue, error selectMessageQueue := slb.messageQueues[idx] return selectMessageQueue, nil } + +func (slb *subscriptionLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) SubscriptionLoadBalancer { + return &subscriptionLoadBalancer{ + messageQueues: messageQueues, + index: slb.index, + } +}