This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d86ea6  support RequestCode GET_CONSUMER_STATUS_FROM_CLIENT (#985)
1d86ea6 is described below

commit 1d86ea68616504c6985586d2a9bd1d47c312246f
Author: Kay Du <kay.du0...@gmail.com>
AuthorDate: Sat May 6 16:30:45 2023 +0800

    support RequestCode GET_CONSUMER_STATUS_FROM_CLIENT (#985)
    
    Co-authored-by: 筱瑜 <maoyu....@alibaba-inc.com>
---
 consumer/mock_offset_store.go | 14 +++++++++++++
 consumer/offset_store.go      | 24 ++++++++++++++++++++++
 consumer/pull_consumer.go     |  9 ++++++++
 consumer/push_consumer.go     |  9 ++++++++
 internal/client.go            | 36 ++++++++++++++++++++++++++++++++
 internal/model.go             | 48 +++++++++++++++++++++++++++++++++++++++++++
 internal/request.go           | 33 +++++++++++++++++++++++++++++
 7 files changed, 173 insertions(+)

diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
index 145ec3d..1d7c3bd 100644
--- a/consumer/mock_offset_store.go
+++ b/consumer/mock_offset_store.go
@@ -115,3 +115,17 @@ func (mr *MockOffsetStoreMockRecorder) update(mq, offset, 
increaseOnly interface
        mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "update", 
reflect.TypeOf((*MockOffsetStore)(nil).update), mq, offset, increaseOnly)
 }
+
+// getMQOffsetMap mocks base method: it forwards the call to the gomock
+// controller and returns the first recorded result as an offset map.
+func (m *MockOffsetStore) getMQOffsetMap(topic string) map[primitive.MessageQueue]int64 {
+       m.ctrl.T.Helper()
+       results := m.ctrl.Call(m, "getMQOffsetMap", topic)
+       // A result of any other type yields a nil map rather than a panic.
+       offsets, _ := results[0].(map[primitive.MessageQueue]int64)
+       return offsets
+}
+
+// getMQOffsetMap indicates an expected call of getMQOffsetMap.
+//
+// topic is declared as interface{} (like the arguments of the other recorder
+// methods on this type) so that an expectation can be set up either with a
+// concrete topic string or with a gomock matcher.
+func (mr *MockOffsetStoreMockRecorder) getMQOffsetMap(topic interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getMQOffsetMap", reflect.TypeOf((*MockOffsetStore)(nil).getMQOffsetMap), topic)
+}
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 0fa748c..b0f9da9 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -59,6 +59,7 @@ type OffsetStore interface {
        read(mq *primitive.MessageQueue, t readType) int64
        readWithException(mq *primitive.MessageQueue, t readType) (int64, error)
        update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)
+       getMQOffsetMap(topic string) map[primitive.MessageQueue]int64
 }
 
 type OffsetSerializeWrapper struct {
@@ -228,6 +229,18 @@ func (local *localFileOffsetStore) remove(mq 
*primitive.MessageQueue) {
        // nothing to do
 }
 
+// getMQOffsetMap returns a freshly allocated map holding the offsets of every
+// queue in local.OffsetTable whose topic equals the given topic. The result is
+// empty (never nil) when no queue matches.
+func (local *localFileOffsetStore) getMQOffsetMap(topic string) map[primitive.MessageQueue]int64 {
+       offsets := make(map[primitive.MessageQueue]int64)
+       local.OffsetTable.Range(func(k, v interface{}) bool {
+               // Keys are stored as MessageQueueKey; the assertion panics otherwise.
+               mqKey := k.(MessageQueueKey)
+               if mqKey.Topic == topic {
+                       offsets[primitive.MessageQueue(mqKey)] = v.(int64)
+               }
+               // Always keep iterating: every entry has to be inspected.
+               return true
+       })
+       return offsets
+}
+
 type remoteBrokerOffsetStore struct {
        group       string
        OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"`
@@ -353,6 +366,17 @@ func (r *remoteBrokerOffsetStore) update(mq 
*primitive.MessageQueue, offset int6
        }
 }
 
+// getMQOffsetMap returns a freshly allocated map holding the offsets of every
+// queue in r.OffsetTable whose topic equals the given topic. The result is
+// empty (never nil) when no queue matches.
+//
+// NOTE(review): r.OffsetTable is read here without taking any lock; confirm
+// whether other methods of this store guard the table and whether this read
+// should do the same.
+func (r *remoteBrokerOffsetStore) getMQOffsetMap(topic string) map[primitive.MessageQueue]int64 {
+       offsets := make(map[primitive.MessageQueue]int64)
+       for queue, queueOffset := range r.OffsetTable {
+               if queue.Topic == topic {
+                       offsets[queue] = queueOffset
+               }
+       }
+       return offsets
+}
+
 func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, 
mq *primitive.MessageQueue) (int64, error) {
        broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName)
        if broker == "" {
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index ef165b0..6730f07 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -853,6 +853,15 @@ func (pc *defaultPullConsumer) 
consumeMessageConcurrently(pq *processQueue, mq *
        }
 }
 
