This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 1d86ea6 support RequestCode GET_CONSUMER_STATUS_FROM_CLIENT (#985) 1d86ea6 is described below commit 1d86ea68616504c6985586d2a9bd1d47c312246f Author: Kay Du <kay.du0...@gmail.com> AuthorDate: Sat May 6 16:30:45 2023 +0800 support RequestCode GET_CONSUMER_STATUS_FROM_CLIENT (#985) Co-authored-by: 筱瑜 <maoyu....@alibaba-inc.com> --- consumer/mock_offset_store.go | 14 +++++++++++++ consumer/offset_store.go | 24 ++++++++++++++++++++++ consumer/pull_consumer.go | 9 ++++++++ consumer/push_consumer.go | 9 ++++++++ internal/client.go | 36 ++++++++++++++++++++++++++++++++ internal/model.go | 48 +++++++++++++++++++++++++++++++++++++++++++ internal/request.go | 33 +++++++++++++++++++++++++++++ 7 files changed, 173 insertions(+) diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go index 145ec3d..1d7c3bd 100644 --- a/consumer/mock_offset_store.go +++ b/consumer/mock_offset_store.go @@ -115,3 +115,17 @@ func (mr *MockOffsetStoreMockRecorder) update(mq, offset, increaseOnly interface mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "update", reflect.TypeOf((*MockOffsetStore)(nil).update), mq, offset, increaseOnly) } + +// getMQOffsetMap mocks base method +func (m *MockOffsetStore) getMQOffsetMap(topic string) map[primitive.MessageQueue]int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "getMQOffsetMap", topic) + ret0, _ := ret[0].(map[primitive.MessageQueue]int64) + return ret0 +} + +// getMQOffsetMap indicates an expected call of getMQOffsetMap +func (mr *MockOffsetStoreMockRecorder) getMQOffsetMap(topic string) *gomock.Call{ + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getMQOffsetMap", reflect.TypeOf((*MockOffsetStore)(nil).getMQOffsetMap), topic) +} diff --git a/consumer/offset_store.go b/consumer/offset_store.go index 0fa748c..b0f9da9 100644 --- a/consumer/offset_store.go +++ b/consumer/offset_store.go @@ -59,6 +59,7 @@ type OffsetStore interface { read(mq *primitive.MessageQueue, t readType) int64 readWithException(mq *primitive.MessageQueue, t readType) (int64, error) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) + getMQOffsetMap(topic string) map[primitive.MessageQueue]int64 } type OffsetSerializeWrapper struct { @@ -228,6 +229,18 @@ func (local *localFileOffsetStore) remove(mq *primitive.MessageQueue) { // nothing to do } +func (local *localFileOffsetStore) getMQOffsetMap(topic string) map[primitive.MessageQueue]int64 { + copyOffsetTable := make(map[primitive.MessageQueue]int64) + local.OffsetTable.Range(func(key, value interface{}) bool { + if key.(MessageQueueKey).Topic != topic { + return true + } + copyOffsetTable[primitive.MessageQueue(key.(MessageQueueKey))] = value.(int64) + return true + }) + return copyOffsetTable +} + type remoteBrokerOffsetStore struct { group string OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"` @@ -353,6 +366,17 @@ func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int6 } } +func (r *remoteBrokerOffsetStore) getMQOffsetMap(topic string) map[primitive.MessageQueue]int64 { + copyOffsetTable := make(map[primitive.MessageQueue]int64) + for mq, offset := range r.OffsetTable { + if mq.Topic != topic { + continue + } + copyOffsetTable[mq] = offset + } + return copyOffsetTable +} + func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) { broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName) if broker == "" { diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index ef165b0..6730f07 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -853,6 +853,15 @@ func (pc *defaultPullConsumer) consumeMessageConcurrently(pq *processQueue, mq * } } +func (pc *defaultPullConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus { + consumerStatus := internal.NewConsumerStatus() + mqOffsetMap := pc.storage.getMQOffsetMap(topic) + if mqOffsetMap != nil { + consumerStatus.MQOffsetMap = mqOffsetMap + } + return consumerStatus +} + func (pc *defaultPullConsumer) validate() error { if err := internal.ValidateGroup(pc.consumerGroup); err != nil { return err diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index d8706b0..238ab03 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -430,6 +430,15 @@ func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, broker return res } +func (pc *pushConsumer) GetConsumerStatus(topic string) *internal.ConsumerStatus { + consumerStatus := internal.NewConsumerStatus() + mqOffsetMap := pc.storage.getMQOffsetMap(topic) + if mqOffsetMap != nil { + consumerStatus.MQOffsetMap = mqOffsetMap + } + return consumerStatus +} + func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo { info := internal.NewConsumerRunningInfo() diff --git a/internal/client.go b/internal/client.go index 486dffd..d479b0c 100644 --- a/internal/client.go +++ b/internal/client.go @@ -92,6 +92,7 @@ type InnerConsumer interface { GetModel() string GetWhere() string ResetOffset(topic string, table map[primitive.MessageQueue]int64) + GetConsumerStatus(topic string) *ConsumerStatus } func DefaultClientOptions() ClientOptions { @@ -377,6 +378,32 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R res.Code = ResSuccess return res }) + + client.remoteClient.RegisterRequestFunc(ReqGetConsumerStatsFromClient, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand { + rlog.Info("receive get consumer status from client request...", map[string]interface{}{ + rlog.LogKeyBroker: addr.String(), + rlog.LogKeyTopic: req.ExtFields["topic"], + rlog.LogKeyConsumerGroup: req.ExtFields["group"], + }) + + header := new(GetConsumerStatusRequestHeader) + header.Decode(req.ExtFields) + res := remote.NewRemotingCommand(ResError, nil, nil) + + consumerStatus := client.getConsumerStatus(header.topic, header.group) + if consumerStatus != nil { + res.Code = ResSuccess + data, err := consumerStatus.Encode() + if err != nil { + res.Remark = fmt.Sprintf("Failed to encode consumer status: %s", err.Error()) + } else { + res.Body = data + } + } else { + res.Remark = "there is unexpected error when get consumer status, please check log" + } + return res + }) } return client } @@ -905,6 +932,15 @@ func (c *rmqClient) resetOffset(topic string, group string, offsetTable map[prim consumer.(InnerConsumer).ResetOffset(topic, offsetTable) } +func (c *rmqClient) getConsumerStatus(topic string, group string) *ConsumerStatus { + consumer, exist := c.consumerMap.Load(group) + if !exist { + rlog.Warning("group "+group+" do not exists", nil) + return nil + } + return consumer.(InnerConsumer).GetConsumerStatus(topic) +} + func (c *rmqClient) getConsumerRunningInfo(group string, stack bool) *ConsumerRunningInfo { consumer, exist := c.consumerMap.Load(group) if !exist { diff --git a/internal/model.go b/internal/model.go index f058925..36e06a1 100644 --- a/internal/model.go +++ b/internal/model.go @@ -295,6 +295,54 @@ func NewConsumerRunningInfo() *ConsumerRunningInfo { } } +type ConsumerStatus struct { + MQOffsetMap map[primitive.MessageQueue]int64 +} + +func (status ConsumerStatus) Encode() ([]byte, error) { + mapJson := "" + keys := make([]primitive.MessageQueue, 0) + + for k := range status.MQOffsetMap { + keys = append(keys, k) + } + + sort.Slice(keys, func(i, j int) bool { + q1 := keys[i] + q2 := keys[j] + com := strings.Compare(q1.Topic, q2.Topic) + if com != 0 { + return com < 0 + } + + com = strings.Compare(q1.BrokerName, q2.BrokerName) + if com != 0 { + return com < 0 + } + + return q1.QueueId < q2.QueueId + }) + + for idx := range keys { + dataK, err := json.Marshal(keys[idx]) + if err != nil { + return nil, err + } + dataV, err := json.Marshal(status.MQOffsetMap[keys[idx]]) + mapJson = fmt.Sprintf("%s,%s:%s", mapJson, string(dataK), string(dataV)) + } + mapJson = strings.TrimLeft(mapJson, ",") + jsonData := fmt.Sprintf("{\"%s\":%s}", + "messageQueueTable", fmt.Sprintf("{%s}", mapJson)) + return []byte(jsonData), nil +} + +func NewConsumerStatus() *ConsumerStatus { + return &ConsumerStatus{ + MQOffsetMap: make(map[primitive.MessageQueue]int64), + } +} + type ConsumeMessageDirectlyResult struct { Order bool `json:"order"` AutoCommit bool `json:"autoCommit"` diff --git a/internal/request.go b/internal/request.go index 9a590b8..7e86b50 100644 --- a/internal/request.go +++ b/internal/request.go @@ -50,6 +50,7 @@ const ( ReqDeleteTopicInBroker = int16(215) ReqDeleteTopicInNameSrv = int16(216) ReqResetConsumerOffset = int16(220) + ReqGetConsumerStatsFromClient = int16(221) ReqGetConsumerRunningInfo = int16(307) ReqConsumeMessageDirectly = int16(309) ReqSendReplyMessage = int16(324) @@ -490,6 +491,38 @@ func (request *ConsumeMessageDirectlyHeader) Decode(properties map[string]string } } +type GetConsumerStatusRequestHeader struct { + topic string + group string + clientAddr string +} + +func (request *GetConsumerStatusRequestHeader) Encode() map[string]string { + return map[string]string{ + "topic": request.topic, + "group": request.group, + "clientAddr": request.clientAddr, + } +} + +func (request *GetConsumerStatusRequestHeader) Decode(properties map[string]string) { + if len(properties) == 0 { + return + } + + if v, existed := properties["topic"]; existed { + request.topic = v + } + + if v, existed := properties["group"]; existed { + request.group = v + } + + if v, existed := properties["clientAddr"]; existed { + request.clientAddr = v + } +} + type ReplyMessageRequestHeader struct { producerGroup string topic string