This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch pull-consumer
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/pull-consumer by this push:
     new 29d0691  [ISSUE #737] Add ManualPullConsumer (#745)
29d0691 is described below

commit 29d06911f8d5a3aa14a0950776122cfd6fb28d48
Author: Gagharv <[email protected]>
AuthorDate: Mon Dec 27 14:07:56 2021 +0800

    [ISSUE #737] Add ManualPullConsumer (#745)
    
    * add manual consumer
    
    * Make  the namesrv in `NewManualPullConsumer` can reuse the definition of 
the upper
    
    Change-Id: I4f2811988bc639554b3029a7fdec76509f3a8907
    
    * add comments and adjust the code
    
    Change-Id: Ie4bcd8cba6a07a3bff3dfb328ce38c157cbfca76
    
    * adjust code
    
    Change-Id: Ie03834b685882f0f40171192719124a4c1ea44ed
    
    * adjust log output, etc.
    
    Change-Id: I9cfda8364c1163f700c2f3e2ef673fd3b117f270
---
 consumer/manual_pull_consumer.go      | 342 +++++++++++++++++++++++++++++++++
 consumer/manual_pull_consumer_test.go | 343 ++++++++++++++++++++++++++++++++++
 examples/consumer/manual/main.go      |  83 ++++++++
 internal/request.go                   |  12 ++
 primitive/interceptor.go              |   4 +
 5 files changed, 784 insertions(+)

diff --git a/consumer/manual_pull_consumer.go b/consumer/manual_pull_consumer.go
new file mode 100644
index 0000000..0abff56
--- /dev/null
+++ b/consumer/manual_pull_consumer.go
@@ -0,0 +1,342 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+       "context"
+       "fmt"
+       "strconv"
+       "sync"
+       "time"
+
+       errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+       "github.com/apache/rocketmq-client-go/v2/internal"
+       "github.com/apache/rocketmq-client-go/v2/internal/remote"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/apache/rocketmq-client-go/v2/rlog"
+       "github.com/pkg/errors"
+)
+
+// ManualPullConsumer is a low-level consumer, which operates based on 
MessageQueue.
+// Users should maintain information such as offset by themselves
+type ManualPullConsumer interface {
+       // PullFromQueue return messages according to specified queue with 
offset
+       PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset 
int64, numbers int) (*primitive.PullResult, error)
+
+       // GetMessageQueues return queues of the topic
+       GetMessageQueues(ctx context.Context, topic string) 
([]*primitive.MessageQueue, error)
+
+       // CommittedOffset return the offset of mq in groupName, if mq not 
exist, -1 will be return
+       CommittedOffset(ctx context.Context, groupName string, mq 
*primitive.MessageQueue) (int64, error)
+
+       // Seek let consume position to the offset, this api can be used to 
reset offset and commit offset
+       Seek(ctx context.Context, groupName string, mq *primitive.MessageQueue, 
offset int64) error
+
+       // Lookup return offset according to timestamp(ms), the maximum offset 
that born time less than timestamp will be return.
+       // If timestamp less than any message's born time, the earliest offset 
will be returned
+       // If timestamp great than any message's born time, the latest offset 
will be returned
+       Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp 
int64) (int64, error)
+
+       // Shutdown the ManualPullConsumer, clean up internal resources
+       Shutdown() error
+}
+
+type defaultManualPullConsumer struct {
+       namesrv                internal.Namesrvs
+       option                 consumerOptions
+       client                 internal.RMQClient
+       interceptor            primitive.Interceptor
+       pullFromWhichNodeTable sync.Map
+       shutdownOnce           sync.Once
+}
+
+// NewManualPullConsumer creates and initializes a new ManualPullConsumer.
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, 
error) {
+       defaultOpts := defaultPullConsumerOptions()
+       for _, apply := range options {
+               apply(&defaultOpts)
+       }
+
+       srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+       if err != nil {
+               return nil, errors.Wrap(err, "new Namesrv failed.")
+       }
+       if !defaultOpts.Credentials.IsEmpty() {
+               srvs.SetCredentials(defaultOpts.Credentials)
+       }
+       defaultOpts.Namesrv = srvs
+
+       actualRMQClient := 
internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+       actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), 
defaultOpts.Namesrv)
+
+       dc := &defaultManualPullConsumer{
+               client:  actualRMQClient,
+               option:  defaultOpts,
+               namesrv: actualNameSrv,
+       }
+
+       dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+       dc.option.ClientOptions.Namesrv = actualNameSrv
+       return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, 
groupName string, mq *primitive.MessageQueue, offset int64, numbers int) 
(*primitive.PullResult, error) {
+       if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+               return nil, err
+       }
+       subData := buildSubscriptionData(mq.Topic, MessageSelector{
+               Expression: _SubAll,
+       })
+
+       sysFlag := buildSysFlag(false, true, true, false)
+
+       pullRequest := &internal.PullMessageRequestHeader{
+               ConsumerGroup:        groupName,
+               Topic:                mq.Topic,
+               QueueId:              int32(mq.QueueId),
+               QueueOffset:          offset,
+               MaxMsgNums:           int32(numbers),
+               SysFlag:              sysFlag,
+               CommitOffset:         0,
+               SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+               SubExpression:        subData.SubString,
+               ExpressionType:       string(subData.ExpType),
+       }
+
+       if subData.ExpType == string(TAG) {
+               pullRequest.SubVersion = 0
+       } else {
+               pullRequest.SubVersion = subData.SubVersion
+       }
+
+       pullResp, err := dc.pullInner(ctx, mq, pullRequest)
+       if err != nil {
+               return pullResp, err
+       }
+       dc.processPullResult(mq, pullResp, subData)
+       if dc.interceptor != nil {
+               msgCtx := &primitive.ConsumeMessageContext{
+                       Properties:    make(map[string]string),
+                       ConsumerGroup: groupName,
+                       MQ:            mq,
+                       Msgs:          pullResp.GetMessageExts(),
+               }
+               err = dc.interceptor(ctx, msgCtx, struct{}{}, 
primitive.NoopInterceptor)
+       }
+       return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, 
topic string) ([]*primitive.MessageQueue, error) {
+       return dc.namesrv.FetchSubscribeMessageQueues(topic)
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(ctx context.Context, 
groupName string, mq *primitive.MessageQueue) (int64, error) {
+       fn := func(broker string) (*remote.RemotingCommand, error) {
+               request := &internal.QueryConsumerOffsetRequestHeader{
+                       ConsumerGroup: groupName,
+                       Topic:         mq.Topic,
+                       QueueId:       mq.QueueId,
+               }
+               cmd := 
remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, request, nil)
+               return dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+       }
+       return dc.processQueryOffset(mq, fn)
+}
+
+func (dc *defaultManualPullConsumer) Seek(ctx context.Context, groupName 
string, mq *primitive.MessageQueue, offset int64) error {
+       minOffset, err := dc.queryMinOffset(context.Background(), mq)
+       if err != nil {
+               return err
+       }
+       maxOffset, err := dc.queryMaxOffset(context.Background(), mq)
+       if err != nil {
+               return err
+       }
+       if offset < minOffset || offset > maxOffset {
+               return fmt.Errorf("Seek offset illegal, seek offset = %d, min 
offset = %d, max offset = %d", offset, minOffset, maxOffset)
+       }
+
+       broker, exist := dc.chooseServer(mq)
+       if !exist {
+               rlog.Warning("the broker does not exist", 
map[string]interface{}{
+                       rlog.LogKeyBroker: mq.BrokerName,
+               })
+               return errors2.ErrBrokerNotFound
+       }
+
+       updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
+               ConsumerGroup: groupName,
+               Topic:         mq.Topic,
+               QueueId:       mq.QueueId,
+               CommitOffset:  offset,
+       }
+       cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, 
updateOffsetRequest, nil)
+       return dc.client.InvokeOneWay(context.Background(), broker, cmd, 
5*time.Second)
+}
+
+func (dc *defaultManualPullConsumer) Lookup(ctx context.Context, mq 
*primitive.MessageQueue, timestamp int64) (int64, error) {
+       fn := func(broker string) (*remote.RemotingCommand, error) {
+               request := &internal.SearchOffsetRequestHeader{
+                       Topic:     mq.Topic,
+                       QueueId:   mq.QueueId,
+                       Timestamp: timestamp,
+               }
+               cmd := 
remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
+               return dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+       }
+       return dc.processQueryOffset(mq, fn)
+}
+
+func (dc *defaultManualPullConsumer) Shutdown() error {
+       dc.shutdownOnce.Do(func() {
+               dc.client.Shutdown()
+       })
+       return nil
+}
+
+func (dc *defaultManualPullConsumer) chooseServer(mq *primitive.MessageQueue) 
(string, bool) {
+       brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+       if brokerAddr == "" {
+               dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+               brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+       }
+       return brokerAddr, brokerAddr != ""
+}
+
+func (dc *defaultManualPullConsumer) queryMinOffset(ctx context.Context, mq 
*primitive.MessageQueue) (int64, error) {
+       fn := func(broker string) (*remote.RemotingCommand, error) {
+               request := &internal.GetMinOffsetRequestHeader{
+                       Topic:   mq.Topic,
+                       QueueId: mq.QueueId,
+               }
+               cmd := remote.NewRemotingCommand(internal.ReqGetMinOffset, 
request, nil)
+               return dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+       }
+       return dc.processQueryOffset(mq, fn)
+}
+
+func (dc *defaultManualPullConsumer) queryMaxOffset(ctx context.Context, mq 
*primitive.MessageQueue) (int64, error) {
+       fn := func(broker string) (*remote.RemotingCommand, error) {
+               request := &internal.GetMaxOffsetRequestHeader{
+                       Topic:   mq.Topic,
+                       QueueId: mq.QueueId,
+               }
+               cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, 
request, nil)
+               return dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+       }
+       return dc.processQueryOffset(mq, fn)
+}
+
+func (dc *defaultManualPullConsumer) processQueryOffset(mq 
*primitive.MessageQueue, fn func(broker string) (*remote.RemotingCommand, 
error)) (int64, error) {
+       broker, exist := dc.chooseServer(mq)
+       if !exist {
+               rlog.Warning("the broker does not exist", 
map[string]interface{}{
+                       rlog.LogKeyBroker: mq.BrokerName,
+               })
+               return -1, errors2.ErrBrokerNotFound
+       }
+       response, err := fn(broker)
+       if err != nil {
+               return -1, err
+       }
+       if response.Code != internal.ResSuccess {
+               return -2, fmt.Errorf("broker response code: %d, remarks: %s", 
response.Code, response.Remark)
+       }
+       off, err := strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+       if err != nil {
+               return -1, errors.Wrap(err, "parse offset fail.")
+       }
+       return off, nil
+}
+
+func (dc *defaultManualPullConsumer) pullInner(ctx context.Context, mq 
*primitive.MessageQueue, pullRequest *internal.PullMessageRequestHeader) 
(*primitive.PullResult, error) {
+       brokerResult := dc.tryFindBroker(mq)
+       if brokerResult == nil {
+               rlog.Warning("no broker found for mq", map[string]interface{}{
+                       rlog.LogKeyMessageQueue: mq,
+               })
+               return nil, errors2.ErrBrokerNotFound
+       }
+
+       if (pullRequest.ExpressionType == string(TAG)) && 
brokerResult.BrokerVersion < internal.V4_1_0 {
+               return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to 
support for filter message by %v",
+                       mq.BrokerName, brokerResult.BrokerVersion, 
pullRequest.ExpressionType)
+       }
+       return dc.client.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
+}
+
+func (dc *defaultManualPullConsumer) tryFindBroker(mq *primitive.MessageQueue) 
*internal.FindBrokerResult {
+       result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, 
dc.recalculatePullFromWhichNode(mq), false)
+       if result != nil {
+               return result
+       }
+       dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+       return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, 
dc.recalculatePullFromWhichNode(mq), false)
+}
+func (dc *defaultManualPullConsumer) recalculatePullFromWhichNode(mq 
*primitive.MessageQueue) int64 {
+       v, exist := dc.pullFromWhichNodeTable.Load(*mq)
+       if exist {
+               return v.(int64)
+       }
+       return internal.MasterId
+}
+
+func (dc *defaultManualPullConsumer) checkPull(ctx context.Context, mq 
*primitive.MessageQueue, offset int64, numbers int) error {
+       if mq == nil {
+               return errors2.ErrMQEmpty
+       }
+       if offset < 0 {
+               return errors2.ErrOffset
+       }
+       if numbers <= 0 {
+               return errors2.ErrNumbers
+       }
+       return nil
+}
+
+func (dc *defaultManualPullConsumer) processPullResult(mq 
*primitive.MessageQueue, result *primitive.PullResult, data 
*internal.SubscriptionData) {
+
+       dc.pullFromWhichNodeTable.Store(*mq, result.SuggestWhichBrokerId)
+
+       switch result.Status {
+       case primitive.PullFound:
+               result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+               msgs := result.GetMessageExts()
+               // filter message according to tags
+               msgListFilterAgain := msgs
+               if data.Tags.Len() > 0 && data.ClassFilterMode {
+                       msgListFilterAgain = make([]*primitive.MessageExt, 0)
+                       for _, msg := range msgs {
+                               _, exist := data.Tags.Contains(msg.GetTags())
+                               if exist {
+                                       msgListFilterAgain = 
append(msgListFilterAgain, msg)
+                               }
+                       }
+               }
+               // TODO: add filter message hook
+               for _, msg := range msgListFilterAgain {
+                       traFlag, _ := 
strconv.ParseBool(msg.GetProperty(primitive.PropertyTransactionPrepared))
+                       if traFlag {
+                               msg.TransactionId = 
msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
+                       }
+                       msg.WithProperty(primitive.PropertyMinOffset, 
strconv.FormatInt(result.MinOffset, 10))
+                       msg.WithProperty(primitive.PropertyMaxOffset, 
strconv.FormatInt(result.MaxOffset, 10))
+               }
+               result.SetMessageExts(msgListFilterAgain)
+       }
+}
diff --git a/consumer/manual_pull_consumer_test.go 
b/consumer/manual_pull_consumer_test.go
new file mode 100644
index 0000000..d04d522
--- /dev/null
+++ b/consumer/manual_pull_consumer_test.go
@@ -0,0 +1,343 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+       "context"
+       "strconv"
+       "testing"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/v2/internal"
+       "github.com/apache/rocketmq-client-go/v2/internal/remote"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/golang/mock/gomock"
+       . "github.com/smartystreets/goconvey/convey"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestPullFromQueue(t *testing.T) {
+       Convey("ManualPullConsumer PullFromQueue", t, func() {
+               serverTopic := "foo"
+               tests := map[string]struct {
+                       mq          primitive.MessageQueue
+                       offset      int64
+                       numbers     int
+                       expectedErr bool
+               }{
+                       "topic exist": {
+                               primitive.MessageQueue{
+                                       Topic:      "foo",
+                                       BrokerName: "",
+                                       QueueId:    0,
+                               },
+                               1,
+                               1,
+                               false,
+                       },
+                       "topic not exist": {
+                               primitive.MessageQueue{
+                                       Topic:      "foo2",
+                                       BrokerName: "",
+                                       QueueId:    0,
+                               },
+                               1,
+                               1,
+                               false,
+                       },
+               }
+
+               ctrl := gomock.NewController(t)
+               defer ctrl.Finish()
+
+               c, err := 
NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+               if err != nil {
+                       assert.Error(t, err)
+               }
+
+               namesrv, client := internal.NewMockNamesrvs(ctrl), 
internal.NewMockRMQClient(ctrl)
+               c.namesrv, c.client = namesrv, client
+               namesrv.EXPECT().FindBrokerAddressInSubscribe(gomock.Any(), 
gomock.Any(), gomock.Any()).Return(
+                       &internal.FindBrokerResult{
+                               BrokerAddr:    "foo",
+                               Slave:         false,
+                               BrokerVersion: 1.0,
+                       },
+               )
+               client.EXPECT().PullMessage(gomock.Any(), gomock.Any(), 
gomock.Any()).DoAndReturn(
+                       func(ctx context.Context, brokerAddrs string, request 
*internal.PullMessageRequestHeader) (*primitive.PullResult, error) {
+                               pullResult := &primitive.PullResult{
+                                       SuggestWhichBrokerId: 0,
+                               }
+                               if request.Topic == serverTopic {
+                                       pullResult.Status = primitive.PullFound
+                               } else {
+                                       pullResult.Status = 
primitive.PullNoNewMsg
+                               }
+                               return pullResult, nil
+                       })
+
+               for name, test := range tests {
+                       Convey(name, func() {
+                               _, err := c.PullFromQueue(context.Background(), 
"default", &test.mq, test.offset, test.numbers)
+                               So(err, ShouldBeNil)
+                       })
+               }
+
+       })
+}
+
+func TestGetMessageQueues(t *testing.T) {
+       Convey("ManualPullConsumer GetMessageQueues", t, func() {
+               ctrl := gomock.NewController(t)
+               defer ctrl.Finish()
+
+               c, err := 
NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+               if err != nil {
+                       assert.Error(t, err)
+               }
+
+               namesrv := internal.NewMockNamesrvs(ctrl)
+               c.namesrv = namesrv
+               namesrv.EXPECT().FetchSubscribeMessageQueues("foo").Return(
+                       []*primitive.MessageQueue{
+                               {Topic: "foo", BrokerName: "foo", QueueId: 0},
+                       }, nil,
+               )
+
+               queues, err := c.GetMessageQueues(context.TODO(), "foo")
+
+               So(queues, ShouldNotBeEmpty)
+               So(err, ShouldBeNil)
+       })
+}
+
+func TestCommittedOffset(t *testing.T) {
+       Convey("ManualPullConsumer CommittedOffset", t, func() {
+
+               serverTopic, serverOffset := "foo", "1"
+               tests := map[string]struct {
+                       mq          primitive.MessageQueue
+                       except      int
+                       expectedErr bool
+               }{
+                       "topic exist": {
+                               primitive.MessageQueue{
+                                       Topic:      "foo",
+                                       BrokerName: "",
+                                       QueueId:    0,
+                               },
+                               1,
+                               false,
+                       },
+                       "topic not exist": {
+                               primitive.MessageQueue{
+                                       Topic:      "foo2",
+                                       BrokerName: "",
+                                       QueueId:    0,
+                               },
+                               -2,
+                               true,
+                       },
+               }
+
+               ctrl := gomock.NewController(t)
+               defer ctrl.Finish()
+
+               c, err := 
NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+               if err != nil {
+                       assert.Error(t, err)
+               }
+               namesrv, client := internal.NewMockNamesrvs(ctrl), 
internal.NewMockRMQClient(ctrl)
+               c.namesrv, c.client = namesrv, client
+
+               
namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("foo").AnyTimes()
+               client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), 
gomock.Any(), gomock.Any()).DoAndReturn(
+                       func(ctx context.Context, addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, 
error) {
+                               if request.ExtFields["topic"] == serverTopic {
+                                       ret := &remote.RemotingCommand{
+                                               Code: internal.ResSuccess,
+                                               ExtFields: map[string]string{
+                                                       "offset": serverOffset,
+                                               },
+                                       }
+                                       return ret, nil
+                               }
+                               ret := &remote.RemotingCommand{
+                                       Code: internal.ResTopicNotExist,
+                               }
+                               return ret, nil
+                       }).AnyTimes()
+
+               for name, test := range tests {
+                       Convey(name, func() {
+                               ret, err := 
c.CommittedOffset(context.Background(), "foo", &test.mq)
+
+                               if test.expectedErr {
+                                       So(err, ShouldNotBeNil)
+                               } else {
+                                       So(err, ShouldBeNil)
+                               }
+                               So(ret, ShouldEqual, test.except)
+                       })
+               }
+       })
+}
+
+func TestSeek(t *testing.T) {
+       Convey("ManualPullConsumer Seek", t, func() {
+
+               serverMinOffset, serverMaxOffset := "1", "10"
+               tests := map[string]struct {
+                       mq          primitive.MessageQueue
+                       offset      int64
+                       expectedErr bool
+               }{
+                       "normal offset": {
+                               primitive.MessageQueue{
+                                       Topic:      "foo",
+                                       BrokerName: "foo",
+                                       QueueId:    0,
+                               },
+                               3,
+                               false,
+                       },
+                       "illegal offset": {
+                               primitive.MessageQueue{
+                                       Topic:      "foo",
+                                       BrokerName: "foo",
+                                       QueueId:    0,
+                               },
+                               11,
+                               true,
+                       },
+               }
+
+               ctrl := gomock.NewController(t)
+               defer ctrl.Finish()
+
+               c, err := 
NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+               if err != nil {
+                       assert.Error(t, err)
+               }
+               namesrv, client := internal.NewMockNamesrvs(ctrl), 
internal.NewMockRMQClient(ctrl)
+               c.namesrv, c.client = namesrv, client
+
+               
namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("foo").AnyTimes()
+
+               client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), 
gomock.Any(), gomock.Any()).DoAndReturn(
+                       func(ctx context.Context, addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, 
error) {
+                               if request.Code == internal.ReqGetMinOffset {
+                                       return &remote.RemotingCommand{
+                                               Code: internal.ResSuccess,
+                                               ExtFields: map[string]string{
+                                                       "offset": 
serverMinOffset,
+                                               },
+                                       }, nil
+                               } else if request.Code == 
internal.ReqGetMaxOffset {
+                                       return &remote.RemotingCommand{
+                                               Code: internal.ResSuccess,
+                                               ExtFields: map[string]string{
+                                                       "offset": 
serverMaxOffset,
+                                               },
+                                       }, nil
+                               }
+                               return &remote.RemotingCommand{
+                                       Code: internal.ResError,
+                               }, nil
+                       }).AnyTimes()
+
+               client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(), 
gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+               for name, test := range tests {
+                       Convey(name, func() {
+                               err := c.Seek(context.Background(), "foo", 
&test.mq, test.offset)
+                               if test.expectedErr {
+                                       So(err, ShouldNotBeNil)
+                               } else {
+                                       So(err, ShouldBeNil)
+                               }
+                       })
+               }
+       })
+}
+
+func TestLookup(t *testing.T) {
+       Convey("ManualPullConsumer Lookup", t, func() {
+               test := struct {
+                       mq          primitive.MessageQueue
+                       timestamp   int64
+                       offset      int64
+                       expectedErr bool
+               }{
+                       primitive.MessageQueue{
+                               Topic:      "foo",
+                               BrokerName: "foo",
+                               QueueId:    0,
+                       },
+                       int64(time.Now().Nanosecond()),
+                       10,
+                       true,
+               }
+
+               ctrl := gomock.NewController(t)
+               defer ctrl.Finish()
+
+               c, err := 
NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+               if err != nil {
+                       assert.Error(t, err)
+               }
+               namesrv, client := internal.NewMockNamesrvs(ctrl), 
internal.NewMockRMQClient(ctrl)
+               c.namesrv, c.client = namesrv, client
+
+               
namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("foo").AnyTimes()
+
+               client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), 
gomock.Any(), gomock.Any()).DoAndReturn(
+                       func(ctx context.Context, addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, 
error) {
+
+                               return &remote.RemotingCommand{
+                                       Code: internal.ResSuccess,
+                                       ExtFields: map[string]string{
+                                               "offset": 
strconv.FormatInt(test.offset, 10),
+                                       },
+                               }, nil
+                       }).AnyTimes()
+
+               ret, err := c.Lookup(context.Background(), &test.mq, 
test.timestamp)
+               So(err, ShouldBeNil)
+               So(ret, ShouldEqual, test.offset)
+       })
+}
+
+func TestShutdown(t *testing.T) {
+       Convey("ManualPullConsumer Shutdown", t, func() {
+
+               ctrl := gomock.NewController(t)
+               defer ctrl.Finish()
+
+               c, err := 
NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+               if err != nil {
+                       assert.Error(t, err)
+               }
+               client := internal.NewMockRMQClient(ctrl)
+               c.client = client
+
+               client.EXPECT().Shutdown().Times(1)
+               c.Shutdown()
+               c.Shutdown()
+       })
+}
diff --git a/examples/consumer/manual/main.go b/examples/consumer/manual/main.go
new file mode 100644
index 0000000..1bdf479
--- /dev/null
+++ b/examples/consumer/manual/main.go
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "sync"
+
+       "github.com/apache/rocketmq-client-go/v2/consumer"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+func main() {
+       groupName := "testGroup"
+       c, err := consumer.NewManualPullConsumer(
+               consumer.WithGroupName(groupName),
+               
consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+       )
+       if err != nil {
+               rlog.Fatal(fmt.Sprintf("init producer error: %v", err), nil)
+       }
+
+       topic := "test"
+       // get all message queue
+       mqs, err := c.GetMessageQueues(context.Background(), topic)
+       if err != nil {
+               rlog.Fatal(fmt.Sprintf("get message queue error: %v", err), nil)
+       }
+       var wg sync.WaitGroup
+
+       fn := func(mq *primitive.MessageQueue) {
+               defer wg.Done()
+               // get latest offset
+               offset, err := c.CommittedOffset(context.Background(), 
groupName, mq)
+               if err != nil {
+                       rlog.Fatal(fmt.Sprintf("search consumer offset error: 
%v", err), nil)
+               }
+               for {
+                       // pull message
+                       ret, err := c.PullFromQueue(context.Background(), 
groupName, mq, offset, 1)
+                       if err != nil {
+                               rlog.Fatal(fmt.Sprintf("pullFromQueue error: 
%v", err), nil)
+                       }
+                       if ret.Status == primitive.PullFound {
+                               msgs := ret.GetMessageExts()
+                               for _, msg := range msgs {
+                                       fmt.Printf("subscribe Msg: %v \n", msg)
+                                       // commit offset
+                                       if err = c.Seek(context.Background(), 
groupName, mq, msg.QueueOffset+1); err != nil {
+                                               rlog.Fatal(fmt.Sprintf("commit 
offset error: %v", err), nil)
+                                       }
+                                       offset++
+                               }
+                       } else {
+                               break
+                       }
+               }
+       }
+
+       for _, mq := range mqs {
+               wg.Add(1)
+               go fn(mq)
+       }
+       wg.Wait()
+       c.Shutdown()
+}
diff --git a/internal/request.go b/internal/request.go
index 0e3d8e1..f53d5ff 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -236,6 +236,18 @@ func (request *GetConsumerListRequestHeader) Encode() 
map[string]string {
        return maps
 }
 
+type GetMinOffsetRequestHeader struct {
+       Topic   string
+       QueueId int
+}
+
+func (request *GetMinOffsetRequestHeader) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["topic"] = request.Topic
+       maps["queueId"] = strconv.Itoa(request.QueueId)
+       return maps
+}
+
 type GetMaxOffsetRequestHeader struct {
        Topic   string
        QueueId int
diff --git a/primitive/interceptor.go b/primitive/interceptor.go
index 878aab5..51716aa 100644
--- a/primitive/interceptor.go
+++ b/primitive/interceptor.go
@@ -29,6 +29,10 @@ type Invoker func(ctx context.Context, req, reply 
interface{}) error
 // use type assert to get real type.
 type Interceptor func(ctx context.Context, req, reply interface{}, next 
Invoker) error
 
+var NoopInterceptor = func(ctx context.Context, req, reply interface{}) error {
+       return nil
+}
+
 func ChainInterceptors(interceptors ...Interceptor) Interceptor {
        if len(interceptors) == 0 {
                return nil

Reply via email to