+// GetConsumerStatus builds a ConsumerStatus for the given topic from the
+// offsets currently held by this consumer's offset store. If the store
+// returns a nil map, the empty map created by NewConsumerStatus is kept.
+func (pc *defaultPullConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus {
+       status := internal.NewConsumerStatus()
+       if offsets := pc.storage.getMQOffsetMap(topic); offsets != nil {
+               status.MQOffsetMap = offsets
+       }
+       return status
+}
+
 func (pc *defaultPullConsumer) validate() error {
        if err := internal.ValidateGroup(pc.consumerGroup); err != nil {
                return err
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d8706b0..238ab03 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -430,6 +430,15 @@ func (pc *pushConsumer) ConsumeMessageDirectly(msg 
*primitive.MessageExt, broker
        return res
 }
 
+// GetConsumerStatus builds a ConsumerStatus for the given topic from the
+// offsets currently held by this consumer's offset store. If the store
+// returns a nil map, the empty map created by NewConsumerStatus is kept.
+func (pc *pushConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus {
+       status := internal.NewConsumerStatus()
+       if offsets := pc.storage.getMQOffsetMap(topic); offsets != nil {
+               status.MQOffsetMap = offsets
+       }
+       return status
+}
+
 func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) 
*internal.ConsumerRunningInfo {
        info := internal.NewConsumerRunningInfo()
 
diff --git a/internal/client.go b/internal/client.go
index 486dffd..d479b0c 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -92,6 +92,7 @@ type InnerConsumer interface {
        GetModel() string
        GetWhere() string
        ResetOffset(topic string, table map[primitive.MessageQueue]int64)
+       GetConsumerStatus(topic string) *ConsumerStatus
 }
 
 func DefaultClientOptions() ClientOptions {
@@ -377,6 +378,32 @@ func GetOrNewRocketMQClient(option ClientOptions, 
callbackCh chan interface{}) R
                        res.Code = ResSuccess
                        return res
                })
+
+               
client.remoteClient.RegisterRequestFunc(ReqGetConsumerStatsFromClient, func(req 
*remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
+                       rlog.Info("receive get consumer status from client 
request...", map[string]interface{}{
+                               rlog.LogKeyBroker:        addr.String(),
+                               rlog.LogKeyTopic:         
req.ExtFields["topic"],
+                               rlog.LogKeyConsumerGroup: 
req.ExtFields["group"],
+                       })
+
+                       header := new(GetConsumerStatusRequestHeader)
+                       header.Decode(req.ExtFields)
+                       res := remote.NewRemotingCommand(ResError, nil, nil)
+
+                       consumerStatus := 
client.getConsumerStatus(header.topic, header.group)
+                       if consumerStatus != nil {
+                               res.Code = ResSuccess
+                               data, err := consumerStatus.Encode()
+                               if err != nil {
+                                       res.Remark = fmt.Sprintf("Failed to 
encode consumer status: %s", err.Error())
+                               } else {
+                                       res.Body = data
+                               }
+                       } else {
+                               res.Remark = "there is unexpected error when 
get consumer status, please check log"
+                       }
+                       return res
+               })
        }
        return client
 }
@@ -905,6 +932,15 @@ func (c *rmqClient) resetOffset(topic string, group 
string, offsetTable map[prim
        consumer.(InnerConsumer).ResetOffset(topic, offsetTable)
 }
 
+// getConsumerStatus looks up the consumer registered under group and returns
+// its status for topic. It logs a warning and returns nil when no consumer is
+// registered for that group.
+func (c *rmqClient) getConsumerStatus(topic string, group string) *ConsumerStatus {
+       registered, found := c.consumerMap.Load(group)
+       if found {
+               // Entries are used as InnerConsumer; the assertion panics otherwise.
+               return registered.(InnerConsumer).GetConsumerStatus(topic)
+       }
+       rlog.Warning("group "+group+" do not exists", nil)
+       return nil
+}
+
 func (c *rmqClient) getConsumerRunningInfo(group string, stack bool) 
*ConsumerRunningInfo {
        consumer, exist := c.consumerMap.Load(group)
        if !exist {
diff --git a/internal/model.go b/internal/model.go
index f058925..36e06a1 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -295,6 +295,54 @@ func NewConsumerRunningInfo() *ConsumerRunningInfo {
        }
 }
 
+// ConsumerStatus holds the consume offset of each message queue of a topic.
+type ConsumerStatus struct {
+       // MQOffsetMap maps a message queue to its current offset.
+       MQOffsetMap map[primitive.MessageQueue]int64
+}
+
+// Encode serializes the status as
+//
+//     {"messageQueueTable":{<queue JSON>:<offset>,...}}
+//
+// Each key is the JSON object of the queue itself rather than a JSON string,
+// which is why the document is assembled by hand instead of marshalling the
+// map directly. Entries are ordered by topic, then broker name, then queue id
+// so the output is deterministic. It returns the first marshalling error
+// encountered, if any.
+func (status ConsumerStatus) Encode() ([]byte, error) {
+       keys := make([]primitive.MessageQueue, 0, len(status.MQOffsetMap))
+       for k := range status.MQOffsetMap {
+               keys = append(keys, k)
+       }
+
+       sort.Slice(keys, func(i, j int) bool {
+               q1, q2 := keys[i], keys[j]
+               if com := strings.Compare(q1.Topic, q2.Topic); com != 0 {
+                       return com < 0
+               }
+               if com := strings.Compare(q1.BrokerName, q2.BrokerName); com != 0 {
+                       return com < 0
+               }
+               return q1.QueueId < q2.QueueId
+       })
+
+       var table strings.Builder
+       for idx, mq := range keys {
+               dataK, err := json.Marshal(mq)
+               if err != nil {
+                       return nil, err
+               }
+               dataV, err := json.Marshal(status.MQOffsetMap[mq])
+               if err != nil {
+                       return nil, err
+               }
+               // Separate entries with a comma, but never lead with one.
+               if idx > 0 {
+                       table.WriteByte(',')
+               }
+               table.Write(dataK)
+               table.WriteByte(':')
+               table.Write(dataV)
+       }
+
+       jsonData := fmt.Sprintf("{\"%s\":{%s}}", "messageQueueTable", table.String())
+       return []byte(jsonData), nil
+}
+
+// NewConsumerStatus returns a ConsumerStatus with an empty, non-nil offset map.
+func NewConsumerStatus() *ConsumerStatus {
+       return &ConsumerStatus{
+               MQOffsetMap: make(map[primitive.MessageQueue]int64),
+       }
+}
+
 type ConsumeMessageDirectlyResult struct {
        Order          bool          `json:"order"`
        AutoCommit     bool          `json:"autoCommit"`
diff --git a/internal/request.go b/internal/request.go
index 9a590b8..7e86b50 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -50,6 +50,7 @@ const (
        ReqDeleteTopicInBroker           = int16(215)
        ReqDeleteTopicInNameSrv          = int16(216)
        ReqResetConsumerOffset           = int16(220)
+       ReqGetConsumerStatsFromClient    = int16(221)
        ReqGetConsumerRunningInfo        = int16(307)
        ReqConsumeMessageDirectly        = int16(309)
        ReqSendReplyMessage              = int16(324)
@@ -490,6 +491,38 @@ func (request *ConsumeMessageDirectlyHeader) 
Decode(properties map[string]string
        }
 }
 
+// GetConsumerStatusRequestHeader carries the custom header fields of a
+// get-consumer-status request: the topic, the consumer group and the client
+// address.
+type GetConsumerStatusRequestHeader struct {
+       topic      string
+       group      string
+       clientAddr string
+}
+
+// Encode returns the header as a string map keyed by "topic", "group" and
+// "clientAddr". All three keys are always present, even when a value is empty.
+func (request *GetConsumerStatusRequestHeader) Encode() map[string]string {
+       fields := make(map[string]string, 3)
+       fields["topic"] = request.topic
+       fields["group"] = request.group
+       fields["clientAddr"] = request.clientAddr
+       return fields
+}
+
+// Decode copies the "topic", "group" and "clientAddr" entries of properties
+// into the header. A key that is absent leaves the corresponding field
+// untouched; a nil or empty map is a no-op.
+func (request *GetConsumerStatusRequestHeader) Decode(properties map[string]string) {
+       if len(properties) == 0 {
+               return
+       }
+
+       bindings := []struct {
+               key string
+               dst *string
+       }{
+               {"topic", &request.topic},
+               {"group", &request.group},
+               {"clientAddr", &request.clientAddr},
+       }
+       for _, b := range bindings {
+               if v, existed := properties[b.key]; existed {
+                       *b.dst = v
+               }
+       }
+}
+
 type ReplyMessageRequestHeader struct {
        producerGroup         string
        topic                 string

Reply via email to