This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f56a2db  fix: data race when update subversion for consumer (#962)
f56a2db is described below

commit f56a2dba2af8ab0b88869e589b0820f49b728f8c
Author: cserwen <[email protected]>
AuthorDate: Fri Dec 2 11:50:48 2022 +0800

    fix: data race when update subversion for consumer (#962)
    
    Co-authored-by: dengzhiwen1 <[email protected]>
---
 consumer/consumer.go  |  2 +-
 internal/model.go     | 26 ++++++++++++++++++++++++++
 internal/utils/set.go |  6 +++++-
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 15100cd..1511f01 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -435,7 +435,7 @@ func (dc *defaultConsumer) doBalance() {
 func (dc *defaultConsumer) SubscriptionDataList() []*internal.SubscriptionData 
{
        result := make([]*internal.SubscriptionData, 0)
        dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
-               result = append(result, value.(*internal.SubscriptionData))
+               result = append(result, 
value.(*internal.SubscriptionData).Clone())
                return true
        })
        return result
diff --git a/internal/model.go b/internal/model.go
index c248bb3..0fd7955 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -62,6 +62,32 @@ type SubscriptionData struct {
        ExpType         string    `json:"expressionType"`
 }
 
+func (sd *SubscriptionData) Clone() *SubscriptionData {
+       cloned := &SubscriptionData{
+               ClassFilterMode: sd.ClassFilterMode,
+               Topic:           sd.Topic,
+               SubString:       sd.SubString,
+               SubVersion:      sd.SubVersion,
+               ExpType:         sd.ExpType,
+       }
+
+       if sd.Tags.Items() != nil {
+               cloned.Tags = utils.NewSet()
+               for _, value := range sd.Tags.Items() {
+                       cloned.Tags.Add(value)
+               }
+       }
+
+       if sd.Codes.Items() != nil {
+               cloned.Codes = utils.NewSet()
+               for _, value := range sd.Codes.Items() {
+                       cloned.Codes.Add(value)
+               }
+       }
+
+       return cloned
+}
+
 type producerData struct {
        GroupName string `json:"groupName"`
 }
diff --git a/internal/utils/set.go b/internal/utils/set.go
index e90fb36..ed9857b 100644
--- a/internal/utils/set.go
+++ b/internal/utils/set.go
@@ -43,6 +43,10 @@ func NewSet() Set {
        }
 }
 
+func (s *Set) Items() map[string]UniqueItem {
+       return s.items
+}
+
 func (s *Set) Add(v UniqueItem) {
        s.items[v.UniqueID()] = v
 }
@@ -98,6 +102,6 @@ func (s *Set) MarshalJSON() ([]byte, error) {
        return buffer.Bytes(), nil
 }
 
-func (s Set) UnmarshalJSON(data []byte) (err error) {
+func (s *Set) UnmarshalJSON(data []byte) (err error) {
        return nil
 }

Reply via email to