This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 80d9c07e [ISSUE #679] Fix judgment of topic route equality and optimize loadBalancer (#680)
80d9c07e is described below

commit 80d9c07e3df703936467bcfc6a25f5c6a9a519a5
Author: Liu Shengzhong <szliu0...@gmail.com>
AuthorDate: Mon Feb 26 10:36:57 2024 +0800

    [ISSUE #679] Fix judgment of topic route equality and optimize loadBalancer (#680)
    
    * fix(golang): correct judgment of topic route equality and optimize load balancer
    
    * fix(golang): add tests for routeEqual
    
    * refactor
---
 golang/client.go       | 46 ++++++++++++++++++++++++++++++++++++----------
 golang/client_test.go  | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 golang/loadBalancer.go | 16 ++++++++++++++++
 3 files changed, 98 insertions(+), 10 deletions(-)

diff --git a/golang/client.go b/golang/client.go
index 45e4b549..71c48198 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -24,7 +24,6 @@ import (
        "encoding/hex"
        "errors"
        "fmt"
-       "reflect"
        "sync"
        "time"
 
@@ -32,6 +31,7 @@ import (
        "github.com/apache/rocketmq-clients/golang/v5/pkg/ticker"
        "github.com/apache/rocketmq-clients/golang/v5/pkg/utils"
        v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2"
+       "github.com/golang/protobuf/proto"
        "github.com/google/uuid"
        "go.uber.org/atomic"
        "go.uber.org/zap"
@@ -504,27 +504,40 @@ func (cli *defaultClient) startUp() error {
        f := func() {
                cli.router.Range(func(k, v interface{}) bool {
                        topic := k.(string)
-                       oldRoute := v
                        newRoute, err := cli.queryRoute(context.TODO(), topic, cli.opts.timeout)
                        if err != nil {
                                cli.log.Errorf("scheduled queryRoute err=%v", err)
                        }
-                       if newRoute == nil && oldRoute != nil {
+                       if newRoute == nil && v != nil {
                                cli.log.Info("newRoute is nil, but oldRoute is not. do not update")
                                return true
                        }
-                       if !reflect.DeepEqual(newRoute, oldRoute) {
+                       var oldRoute []*v2.MessageQueue
+                       if v != nil {
+                               oldRoute = v.([]*v2.MessageQueue)
+                       }
+                       if !routeEqual(oldRoute, newRoute) {
                                cli.router.Store(k, newRoute)
                                switch impl := cli.clientImpl.(type) {
                                case *defaultProducer:
-                                       plb, err := NewPublishingLoadBalancer(newRoute)
-                                       if err == nil {
-                                               impl.publishingRouteDataResultCache.Store(topic, plb)
+                                       existing, ok := impl.publishingRouteDataResultCache.Load(topic)
+                                       if !ok {
+                                               plb, err := NewPublishingLoadBalancer(newRoute)
+                                               if err == nil {
+                                                       impl.publishingRouteDataResultCache.Store(topic, plb)
+                                               }
+                                       } else {
+                                               impl.publishingRouteDataResultCache.Store(topic, existing.(PublishingLoadBalancer).CopyAndUpdate(newRoute))
                                        }
                                case *defaultSimpleConsumer:
-                                       slb, err := NewSubscriptionLoadBalancer(newRoute)
-                                       if err == nil {
-                                               impl.subTopicRouteDataResultCache.Store(topic, slb)
+                                       existing, ok := impl.subTopicRouteDataResultCache.Load(topic)
+                                       if !ok {
+                                               slb, err := NewSubscriptionLoadBalancer(newRoute)
+                                               if err == nil {
+                                                       impl.subTopicRouteDataResultCache.Store(topic, slb)
+                                               }
+                                       } else {
+                                               impl.subTopicRouteDataResultCache.Store(topic, existing.(SubscriptionLoadBalancer).CopyAndUpdate(newRoute))
                                        }
                                }
                        }
@@ -534,6 +547,19 @@ func (cli *defaultClient) startUp() error {
        ticker.Tick(f, time.Second*30, cli.done)
        return nil
 }
+
+func routeEqual(old, new []*v2.MessageQueue) bool {
+       if len(old) != len(new) {
+               return false
+       }
+       for i := 0; i < len(old); i++ {
+               if !proto.Equal(old[i], new[i]) {
+                       return false
+               }
+       }
+       return true
+}
+
 func (cli *defaultClient) notifyClientTermination() {
        cli.log.Info("start notifyClientTermination")
        ctx := cli.Sign(context.Background())
diff --git a/golang/client_test.go b/golang/client_test.go
index b46d2d73..4549bdfe 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -20,6 +20,7 @@ package golang
 import (
        "context"
        "fmt"
+       "reflect"
        "testing"
        "time"
 
@@ -293,3 +294,48 @@ func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) {
        assert.Equal(t, "Encountered error while receiving TelemetryCommand, trying to recover", commandExecutionLog[0].Message)
        assert.Equal(t, "Failed to recover, err=EOF", commandExecutionLog[1].Message)
 }
+
+func Test_routeEqual(t *testing.T) {
+       oldMq := &v2.MessageQueue{
+               Topic: &v2.Resource{
+                       Name: "topic-test",
+               },
+               Id:         0,
+               Permission: v2.Permission_READ_WRITE,
+               Broker: &v2.Broker{
+                       Name:      "broker-test",
+                       Id:        0,
+                       Endpoints: fakeEndpoints(),
+               },
+               AcceptMessageTypes: []v2.MessageType{
+                       v2.MessageType_NORMAL,
+               },
+       }
+       newMq := &v2.MessageQueue{
+               Topic: &v2.Resource{
+                       Name: "topic-test",
+               },
+               Id:         0,
+               Permission: v2.Permission_READ_WRITE,
+               Broker: &v2.Broker{
+                       Name:      "broker-test",
+                       Id:        0,
+                       Endpoints: fakeEndpoints(),
+               },
+               AcceptMessageTypes: []v2.MessageType{
+                       v2.MessageType_NORMAL,
+               },
+       }
+
+       newMq.ProtoReflect() // message internal field value will be changed
+
+       oldRoute := []*v2.MessageQueue{oldMq}
+       newRoute := []*v2.MessageQueue{newMq}
+
+       assert.Equal(t, false, reflect.DeepEqual(oldRoute, newRoute))
+       assert.Equal(t, true, routeEqual(oldRoute, newRoute))
+       assert.Equal(t, true, routeEqual(nil, nil))
+       assert.Equal(t, false, routeEqual(nil, newRoute))
+       assert.Equal(t, false, routeEqual(oldRoute, nil))
+       assert.Equal(t, true, routeEqual(nil, []*v2.MessageQueue{}))
+}
diff --git a/golang/loadBalancer.go b/golang/loadBalancer.go
index db9cbd44..da2dd647 100644
--- a/golang/loadBalancer.go
+++ b/golang/loadBalancer.go
@@ -31,6 +31,7 @@ import (
 type PublishingLoadBalancer interface {
        TakeMessageQueueByMessageGroup(messageGroup *string) ([]*v2.MessageQueue, error)
        TakeMessageQueues(excluded sync.Map, count int) ([]*v2.MessageQueue, error)
+       CopyAndUpdate([]*v2.MessageQueue) PublishingLoadBalancer
 }
 
 type publishingLoadBalancer struct {
@@ -119,8 +120,16 @@ func (plb *publishingLoadBalancer) TakeMessageQueues(excluded sync.Map, count in
        return candidates, nil
 }
 
+func (plb *publishingLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) PublishingLoadBalancer {
+       return &publishingLoadBalancer{
+               messageQueues: messageQueues,
+               index:         plb.index,
+       }
+}
+
 type SubscriptionLoadBalancer interface {
        TakeMessageQueue() (*v2.MessageQueue, error)
+       CopyAndUpdate([]*v2.MessageQueue) SubscriptionLoadBalancer
 }
 
 type subscriptionLoadBalancer struct {
@@ -147,3 +156,10 @@ func (slb *subscriptionLoadBalancer) TakeMessageQueue() (*v2.MessageQueue, error
        selectMessageQueue := slb.messageQueues[idx]
        return selectMessageQueue, nil
 }
+
+func (slb *subscriptionLoadBalancer) CopyAndUpdate(messageQueues []*v2.MessageQueue) SubscriptionLoadBalancer {
+       return &subscriptionLoadBalancer{
+               messageQueues: messageQueues,
+               index:         slb.index,
+       }
+}

Reply via email to