This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ae76839  feat: support lite pull consumer (#881)
ae76839 is described below

commit ae76839caba1c718c5b6569682dbc4a931f3f40d
Author: Nick <[email protected]>
AuthorDate: Sat Aug 13 14:05:06 2022 +0800

    feat: support lite pull consumer (#881)
    
    * feat: support lite pull consumer
    
    * remove the real ip.
    
    * go.mod require go 1.13,so remove old versions.
    
    * 1.no need to persist offset,rocketmq client do it when start;2.add pprof.
    
    * Update .travis.yml
    
    Co-authored-by: dinglei <[email protected]>
---
 .travis.yml                              |   4 +-
 admin/admin.go                           |  21 +-
 api.go                                   |  79 +---
 consumer/consumer.go                     |  77 ++--
 consumer/option.go                       |   2 +
 consumer/pull_consumer.go                | 738 ++++++++++++++++++++++++++-----
 consumer/strategy.go                     |   2 +-
 examples/consumer/pull/main.go           |  72 ---
 examples/consumer/pull/poll/main.go      | 102 +++++
 examples/consumer/pull/pull/main.go      | 143 ++++++
 examples/consumer/pull/pull_from/main.go | 177 ++++++++
 go.mod                                   |   7 +-
 go.sum                                   | 106 ++++-
 internal/utils/namespace.go              |  24 +
 internal/utils/string.go                 |   6 +
 primitive/result.go                      |   4 +-
 rlog/log.go                              |  21 +-
 17 files changed, 1293 insertions(+), 292 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index e267cee..a3ea9e8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,9 +1,9 @@
 language: go
 
 go:
-  - "1.11.x"
-  - "1.12.x"
   - "1.13.x"
+  - "1.16.x"
+  - "1.18.x"
 go_import_path: github.com/apache/rocketmq-client-go/v2
 env:
   global:
diff --git a/admin/admin.go b/admin/admin.go
index 8169dcb..20e1794 100644
--- a/admin/admin.go
+++ b/admin/admin.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/apache/rocketmq-client-go/v2/internal"
        "github.com/apache/rocketmq-client-go/v2/internal/remote"
+       "github.com/apache/rocketmq-client-go/v2/internal/utils"
        "github.com/apache/rocketmq-client-go/v2/primitive"
        "github.com/apache/rocketmq-client-go/v2/rlog"
 )
@@ -35,6 +36,7 @@ type Admin interface {
        //TODO
        //TopicList(ctx context.Context, mq *primitive.MessageQueue) 
(*remote.RemotingCommand, error)
        //GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, 
error)
+       FetchPublishMessageQueues(ctx context.Context, topic string) 
([]*primitive.MessageQueue, error)
        Close() error
 }
 
@@ -61,6 +63,19 @@ func WithResolver(resolver primitive.NsResolver) AdminOption 
{
        }
 }
 
+func WithCredentials(c primitive.Credentials) AdminOption {
+       return func(options *adminOptions) {
+               options.ClientOptions.Credentials = c
+       }
+}
+
+// WithNamespace set the namespace of admin
+func WithNamespace(namespace string) AdminOption {
+       return func(options *adminOptions) {
+               options.ClientOptions.Namespace = namespace
+       }
+}
+
 type admin struct {
        cli internal.RMQClient
 
@@ -70,7 +85,7 @@ type admin struct {
 }
 
 // NewAdmin initialize admin
-func NewAdmin(opts ...AdminOption) (Admin, error) {
+func NewAdmin(opts ...AdminOption) (*admin, error) {
        defaultOpts := defaultAdminOptions()
        for _, opt := range opts {
                opt(defaultOpts)
@@ -202,6 +217,10 @@ func (a *admin) DeleteTopic(ctx context.Context, opts 
...OptionDelete) error {
        return nil
 }
 
+func (a *admin) FetchPublishMessageQueues(ctx context.Context, topic string) 
([]*primitive.MessageQueue, error) {
+       return 
a.cli.GetNameSrv().FetchPublishMessageQueues(utils.WrapNamespace(a.opts.Namespace,
 topic))
+}
+
 func (a *admin) Close() error {
        a.closeOnce.Do(func() {
                a.cli.Shutdown()
diff --git a/api.go b/api.go
index 6a39c28..449f754 100644
--- a/api.go
+++ b/api.go
@@ -22,7 +22,6 @@ import (
        "time"
 
        "github.com/apache/rocketmq-client-go/v2/consumer"
-       "github.com/apache/rocketmq-client-go/v2/errors"
        "github.com/apache/rocketmq-client-go/v2/internal"
        "github.com/apache/rocketmq-client-go/v2/primitive"
        "github.com/apache/rocketmq-client-go/v2/producer"
@@ -81,67 +80,37 @@ type PullConsumer interface {
        // Start the PullConsumer for consuming message
        Start() error
 
-       // Shutdown the PullConsumer, all offset of MessageQueue will be commit 
to broker before process exit
-       Shutdown() error
-
        // Subscribe a topic for consuming
        Subscribe(topic string, selector consumer.MessageSelector) error
 
        // Unsubscribe a topic
        Unsubscribe(topic string) error
 
-       // MessageQueues get MessageQueue list about for a given topic. This 
method will issue a remote call to the server
-       // if it does not already have any MessageQueue about the given topic.
-       MessageQueues(topic string) []primitive.MessageQueue
-
-       // Pull message for the topic specified. It is an error to not have 
subscribed to any topics before pull for message
-       //
-       // Specified numbers of messages is returned if message greater that 
numbers, and the offset will auto forward.
-       // It means that if you meeting messages consuming failed, you should 
process failed messages by yourself.
-       Pull(ctx context.Context, topic string, numbers int) 
(*primitive.PullResult, error)
-
-       // Pull message for the topic specified from a specified MessageQueue 
and offset. It is an error to not have
-       // subscribed to any topics before pull for message. the method will 
not affect the offset recorded
-       //
-       // Specified numbers of messages is returned.
-       PullFrom(ctx context.Context, mq primitive.MessageQueue, offset int64, 
numbers int) (*primitive.PullResult, error)
-
-       // Lookup offset for the given message queue by timestamp. The returned 
offset for the message queue is the
-       // earliest offset whose timestamp is greater than or equal to the 
given timestamp in the corresponding message
-       // queue.
-       //
-       // Timestamp must be millisecond level, if you want to lookup the 
earliest offset of the mq, you could set the
-       // timestamp 0, and if you want to the latest offset the mq, you could 
set the timestamp math.MaxInt64.
-       Lookup(ctx context.Context, mq primitive.MessageQueue, timestamp int64) 
(int64, error)
-
-       // Commit the offset of specified mqs to broker, if auto-commit is 
disable, you must commit the offset manually.
-       Commit(ctx context.Context, mqs ...primitive.MessageQueue) (int64, 
error)
-
-       // CommittedOffset return the offset of specified Message
-       CommittedOffset(mq primitive.MessageQueue) (int64, error)
-
-       // Seek set offset of the mq, if you wanna re-consuming your message 
form one position, the method may help you.
-       // if you want re-consuming from one time, you cloud Lookup() then seek 
it.
-       Seek(mq primitive.MessageQueue, offset int64) error
-
-       // Pause consuming for specified MessageQueues, after pause, client 
will not fetch any message from the specified
-       // message queues
-       //
-       // Note that this method does not affect message queue subscription. In 
particular, it does not cause a group
-       // rebalance.
-       //
-       // if a MessageQueue belong a topic that has not been subscribed, an 
error will be returned
-       //Pause(mqs ...primitive.MessageQueue) error
-
-       // Resume specified message queues which have been paused with Pause, 
if a MessageQueue that not paused,
-       // it will be ignored. if not subscribed, an error will be returned
-       //Resume(mqs ...primitive.MessageQueue) error
+       // Shutdown the PullConsumer, all offset of MessageQueue will be commit 
to broker before process exit
+       Shutdown() error
+
+       // Poll messages with timeout.
+       Poll(ctx context.Context, timeout time.Duration) 
(*consumer.ConsumeRequest, error)
+
+       //ACK ACK
+       ACK(ctx context.Context, cr *consumer.ConsumeRequest, consumeResult 
consumer.ConsumeResult)
+
+       // Pull message of topic,  selector indicate which queue to pull.
+       Pull(ctx context.Context, numbers int) (*primitive.PullResult, error)
+
+       // PullFrom pull messages of queue from the offset to offset + numbers
+       PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset 
int64, numbers int) (*primitive.PullResult, error)
+
+       // UpdateOffset updateOffset update offset of queue in mem
+       UpdateOffset(queue *primitive.MessageQueue, offset int64) error
+
+       // PersistOffset persist all offset in mem.
+       PersistOffset(ctx context.Context, topic string) error
+
+       // CurrentOffset return the current offset of queue in mem.
+       CurrentOffset(queue *primitive.MessageQueue) (int64, error)
 }
 
-// The PullConsumer has not implemented completely, if you want have an 
experience of PullConsumer, you could use
-// consumer.NewPullConsumer(...), but it may changed in the future.
-//
-// The PullConsumer will be supported in next release
 func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
-       return nil, errors.ErrPullConsumer
+       return consumer.NewPullConsumer(opts...)
 }
diff --git a/consumer/consumer.go b/consumer/consumer.go
index b605659..0d13764 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -269,6 +269,7 @@ type defaultConsumer struct {
 }
 
 func (dc *defaultConsumer) start() error {
+       dc.consumerGroup = utils.WrapNamespace(dc.option.Namespace, 
dc.consumerGroup)
        if dc.model == Clustering {
                // set retry topic
                retryTopic := internal.GetRetryTopic(dc.consumerGroup)
@@ -694,53 +695,51 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic 
string, mqs []*primitiv
                return true
        })
 
-       if dc.cType == _PushConsume {
-               for item := range mqSet {
-                       // BUG: the mq will send to channel, if not copy once, 
the next iter will modify the mq in the channel.
-                       mq := item
+       for item := range mqSet {
+               // BUG: the mq will send to channel, if not copy once, the next 
iter will modify the mq in the channel.
+               mq := item
+               _, exist := dc.processQueueTable.Load(mq)
+               if exist {
+                       continue
+               }
+               if dc.consumeOrderly && !dc.lock(&mq) {
+                       rlog.Warning("do defaultConsumer, add a new mq failed, 
because lock failed", map[string]interface{}{
+                               rlog.LogKeyConsumerGroup: dc.consumerGroup,
+                               rlog.LogKeyMessageQueue:  mq.String(),
+                       })
+                       continue
+               }
+               dc.storage.remove(&mq)
+               nextOffset, err := dc.computePullFromWhereWithException(&mq)
+
+               if nextOffset >= 0 && err == nil {
                        _, exist := dc.processQueueTable.Load(mq)
                        if exist {
-                               continue
-                       }
-                       if dc.consumeOrderly && !dc.lock(&mq) {
-                               rlog.Warning("do defaultConsumer, add a new mq 
failed, because lock failed", map[string]interface{}{
+                               rlog.Debug("updateProcessQueueTable do 
defaultConsumer, mq already exist", map[string]interface{}{
                                        rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
                                        rlog.LogKeyMessageQueue:  mq.String(),
                                })
-                               continue
-                       }
-                       dc.storage.remove(&mq)
-                       nextOffset, err := 
dc.computePullFromWhereWithException(&mq)
-
-                       if nextOffset >= 0 && err == nil {
-                               _, exist := dc.processQueueTable.Load(mq)
-                               if exist {
-                                       rlog.Debug("do defaultConsumer, mq 
already exist", map[string]interface{}{
-                                               rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
-                                               rlog.LogKeyMessageQueue:  
mq.String(),
-                                       })
-                               } else {
-                                       rlog.Debug("do defaultConsumer, add a 
new mq", map[string]interface{}{
-                                               rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
-                                               rlog.LogKeyMessageQueue:  
mq.String(),
-                                       })
-                                       pq := newProcessQueue(dc.consumeOrderly)
-                                       dc.processQueueTable.Store(mq, pq)
-                                       pr := PullRequest{
-                                               consumerGroup: dc.consumerGroup,
-                                               mq:            &mq,
-                                               pq:            pq,
-                                               nextOffset:    nextOffset,
-                                       }
-                                       dc.prCh <- pr
-                                       changed = true
-                               }
                        } else {
-                               rlog.Warning("do defaultConsumer, add a new mq 
failed", map[string]interface{}{
+                               rlog.Debug("updateProcessQueueTable do 
defaultConsumer, add a new mq", map[string]interface{}{
                                        rlog.LogKeyConsumerGroup: 
dc.consumerGroup,
                                        rlog.LogKeyMessageQueue:  mq.String(),
                                })
+                               pq := newProcessQueue(dc.consumeOrderly)
+                               dc.processQueueTable.Store(mq, pq)
+                               pr := PullRequest{
+                                       consumerGroup: dc.consumerGroup,
+                                       mq:            &mq,
+                                       pq:            pq,
+                                       nextOffset:    nextOffset,
+                               }
+                               dc.prCh <- pr
+                               changed = true
                        }
+               } else {
+                       rlog.Warning("do defaultConsumer, add a new mq failed", 
map[string]interface{}{
+                               rlog.LogKeyConsumerGroup: dc.consumerGroup,
+                               rlog.LogKeyMessageQueue:  mq.String(),
+                       })
                }
        }
 
@@ -760,9 +759,6 @@ func (dc *defaultConsumer) computePullFromWhere(mq 
*primitive.MessageQueue) int6
 }
 
 func (dc *defaultConsumer) computePullFromWhereWithException(mq 
*primitive.MessageQueue) (int64, error) {
-       if dc.cType == _PullConsume {
-               return 0, nil
-       }
        result := int64(-1)
        lastOffset, err := dc.storage.readWithException(mq, _ReadFromStore)
        if err != nil {
@@ -898,6 +894,7 @@ func (dc *defaultConsumer) processPullResult(mq 
*primitive.MessageQueue, result
 
                // TODO: add filter message hook
                for _, msg := range msgListFilterAgain {
+                       msg.Queue = mq
                        traFlag, _ := 
strconv.ParseBool(msg.GetProperty(primitive.PropertyTransactionPrepared))
                        if traFlag {
                                msg.TransactionId = 
msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
diff --git a/consumer/option.go b/consumer/option.go
index 3b035a3..ca6c8d5 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -134,6 +134,8 @@ func defaultPullConsumerOptions() consumerOptions {
        opts := consumerOptions{
                ClientOptions: internal.DefaultClientOptions(),
                Resolver:      primitive.NewHttpResolver("DEFAULT"),
+               ConsumerModel: Clustering,
+               Strategy:      AllocateByAveragely,
        }
        opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
        return opts
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 4699601..1ca4881 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -20,10 +20,17 @@ package consumer
 import (
        "context"
        "fmt"
+       "math"
+       "runtime/pprof"
+       "strconv"
+       "strings"
        "sync"
        "sync/atomic"
+       "time"
 
        errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+       "github.com/apache/rocketmq-client-go/v2/internal/remote"
+       "github.com/apache/rocketmq-client-go/v2/internal/utils"
 
        "github.com/pkg/errors"
 
@@ -32,43 +39,38 @@ import (
        "github.com/apache/rocketmq-client-go/v2/rlog"
 )
 
-type PullConsumer interface {
-       // Start
-       Start()
+// ErrNoNewMsg returns a "no new message found". Occurs only when no new 
message found from broker
+var ErrNoNewMsg = errors.New("no new message found")
 
-       // Shutdown refuse all new pull operation, finish all submitted.
-       Shutdown()
-
-       // Pull pull message of topic,  selector indicate which queue to pull.
-       Pull(ctx context.Context, topic string, selector MessageSelector, 
numbers int) (*primitive.PullResult, error)
-
-       // PullFrom pull messages of queue from the offset to offset + numbers
-       PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset 
int64, numbers int) (*primitive.PullResult, error)
-
-       // updateOffset update offset of queue in mem
-       UpdateOffset(queue *primitive.MessageQueue, offset int64) error
-
-       // PersistOffset persist all offset in mem.
-       PersistOffset(ctx context.Context) error
+func IsNoNewMsgError(err error) bool {
+       return err == ErrNoNewMsg
+}
 
-       // CurrentOffset return the current offset of queue in mem.
-       CurrentOffset(queue *primitive.MessageQueue) (int64, error)
+type ConsumeRequest struct {
+       messageQueue *primitive.MessageQueue
+       processQueue *processQueue
+       msgList      []*primitive.MessageExt
 }
 
-var (
-       queueCounterTable sync.Map
-)
+func (cr *ConsumeRequest) GetMsgList() []*primitive.MessageExt {
+       return cr.msgList
+}
 
 type defaultPullConsumer struct {
        *defaultConsumer
 
-       option    consumerOptions
-       client    internal.RMQClient
-       GroupName string
-       Model     MessageModel
-       UnitMode  bool
-
-       interceptor primitive.Interceptor
+       topic             string
+       selector          MessageSelector
+       GroupName         string
+       Model             MessageModel
+       UnitMode          bool
+       nextQueueSequence int64
+       allocateQueues    []*primitive.MessageQueue
+
+       done                chan struct{}
+       closeOnce           sync.Once
+       consumeRequestCache chan *ConsumeRequest
+       submitToConsume     func(*processQueue, *primitive.MessageQueue)
 }
 
 func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
@@ -85,12 +87,13 @@ func NewPullConsumer(options ...Option) 
(*defaultPullConsumer, error) {
        defaultOpts.Namesrv = srvs
        dc := &defaultConsumer{
                client:        
internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
-               consumerGroup: defaultOpts.GroupName,
+               consumerGroup: utils.WrapNamespace(defaultOpts.Namespace, 
defaultOpts.GroupName),
                cType:         _PullConsume,
                state:         int32(internal.StateCreateJust),
                prCh:          make(chan PullRequest, 4),
                model:         defaultOpts.ConsumerModel,
                option:        defaultOpts,
+               allocate:      defaultOpts.Strategy,
        }
        if dc.client == nil {
                return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
@@ -98,80 +101,265 @@ func NewPullConsumer(options ...Option) 
(*defaultPullConsumer, error) {
        defaultOpts.Namesrv = dc.client.GetNameSrv()
 
        c := &defaultPullConsumer{
-               defaultConsumer: dc,
+               defaultConsumer:     dc,
+               done:                make(chan struct{}, 1),
+               consumeRequestCache: make(chan *ConsumeRequest, 4),
        }
+       dc.mqChanged = c.messageQueueChanged
+       c.submitToConsume = c.consumeMessageCurrently
        return c, nil
 }
 
-func (c *defaultPullConsumer) Start() error {
-       atomic.StoreInt32(&c.state, int32(internal.StateRunning))
+func (pc *defaultPullConsumer) Subscribe(topic string, selector 
MessageSelector) error {
+       if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
+               atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
+               return errors2.ErrStartTopic
+       }
+       topic = utils.WrapNamespace(pc.option.Namespace, topic)
+
+       data := buildSubscriptionData(topic, selector)
+       pc.subscriptionDataTable.Store(topic, data)
+       pc.topic = topic
+       pc.selector = selector
+
+       return nil
+}
+
+func (pc *defaultPullConsumer) Unsubscribe(topic string) error {
+       topic = utils.WrapNamespace(pc.option.Namespace, topic)
+       pc.subscriptionDataTable.Delete(topic)
+       return nil
+}
+
+func (pc *defaultPullConsumer) Start() error {
+       atomic.StoreInt32(&pc.state, int32(internal.StateRunning))
 
        var err error
-       c.once.Do(func() {
-               err = c.start()
+       pc.once.Do(func() {
+               consumerGroupWithNs := utils.WrapNamespace(pc.option.Namespace, 
pc.consumerGroup)
+               err = 
pc.defaultConsumer.client.RegisterConsumer(consumerGroupWithNs, pc)
                if err != nil {
+                       rlog.Error("defaultPullConsumer the consumer group has 
been created, specify another one", map[string]interface{}{
+                               rlog.LogKeyConsumerGroup: pc.consumerGroup,
+                       })
+                       err = errors2.ErrCreated
                        return
                }
+               err = pc.start()
+               if err != nil {
+                       return
+               }
+
+               go func() {
+                       for {
+                               select {
+                               case pr := <-pc.prCh:
+                                       go func() {
+                                               pc.pullMessage(&pr)
+                                       }()
+                               case <-pc.done:
+                                       rlog.Info("defaultPullConsumer close 
PullRequest listener.", map[string]interface{}{
+                                               rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
+                                       })
+                                       return
+                               }
+                       }
+               }()
        })
 
+       pc.client.UpdateTopicRouteInfo()
+       _, exist := pc.topicSubscribeInfoTable.Load(pc.topic)
+       if !exist {
+               err = pc.Shutdown()
+               if err != nil {
+                       rlog.Error("defaultPullConsumer.Shutdown . route info 
not found, it may not exist", map[string]interface{}{
+                               rlog.LogKeyTopic:         pc.topic,
+                               rlog.LogKeyUnderlayError: err,
+                       })
+               }
+               return fmt.Errorf("the topic=%s route info not found, it may 
not exist", pc.topic)
+       }
+       pc.client.CheckClientInBroker()
+       pc.client.SendHeartbeatToAllBrokerWithLock()
+       pc.client.RebalanceImmediately()
+
        return err
 }
 
-func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector 
MessageSelector, numbers int) (*primitive.PullResult, error) {
-       mq := c.getNextQueueOf(topic)
-       if mq == nil {
-               return nil, fmt.Errorf("prepard to pull topic: %s, but no queue 
is founded", topic)
+func (pc *defaultPullConsumer) Poll(ctx context.Context, timeout 
time.Duration) (*ConsumeRequest, error) {
+       ctx, cancel := context.WithTimeout(ctx, timeout)
+       defer cancel()
+       select {
+       case <-ctx.Done():
+               return nil, ErrNoNewMsg
+       case cr := <-pc.consumeRequestCache:
+               if len(cr.GetMsgList()) == 0 {
+                       return nil, ErrNoNewMsg
+               }
+               return cr, nil
        }
+}
 
-       data := buildSubscriptionData(mq.Topic, selector)
-       result, err := c.pull(context.Background(), mq, data, 
c.nextOffsetOf(mq), numbers)
-
-       if err != nil {
-               return nil, err
+func (pc *defaultPullConsumer) ACK(ctx context.Context, cr *ConsumeRequest, 
result ConsumeResult) {
+       if cr == nil {
+               return
+       }
+       pq := cr.processQueue
+       mq := cr.messageQueue
+       msgList := cr.msgList
+       if len(msgList) == 0 || pq == nil || mq == nil {
+               return
+       }
+RETRY:
+       if pq.IsDroppd() {
+               rlog.Info("defaultPullConsumer the message queue not be able to 
consume, because it was dropped", map[string]interface{}{
+                       rlog.LogKeyMessageQueue:  mq.String(),
+                       rlog.LogKeyConsumerGroup: pc.consumerGroup,
+               })
+               return
        }
 
-       c.processPullResult(mq, result, data)
-       return result, nil
-}
+       pc.resetRetryAndNamespace(msgList)
 
-func (c *defaultPullConsumer) getNextQueueOf(topic string) 
*primitive.MessageQueue {
-       queues, err := 
c.defaultConsumer.client.GetNameSrv().FetchSubscribeMessageQueues(topic)
-       if err != nil && len(queues) > 0 {
-               rlog.Error("get next mq error", map[string]interface{}{
-                       rlog.LogKeyTopic:         topic,
-                       rlog.LogKeyUnderlayError: err.Error(),
-               })
-               return nil
+       msgCtx := &primitive.ConsumeMessageContext{
+               Properties:    make(map[string]string),
+               ConsumerGroup: pc.consumerGroup,
+               MQ:            mq,
+               Msgs:          msgList,
        }
-       var index int64
-       v, exist := queueCounterTable.Load(topic)
-       if !exist {
-               index = -1
-               queueCounterTable.Store(topic, int64(0))
+       ctx = primitive.WithConsumerCtx(ctx, msgCtx)
+       ctx = primitive.WithMethod(ctx, primitive.ConsumerPull)
+       concurrentCtx := primitive.NewConsumeConcurrentlyContext()
+       concurrentCtx.MQ = *mq
+       ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)
+
+       if result == ConsumeSuccess {
+               msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.SuccessReturn)
+       } else {
+               msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.FailedReturn)
+       }
+
+       if !pq.IsDroppd() {
+               msgBackFailed := make([]*primitive.MessageExt, 0)
+               msgBackSucceed := make([]*primitive.MessageExt, 0)
+               if result == ConsumeSuccess {
+                       pc.stat.increaseConsumeOKTPS(pc.consumerGroup, 
mq.Topic, len(msgList))
+                       msgBackSucceed = msgList
+               } else {
+                       pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, 
mq.Topic, len(msgList))
+                       if pc.model == BroadCasting {
+                               for i := 0; i < len(msgList); i++ {
+                                       rlog.Warning("defaultPullConsumer 
BROADCASTING, the message consume failed, drop it", map[string]interface{}{
+                                               "message": msgList[i],
+                                       })
+                               }
+                       } else {
+                               for i := 0; i < len(msgList); i++ {
+                                       msg := msgList[i]
+                                       if pc.sendMessageBack(mq.BrokerName, 
msg, concurrentCtx.DelayLevelWhenNextConsume) {
+                                               msgBackSucceed = 
append(msgBackSucceed, msg)
+                                       } else {
+                                               msg.ReconsumeTimes += 1
+                                               msgBackFailed = 
append(msgBackFailed, msg)
+                                       }
+                               }
+                       }
+               }
+
+               offset := pq.removeMessage(msgBackSucceed...)
+
+               if offset >= 0 && !pq.IsDroppd() {
+                       pc.storage.update(mq, int64(offset), true)
+               }
+               if len(msgBackFailed) > 0 {
+                       msgList = msgBackFailed
+                       time.Sleep(5 * time.Second)
+                       goto RETRY
+               }
        } else {
-               index = v.(int64)
+               rlog.Warning("defaultPullConsumer processQueue is dropped 
without process consume result.", map[string]interface{}{
+                       rlog.LogKeyMessageQueue: mq,
+                       "message":               msgList,
+               })
        }
 
-       return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
 }
 
-// SubscribeWithChan ack manually
-func (c *defaultPullConsumer) SubscribeWithChan(topic, selector 
MessageSelector) (chan *primitive.Message, error) {
-       return nil, nil
+// resetRetryAndNamespace modify retry message.
+func (pc *defaultPullConsumer) resetRetryAndNamespace(msgList 
[]*primitive.MessageExt) {
+       groupTopic := internal.RetryGroupTopicPrefix + pc.consumerGroup
+       beginTime := time.Now()
+       for idx := range msgList {
+               msg := msgList[idx]
+               retryTopic := msg.GetProperty(primitive.PropertyRetryTopic)
+               if retryTopic == "" && groupTopic == msg.Topic {
+                       msg.Topic = retryTopic
+               }
+               msgList[idx].WithProperty(primitive.PropertyConsumeStartTime, 
strconv.FormatInt(
+                       beginTime.UnixNano()/int64(time.Millisecond), 10))
+       }
 }
 
-// SubscribeWithFunc ack automatic
-func (c *defaultPullConsumer) SubscribeWithFunc(topic, selector 
MessageSelector,
-       f func(msg *primitive.Message) ConsumeResult) error {
-       return nil
+func (pc *defaultPullConsumer) Pull(ctx context.Context, numbers int) 
(*primitive.PullResult, error) {
+       mq := pc.getNextQueueOf(pc.topic)
+       if mq == nil {
+               return nil, fmt.Errorf("prepare to pull topic: %s, but no queue 
is founded", pc.topic)
+       }
+
+       data := buildSubscriptionData(mq.Topic, pc.selector)
+       nextOffset, err := pc.nextOffsetOf(mq)
+       if err != nil {
+               return nil, err
+       }
+
+       result, err := pc.pull(context.Background(), mq, data, nextOffset, 
numbers)
+       if err != nil {
+               return nil, err
+       }
+
+       pc.processPullResult(mq, result, data)
+       return result, nil
 }
 
-func (c *defaultPullConsumer) ACK(msg *primitive.Message, result 
ConsumeResult) {
+func (pc *defaultPullConsumer) getNextQueueOf(topic string) 
*primitive.MessageQueue {
+       var queues []*primitive.MessageQueue
+       var err error
+       if len(pc.allocateQueues) == 0 {
+               topic = utils.WrapNamespace(pc.option.Namespace, topic)
+               queues, err = 
pc.defaultConsumer.client.GetNameSrv().FetchSubscribeMessageQueues(topic)
+               if err != nil {
+                       rlog.Error("get next mq error", map[string]interface{}{
+                               rlog.LogKeyTopic:         topic,
+                               rlog.LogKeyUnderlayError: err.Error(),
+                       })
+                       return nil
+               }
+
+               if len(queues) == 0 {
+                       rlog.Warning("defaultPullConsumer.getNextQueueOf len is 
0", map[string]interface{}{
+                               rlog.LogKeyTopic: topic,
+                       })
+                       return nil
+               }
+       } else {
+               queues = pc.allocateQueues
+       }
+       index := int(atomic.LoadInt64(&pc.nextQueueSequence)) % len(queues)
+       atomic.AddInt64(&pc.nextQueueSequence, 1)
+
+       nextQueue := queues[index]
+       rlog.Info("defaultPullConsumer.getNextQueueOf", map[string]interface{}{
+               rlog.LogKeyTopic:                topic,
+               rlog.LogKeyConsumerGroup:        pc.consumerGroup,
+               rlog.LogKeyMessageQueue:         queues,
+               rlog.LogKeyAllocateMessageQueue: nextQueue.String(),
+       })
 
+       return nextQueue
 }
 
-func (dc *defaultConsumer) checkPull(ctx context.Context, mq 
*primitive.MessageQueue, offset int64, numbers int) error {
-       err := dc.makeSureStateOK()
+func (pc *defaultPullConsumer) checkPull(mq *primitive.MessageQueue, offset 
int64, numbers int) error {
+       err := pc.makeSureStateOK()
        if err != nil {
                return err
        }
@@ -192,67 +380,415 @@ func (dc *defaultConsumer) checkPull(ctx 
context.Context, mq *primitive.MessageQ
 
 // TODO: add timeout limit
 // TODO: add hook
-func (c *defaultPullConsumer) pull(ctx context.Context, mq 
*primitive.MessageQueue, data *internal.SubscriptionData,
+func (pc *defaultPullConsumer) pull(ctx context.Context, mq 
*primitive.MessageQueue, data *internal.SubscriptionData,
        offset int64, numbers int) (*primitive.PullResult, error) {
 
-       if err := c.checkPull(ctx, mq, offset, numbers); err != nil {
+       mq.Topic = utils.WrapNamespace(pc.option.Namespace, mq.Topic)
+       pc.consumerGroup = utils.WrapNamespace(pc.option.Namespace, 
pc.consumerGroup)
+
+       if err := pc.checkPull(mq, offset, numbers); err != nil {
                return nil, err
        }
 
-       c.subscriptionAutomatically(mq.Topic)
+       pc.subscriptionAutomatically(mq.Topic)
 
        sysFlag := buildSysFlag(false, true, true, false)
 
-       pullResp, err := c.pullInner(ctx, mq, data, offset, numbers, sysFlag, 0)
+       pullResp, err := pc.pullInner(ctx, mq, data, offset, numbers, sysFlag, 
0)
        if err != nil {
                return nil, err
        }
-       c.processPullResult(mq, pullResp, data)
+       pc.processPullResult(mq, pullResp, data)
 
        return pullResp, err
 }
 
-func (c *defaultPullConsumer) makeSureStateOK() error {
-       if atomic.LoadInt32(&c.state) != int32(internal.StateRunning) {
-               return fmt.Errorf("the consumer state is [%d], not running", 
c.state)
-       }
-       return nil
-}
-
-func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) 
int64 {
-       result, _ := c.computePullFromWhereWithException(queue)
-       return result
+func (pc *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) 
(int64, error) {
+       return pc.computePullFromWhereWithException(queue)
 }
 
 // PullFrom pull messages of queue from the offset to offset + numbers
-func (c *defaultPullConsumer) PullFrom(ctx context.Context, queue 
*primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, 
error) {
-       if err := c.checkPull(ctx, queue, offset, numbers); err != nil {
+func (pc *defaultPullConsumer) PullFrom(ctx context.Context, queue 
*primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, 
error) {
+       if err := pc.checkPull(queue, offset, numbers); err != nil {
                return nil, err
        }
 
-       selector := MessageSelector{}
-       data := buildSubscriptionData(queue.Topic, selector)
+       data := buildSubscriptionData(queue.Topic, pc.selector)
 
-       return c.pull(ctx, queue, data, offset, numbers)
+       return pc.pull(ctx, queue, data, offset, numbers)
 }
 
-// updateOffset update offset of queue in mem
-func (c *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, 
offset int64) error {
-       return c.updateOffset(queue, offset)
+// UpdateOffset updateOffset update offset of queue in mem
+func (pc *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, 
offset int64) error {
+       return pc.updateOffset(queue, offset)
 }
 
 // PersistOffset persist all offset in mem.
-func (c *defaultPullConsumer) PersistOffset(ctx context.Context) error {
-       return c.persistConsumerOffset()
+func (pc *defaultPullConsumer) PersistOffset(ctx context.Context, topic 
string) error {
+       return pc.persistConsumerOffset()
 }
 
 // CurrentOffset return the current offset of queue in mem.
-func (c *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) 
(int64, error) {
-       v := c.queryOffset(queue)
+func (pc *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) 
(int64, error) {
+       v := pc.queryOffset(queue)
        return v, nil
 }
 
 // Shutdown close defaultConsumer, refuse new request.
-func (c *defaultPullConsumer) Shutdown() error {
-       return c.defaultConsumer.shutdown()
+func (pc *defaultPullConsumer) Shutdown() error {
+       var err error
+       pc.closeOnce.Do(func() {
+               close(pc.done)
+
+               pc.client.UnregisterConsumer(pc.consumerGroup)
+               err = pc.defaultConsumer.shutdown()
+       })
+
+       return err
+}
+
+func (pc *defaultPullConsumer) PersistConsumerOffset() error {
+       return pc.defaultConsumer.persistConsumerOffset()
+}
+
+func (pc *defaultPullConsumer) UpdateTopicSubscribeInfo(topic string, mqs 
[]*primitive.MessageQueue) {
+       pc.defaultConsumer.updateTopicSubscribeInfo(topic, mqs)
+}
+
+func (pc *defaultPullConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
+       return pc.defaultConsumer.isSubscribeTopicNeedUpdate(topic)
+}
+
+func (pc *defaultPullConsumer) SubscriptionDataList() 
[]*internal.SubscriptionData {
+       return pc.defaultConsumer.SubscriptionDataList()
+}
+
+func (pc *defaultPullConsumer) IsUnitMode() bool {
+       return pc.unitMode
+}
+
+func (pc *defaultPullConsumer) GetcType() string {
+       return string(pc.cType)
+}
+
+func (pc *defaultPullConsumer) GetModel() string {
+       return pc.model.String()
+}
+
+func (pc *defaultPullConsumer) GetWhere() string {
+       switch pc.fromWhere {
+       case ConsumeFromLastOffset:
+               return "CONSUME_FROM_LAST_OFFSET"
+       case ConsumeFromFirstOffset:
+               return "CONSUME_FROM_FIRST_OFFSET"
+       case ConsumeFromTimestamp:
+               return "CONSUME_FROM_TIMESTAMP"
+       default:
+               return "UNKOWN"
+       }
+
+}
+
+func (pc *defaultPullConsumer) Rebalance() {
+       pc.defaultConsumer.doBalance()
+}
+
+func (pc *defaultPullConsumer) RebalanceIfNotPaused() {
+       pc.defaultConsumer.doBalanceIfNotPaused()
+}
+
+func (pc *defaultPullConsumer) GetConsumerRunningInfo(stack bool) 
*internal.ConsumerRunningInfo {
+       info := internal.NewConsumerRunningInfo()
+
+       pc.subscriptionDataTable.Range(func(key, value interface{}) bool {
+               topic := key.(string)
+               info.SubscriptionData[value.(*internal.SubscriptionData)] = true
+               status := internal.ConsumeStatus{
+                       PullRT:            pc.stat.getPullRT(pc.consumerGroup, 
topic).avgpt,
+                       PullTPS:           pc.stat.getPullTPS(pc.consumerGroup, 
topic).tps,
+                       ConsumeRT:         
pc.stat.getConsumeRT(pc.consumerGroup, topic).avgpt,
+                       ConsumeOKTPS:      
pc.stat.getConsumeOKTPS(pc.consumerGroup, topic).tps,
+                       ConsumeFailedTPS:  
pc.stat.getConsumeFailedTPS(pc.consumerGroup, topic).tps,
+                       ConsumeFailedMsgs: 
pc.stat.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + 
pc.consumerGroup).sum,
+               }
+               info.StatusTable[topic] = status
+               return true
+       })
+
+       pc.processQueueTable.Range(func(key, value interface{}) bool {
+               mq := key.(primitive.MessageQueue)
+               pq := value.(*processQueue)
+               pInfo := pq.currentInfo()
+               pInfo.CommitOffset, _ = pc.storage.readWithException(&mq, 
_ReadMemoryThenStore)
+               info.MQTable[mq] = pInfo
+               return true
+       })
+
+       if stack {
+               var buffer strings.Builder
+
+               err := pprof.Lookup("goroutine").WriteTo(&buffer, 2)
+               if err != nil {
+                       rlog.Error("error when get stack ", 
map[string]interface{}{
+                               "error": err,
+                       })
+               } else {
+                       info.JStack = buffer.String()
+               }
+       }
+
+       nsAddr := ""
+       for _, value := range pc.client.GetNameSrv().AddrList() {
+               nsAddr += fmt.Sprintf("%s;", value)
+       }
+       info.Properties[internal.PropNameServerAddr] = nsAddr
+       info.Properties[internal.PropConsumeType] = string(pc.cType)
+       info.Properties[internal.PropConsumeOrderly] = 
strconv.FormatBool(pc.consumeOrderly)
+       info.Properties[internal.PropThreadPoolCoreSize] = "-1"
+       info.Properties[internal.PropConsumerStartTimestamp] = 
strconv.FormatInt(pc.consumerStartTimestamp, 10)
+       return info
+}
+
+func (pc *defaultPullConsumer) ConsumeMessageDirectly(msg 
*primitive.MessageExt, brokerName string) 
*internal.ConsumeMessageDirectlyResult {
+       return nil
+}
+
+func (pc *defaultPullConsumer) ResetOffset(topic string, table 
map[primitive.MessageQueue]int64) {
+
+}
+
+func (pc *defaultPullConsumer) messageQueueChanged(topic string, mqAll, 
mqDivided []*primitive.MessageQueue) {
+       var allocateQueues []*primitive.MessageQueue
+       pc.defaultConsumer.processQueueTable.Range(func(key, value interface{}) 
bool {
+               mq := key.(primitive.MessageQueue)
+               allocateQueues = append(allocateQueues, &mq)
+               return true
+       })
+       pc.allocateQueues = allocateQueues
+       pc.defaultConsumer.client.SendHeartbeatToAllBrokerWithLock()
+}
+
+func (pc *defaultPullConsumer) sendMessageBack(brokerName string, msg 
*primitive.MessageExt, delayLevel int) bool {
+       var brokerAddr string
+       if len(brokerName) != 0 {
+               brokerAddr = 
pc.defaultConsumer.client.GetNameSrv().FindBrokerAddrByName(brokerName)
+       } else {
+               brokerAddr = msg.StoreHost
+       }
+       _, err := pc.client.InvokeSync(context.Background(), brokerAddr, 
pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
+       return err == nil
+}
+
+func (pc *defaultPullConsumer) buildSendBackRequest(msg *primitive.MessageExt, 
delayLevel int) *remote.RemotingCommand {
+       req := &internal.ConsumerSendMsgBackRequestHeader{
+               Group:             pc.consumerGroup,
+               OriginTopic:       msg.Topic,
+               Offset:            msg.CommitLogOffset,
+               DelayLevel:        delayLevel,
+               OriginMsgId:       msg.MsgId,
+               MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
+       }
+
+       return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, 
nil)
+}
+
+func (pc *defaultPullConsumer) getMaxReconsumeTimes() int32 {
+       if pc.option.MaxReconsumeTimes == -1 {
+               return 16
+       } else {
+               return pc.option.MaxReconsumeTimes
+       }
+}
+
+func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
+       rlog.Debug("defaultPullConsumer start a new Pull Message task for 
PullRequest", map[string]interface{}{
+               rlog.LogKeyPullRequest: request.String(),
+       })
+       var sleepTime time.Duration
+       pq := request.pq
+       go primitive.WithRecover(func() {
+               for {
+                       select {
+                       case <-pc.done:
+                               rlog.Info("defaultPullConsumer close 
pullMessage.", map[string]interface{}{
+                                       rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
+                               })
+                               return
+                       default:
+                               pc.submitToConsume(request.pq, request.mq)
+                               if request.pq.IsDroppd() {
+                                       rlog.Info("defaultPullConsumer quit 
pullMessage for dropped queue.", map[string]interface{}{
+                                               rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
+                                       })
+                                       return
+                               }
+                       }
+               }
+       })
+       for {
+       NEXT:
+               select {
+               case <-pc.done:
+                       rlog.Info("defaultPullConsumer close message handle.", 
map[string]interface{}{
+                               rlog.LogKeyConsumerGroup: pc.consumerGroup,
+                       })
+                       return
+               default:
+               }
+
+               if pq.IsDroppd() {
+                       rlog.Debug("defaultPullConsumer the request was 
dropped, so stop task", map[string]interface{}{
+                               rlog.LogKeyPullRequest: request.String(),
+                       })
+                       return
+               }
+               if sleepTime > 0 {
+                       rlog.Debug(fmt.Sprintf("defaultPullConsumer pull 
MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId, 
sleepTime/time.Millisecond, request.mq), nil)
+                       time.Sleep(sleepTime)
+               }
+               // reset time
+               sleepTime = pc.option.PullInterval
+               pq.lastPullTime.Store(time.Now())
+               err := pc.makeSureStateOK()
+               if err != nil {
+                       rlog.Warning("defaultPullConsumer state error", 
map[string]interface{}{
+                               rlog.LogKeyUnderlayError: err.Error(),
+                       })
+                       sleepTime = _PullDelayTimeWhenError
+                       goto NEXT
+               }
+
+               if pc.pause {
+                       rlog.Debug(fmt.Sprintf("defaultPullConsumer [%s] of 
[%s] was paused, execute pull request [%s] later",
+                               pc.option.InstanceName, pc.consumerGroup, 
request.String()), nil)
+                       sleepTime = _PullDelayTimeWhenSuspend
+                       goto NEXT
+               }
+
+               v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
+               if !exist {
+                       rlog.Info("defaultPullConsumer find the consumer's 
subscription failed", map[string]interface{}{
+                               rlog.LogKeyPullRequest: request.String(),
+                       })
+                       sleepTime = _PullDelayTimeWhenError
+                       goto NEXT
+               }
+               beginTime := time.Now()
+               sd := v.(*internal.SubscriptionData)
+
+               sysFlag := buildSysFlag(false, true, true, false)
+
+               pullRequest := &internal.PullMessageRequestHeader{
+                       ConsumerGroup:        pc.consumerGroup,
+                       Topic:                request.mq.Topic,
+                       QueueId:              int32(request.mq.QueueId),
+                       QueueOffset:          request.nextOffset,
+                       MaxMsgNums:           pc.option.PullBatchSize,
+                       SysFlag:              sysFlag,
+                       CommitOffset:         0,
+                       SubExpression:        sd.SubString,
+                       ExpressionType:       string(TAG),
+                       SuspendTimeoutMillis: 20 * time.Second,
+               }
+
+               brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
+               if brokerResult == nil {
+                       rlog.Warning("defaultPullConsumer no broker found for 
mq", map[string]interface{}{
+                               rlog.LogKeyPullRequest: request.mq.String(),
+                       })
+                       sleepTime = _PullDelayTimeWhenError
+                       goto NEXT
+               }
+
+               if brokerResult.Slave {
+                       pullRequest.SysFlag = 
clearCommitOffsetFlag(pullRequest.SysFlag)
+               }
+
+               result, err := pc.client.PullMessage(context.Background(), 
brokerResult.BrokerAddr, pullRequest)
+               if err != nil {
+                       rlog.Warning("defaultPullConsumer pull message from 
broker error", map[string]interface{}{
+                               rlog.LogKeyBroker:        
brokerResult.BrokerAddr,
+                               rlog.LogKeyUnderlayError: err.Error(),
+                       })
+                       sleepTime = _PullDelayTimeWhenError
+                       goto NEXT
+               }
+
+               if result.Status == primitive.PullBrokerTimeout {
+                       rlog.Warning("defaultPullConsumer pull broker timeout", 
map[string]interface{}{
+                               rlog.LogKeyBroker: brokerResult.BrokerAddr,
+                       })
+                       sleepTime = _PullDelayTimeWhenError
+                       goto NEXT
+               }
+
+               pc.processPullResult(request.mq, result, sd)
+
+               switch result.Status {
+               case primitive.PullFound:
+                       rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found 
messages.", request.mq.Topic, request.mq.QueueId), nil)
+                       prevRequestOffset := request.nextOffset
+                       request.nextOffset = result.NextBeginOffset
+
+                       rt := time.Now().Sub(beginTime) / time.Millisecond
+                       pc.stat.increasePullRT(pc.consumerGroup, 
request.mq.Topic, int64(rt))
+
+                       msgFounded := result.GetMessageExts()
+                       firstMsgOffset := int64(math.MaxInt64)
+                       if len(msgFounded) != 0 {
+                               firstMsgOffset = msgFounded[0].QueueOffset
+                               pc.stat.increasePullTPS(pc.consumerGroup, 
request.mq.Topic, len(msgFounded))
+                               pq.putMessage(msgFounded...)
+                       }
+                       if result.NextBeginOffset < prevRequestOffset || 
firstMsgOffset < prevRequestOffset {
+                               rlog.Warning("[BUG] pull message result maybe 
data wrong", map[string]interface{}{
+                                       "nextBeginOffset":   
result.NextBeginOffset,
+                                       "firstMsgOffset":    firstMsgOffset,
+                                       "prevRequestOffset": prevRequestOffset,
+                               })
+                       }
+               case primitive.PullNoNewMsg, primitive.PullNoMsgMatched:
+                       request.nextOffset = result.NextBeginOffset
+                       pc.correctTagsOffset(request)
+               case primitive.PullOffsetIllegal:
+                       rlog.Warning("defaultPullConsumer the pull request 
offset illegal", map[string]interface{}{
+                               rlog.LogKeyPullRequest: request.String(),
+                               "result":               result.String(),
+                       })
+                       request.nextOffset = result.NextBeginOffset
+                       pq.WithDropped(true)
+                       time.Sleep(10 * time.Second)
+                       pc.storage.update(request.mq, request.nextOffset, false)
+                       
pc.storage.persist([]*primitive.MessageQueue{request.mq})
+                       pc.processQueueTable.Delete(*request.mq)
+                       rlog.Warning(fmt.Sprintf("defaultPullConsumer fix the 
pull request offset: %s", request.String()), nil)
+               default:
+                       rlog.Warning(fmt.Sprintf("defaultPullConsumer unknown 
pull status: %v", result.Status), nil)
+                       sleepTime = _PullDelayTimeWhenError
+               }
+       }
+}
+
+func (pc *defaultPullConsumer) correctTagsOffset(pr *PullRequest) {
+       if pr.pq.cachedMsgCount.Load() <= 0 {
+               pc.storage.update(pr.mq, pr.nextOffset, true)
+       }
+}
+
+func (pc *defaultPullConsumer) consumeMessageCurrently(pq *processQueue, mq 
*primitive.MessageQueue) {
+       msgList := pq.getMessages()
+       if msgList == nil {
+               return
+       }
+       cr := &ConsumeRequest{
+               messageQueue: mq,
+               processQueue: pq,
+               msgList:      msgList,
+       }
+
+       select {
+       case <-pq.closeChan:
+               return
+       case pc.consumeRequestCache <- cr:
+       }
 }
diff --git a/consumer/strategy.go b/consumer/strategy.go
index 4a07928..2af48de 100644
--- a/consumer/strategy.go
+++ b/consumer/strategy.go
@@ -20,7 +20,7 @@ package consumer
 import (
        "strings"
 
-       "github.com/stathat/consistent"
+       "stathat.com/c/consistent"
 
        "github.com/apache/rocketmq-client-go/v2/internal/utils"
        "github.com/apache/rocketmq-client-go/v2/primitive"
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
deleted file mode 100644
index 5b5819e..0000000
--- a/examples/consumer/pull/main.go
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package main
-
-import (
-       "context"
-       "fmt"
-       "github.com/apache/rocketmq-client-go/v2/errors"
-       "time"
-
-       "github.com/apache/rocketmq-client-go/v2"
-       "github.com/apache/rocketmq-client-go/v2/consumer"
-       "github.com/apache/rocketmq-client-go/v2/primitive"
-       "github.com/apache/rocketmq-client-go/v2/rlog"
-)
-
-func main() {
-       c, err := rocketmq.NewPullConsumer(
-               consumer.WithGroupName("testGroup"),
-               
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
-       )
-       if err != nil {
-               rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), 
nil)
-       }
-       err = c.Start()
-       if err != nil {
-               rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), 
nil)
-       }
-
-       ctx := context.Background()
-       queue := primitive.MessageQueue{
-               Topic:      "TopicTest",
-               BrokerName: "", // replace with your broker name. otherwise, 
pull will failed.
-               QueueId:    0,
-       }
-
-       offset := int64(0)
-       for {
-               resp, err := c.PullFrom(ctx, queue, offset, 10)
-               if err != nil {
-                       if err == errors.ErrRequestTimeout {
-                               fmt.Printf("timeout \n")
-                               time.Sleep(1 * time.Second)
-                               continue
-                       }
-                       fmt.Printf("unexpectable err: %v \n", err)
-                       return
-               }
-               if resp.Status == primitive.PullFound {
-                       fmt.Printf("pull message success. nextOffset: %d \n", 
resp.NextBeginOffset)
-                       for _, msg := range resp.GetMessageExts() {
-                               fmt.Printf("pull msg: %v \n", msg)
-                       }
-               }
-               offset = resp.NextBeginOffset
-       }
-}
diff --git a/examples/consumer/pull/poll/main.go 
b/examples/consumer/pull/poll/main.go
new file mode 100644
index 0000000..b6c783b
--- /dev/null
+++ b/examples/consumer/pull/poll/main.go
@@ -0,0 +1,102 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "log"
+       "net/http"
+       _ "net/http/pprof"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/v2"
+
+       "github.com/apache/rocketmq-client-go/v2/rlog"
+
+       "github.com/apache/rocketmq-client-go/v2/consumer"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+const (
+       nameSrvAddr       = "http://127.0.0.1:9876";
+       accessKey         = "rocketmq"
+       secretKey         = "12345678"
+       topic             = "test-topic"
+       consumerGroupName = "testPullGroup"
+       tag               = "testPull"
+       namespace         = "ns"
+)
+
+var pullConsumer rocketmq.PullConsumer
+var sleepTime = 1 * time.Second
+
+func main() {
+       go func() {
+               log.Println(http.ListenAndServe("localhost:6060", nil))
+       }()
+       rlog.SetLogLevel("info")
+       var nameSrv, err = primitive.NewNamesrvAddr(nameSrvAddr)
+       if err != nil {
+               log.Fatalf("NewNamesrvAddr err: %v", err)
+       }
+       pullConsumer, err = rocketmq.NewPullConsumer(
+               consumer.WithGroupName(consumerGroupName),
+               consumer.WithNameServer(nameSrv),
+               consumer.WithCredentials(primitive.Credentials{
+                       AccessKey: accessKey,
+                       SecretKey: secretKey,
+               }),
+               consumer.WithNamespace(namespace),
+               consumer.WithMaxReconsumeTimes(2),
+       )
+       if err != nil {
+               log.Fatalf("fail to new pullConsumer: %v", err)
+       }
+       selector := consumer.MessageSelector{
+               Type:       consumer.TAG,
+               Expression: tag,
+       }
+       err = pullConsumer.Subscribe(topic, selector)
+       if err != nil {
+               log.Fatalf("fail to Subscribe: %v", err)
+       }
+       err = pullConsumer.Start()
+       if err != nil {
+               log.Fatalf("fail to Start: %v", err)
+       }
+
+       for {
+               poll()
+       }
+}
+
+func poll() {
+       cr, err := pullConsumer.Poll(context.TODO(), time.Second*5)
+       if consumer.IsNoNewMsgError(err) {
+               return
+       }
+       if err != nil {
+               log.Printf("[poll error] err=%v", err)
+               time.Sleep(sleepTime)
+               return
+       }
+       // todo LOGIC CODE HERE
+       log.Println("msgList: ", cr.GetMsgList())
+       // pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeRetryLater)
+       pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeSuccess)
+}
diff --git a/examples/consumer/pull/pull/main.go 
b/examples/consumer/pull/pull/main.go
new file mode 100644
index 0000000..0b231a7
--- /dev/null
+++ b/examples/consumer/pull/pull/main.go
@@ -0,0 +1,143 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "log"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/v2"
+
+       "github.com/apache/rocketmq-client-go/v2/rlog"
+
+       "github.com/apache/rocketmq-client-go/v2/consumer"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+const (
+       nameSrvAddr       = "http://127.0.0.1:9876";
+       accessKey         = "rocketmq"
+       secretKey         = "12345678"
+       topic             = "test-topic"
+       consumerGroupName = "testPullGroup"
+       tag               = "testPull"
+       namespace         = "ns"
+)
+
+var pullConsumer rocketmq.PullConsumer
+var sleepTime = 1 * time.Second
+
+const refreshPersistOffsetDuration = time.Second * 5
+
+func main() {
+       rlog.SetLogLevel("info")
+       var nameSrv, err = primitive.NewNamesrvAddr(nameSrvAddr)
+       if err != nil {
+               log.Fatalf("NewNamesrvAddr err: %v", err)
+       }
+       pullConsumer, err = rocketmq.NewPullConsumer(
+               consumer.WithGroupName(consumerGroupName),
+               consumer.WithNameServer(nameSrv),
+               consumer.WithCredentials(primitive.Credentials{
+                       AccessKey: accessKey,
+                       SecretKey: secretKey,
+               }),
+               consumer.WithNamespace(namespace),
+               consumer.WithMaxReconsumeTimes(2),
+       )
+       if err != nil {
+               log.Fatalf("fail to new pullConsumer: %v", err)
+       }
+       selector := consumer.MessageSelector{
+               Type:       consumer.TAG,
+               Expression: tag,
+       }
+       err = pullConsumer.Subscribe(topic, selector)
+       if err != nil {
+               log.Fatalf("fail to Subscribe: %v", err)
+       }
+       err = pullConsumer.Start()
+       if err != nil {
+               log.Fatalf("fail to Start: %v", err)
+       }
+
+       timer := time.NewTimer(refreshPersistOffsetDuration)
+       go func() {
+               for ; true; <-timer.C {
+                       err = pullConsumer.PersistOffset(context.TODO(), topic)
+                       if err != nil {
+                               log.Printf("[pullConsumer.PersistOffset] 
err=%v", err)
+                       }
+                       timer.Reset(refreshPersistOffsetDuration)
+               }
+       }()
+
+       for i := 0; i <= 4; i++ {
+               go func() {
+                       for {
+                               pull()
+                       }
+               }()
+       }
+       // make current thread hold to see pull result. TODO should update here
+       time.Sleep(10000 * time.Second)
+}
+
+func pull() {
+       resp, err := pullConsumer.Pull(context.TODO(), 1)
+       if err != nil {
+               log.Printf("[pull error] err=%v", err)
+               time.Sleep(sleepTime)
+               return
+       }
+       switch resp.Status {
+       case primitive.PullFound:
+               log.Printf("[pull message successfully] MinOffset:%d, 
MaxOffset:%d, nextOffset: %d, len:%d\n", resp.MinOffset, resp.MaxOffset, 
resp.NextBeginOffset, len(resp.GetMessages()))
+               var queue *primitive.MessageQueue
+               if len(resp.GetMessages()) <= 0 {
+                       return
+               }
+               for _, msg := range resp.GetMessageExts() {
+                       // todo LOGIC CODE HERE
+                       queue = msg.Queue
+                       //log.Println(msg.Queue, msg.QueueOffset, 
msg.GetKeys(), msg.MsgId, string(msg.Body))
+                       log.Println(msg)
+               }
+               // update offset
+               err = pullConsumer.UpdateOffset(queue, resp.NextBeginOffset)
+               if err != nil {
+                       log.Printf("[pullConsumer.UpdateOffset] err=%v", err)
+               }
+
+       case primitive.PullNoNewMsg, primitive.PullNoMsgMatched:
+               log.Printf("[no pull message]   next = %d\n", 
resp.NextBeginOffset)
+               time.Sleep(sleepTime)
+               return
+       case primitive.PullBrokerTimeout:
+               log.Printf("[pull broker timeout]  next = %d\n", 
resp.NextBeginOffset)
+
+               time.Sleep(sleepTime)
+               return
+       case primitive.PullOffsetIllegal:
+               log.Printf("[pull offset illegal] next = %d\n", 
resp.NextBeginOffset)
+               return
+       default:
+               log.Printf("[pull error]  next = %d\n", resp.NextBeginOffset)
+       }
+}
diff --git a/examples/consumer/pull/pull_from/main.go 
b/examples/consumer/pull/pull_from/main.go
new file mode 100644
index 0000000..62445f5
--- /dev/null
+++ b/examples/consumer/pull/pull_from/main.go
@@ -0,0 +1,177 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "log"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/v2"
+
+       "github.com/apache/rocketmq-client-go/v2/admin"
+       "github.com/apache/rocketmq-client-go/v2/consumer"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/go-redis/redis/v8"
+)
+
+const (
+       nameSrvAddr       = "http://127.0.0.1:9876";
+       accessKey         = "rocketmq"
+       secretKey         = "12345678"
+       topic             = "test-topic"
+       consumerGroupName = "testPullFromGroup"
+       tag               = "testPullFrom"
+       namespace         = "ns"
+)
+
+var ctx = context.Background()
+var client = redis.NewClient(&redis.Options{
+       Addr:     "127.0.0.1:6379",
+       Password: "", // no password set
+       DB:       0,  // use default DB
+})
+
+func main() {
+       var nameSrv, err = primitive.NewNamesrvAddr(nameSrvAddr)
+       if err != nil {
+               log.Fatalf("NewNamesrvAddr err: %v", err)
+       }
+       pullConsumer, err := rocketmq.NewPullConsumer(
+               consumer.WithGroupName(consumerGroupName),
+               consumer.WithNameServer(nameSrv),
+               consumer.WithCredentials(primitive.Credentials{
+                       AccessKey: accessKey,
+                       SecretKey: secretKey,
+               }),
+               consumer.WithNamespace(namespace),
+       )
+       if err != nil {
+               log.Fatalf("fail to new pullConsumer: %v", err)
+       }
+
+       selector := consumer.MessageSelector{
+               Type:       consumer.TAG,
+               Expression: tag,
+       }
+       err = pullConsumer.Subscribe(topic, selector)
+       if err != nil {
+               log.Fatalf("Subscribe error: %s\n", err)
+       }
+       err = pullConsumer.Start()
+       if err != nil {
+               log.Fatalf("fail to new pullConsumer: %v", err)
+       }
+
+       nameSrvAddr := []string{nameSrvAddr}
+       mqAdmin, err := admin.NewAdmin(
+               
admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)),
+               admin.WithCredentials(primitive.Credentials{
+                       AccessKey: accessKey,
+                       SecretKey: secretKey,
+               }),
+               admin.WithNamespace(namespace),
+       )
+       if err != nil {
+               log.Fatalf("fail to new admin: %v", err)
+       }
+
+       messageQueues, err := mqAdmin.FetchPublishMessageQueues(ctx, topic)
+       if err != nil {
+               log.Fatalf("fetch messahe queue error: %s\n", err)
+       }
+       var sleepTime = 1 * time.Second
+
+       for _, queue := range messageQueues {
+               offset := getOffset(client, consumerGroupName, topic, 
queue.QueueId)
+               if offset < 0 { // default consume from offset 0
+                       offset = 0
+               }
+               go func(queue *primitive.MessageQueue) {
+                       for {
+                               resp, err := pullConsumer.PullFrom(ctx, queue, 
offset, 1) // pull one message per time for make sure easily ACK
+                               if err != nil {
+                                       log.Printf("[pull error] topic=%s, 
queue id=%d, broker=%s, offset=%d\n", topic, queue.QueueId, queue.BrokerName, 
offset)
+                                       time.Sleep(sleepTime)
+                                       continue
+                               }
+                               switch resp.Status {
+                               case primitive.PullFound:
+                                       log.Printf("[pull message successfully] 
queue id = %d, nextOffset: %d \n", queue.QueueId, resp.NextBeginOffset)
+                                       for _, msg := range 
resp.GetMessageExts() {
+                                               // todo LOGIC CODE HERE
+
+                                               log.Println(msg.GetKeys(), 
msg.MsgId, string(msg.Body))
+
+                                               // save offset to redis
+                                               err = ackOffset(client, 
consumerGroupName, topic, queue.QueueId, resp.NextBeginOffset)
+                                               if err != nil {
+                                                       //todo ack error logic
+                                               }
+
+                                               // set offset for next pull
+                                               offset = resp.NextBeginOffset
+                                       }
+                               case primitive.PullNoNewMsg, 
primitive.PullNoMsgMatched:
+                                       log.Printf("[no pull message] topic=%s, 
queue id=%d, broker=%s, offset=%d,  next = %d\n", queue.Topic, queue.QueueId, 
queue.BrokerName, offset, resp.NextBeginOffset)
+                                       time.Sleep(sleepTime)
+                                       offset = resp.NextBeginOffset
+                                       continue
+                               case primitive.PullBrokerTimeout:
+                                       log.Printf("[pull broker timeout] 
topic=%s, queue id=%d, broker=%s, offset=%d,  next = %d\n", queue.Topic, 
queue.QueueId, queue.BrokerName, offset, resp.NextBeginOffset)
+
+                                       time.Sleep(sleepTime)
+                                       continue
+                               case primitive.PullOffsetIllegal:
+                                       log.Printf("[pull offset illegal] 
topic=%s, queue id=%d, broker=%s, offset=%d,  next = %d\n", queue.Topic, 
queue.QueueId, queue.BrokerName, offset, resp.NextBeginOffset)
+                                       offset = resp.NextBeginOffset
+                                       continue
+                               default:
+                                       log.Printf("[pull error] topic=%s, 
queue id=%d, broker=%s, offset=%d,  next = %d\n", queue.Topic, queue.QueueId, 
queue.BrokerName, offset, resp.NextBeginOffset)
+                               }
+                       }
+               }(queue)
+       }
+       // make current thread hold to see pull result. TODO should update here
+       time.Sleep(10000 * time.Second)
+}
+
+func ackOffset(redis *redis.Client, consumerGroupName string, topic string, 
queueId int, consumedOffset int64) error {
+       var key = fmt.Sprintf("rmq-%s-%s-%d", consumerGroupName, topic, queueId)
+       err := redis.Set(ctx, key, consumedOffset, 0).Err()
+       if err != nil {
+               log.Printf("set redis error, key:%s, %v\n", key, err)
+               return err
+       }
+       return nil
+}
+
+func getOffset(redisCli *redis.Client, consumerGroupName string, topic string, 
queueId int) int64 {
+       var key = fmt.Sprintf("rmq-%s-%s-%d", consumerGroupName, topic, queueId)
+       offset, err := redisCli.Get(ctx, key).Int64()
+       if err == redis.Nil {
+               return 0
+       } else if err != nil {
+               log.Printf("get redis error, key:%s, %v\n", key, err)
+               //todo Your own logic. like get from db.
+               return 0
+       } else {
+               return offset
+       }
+}
diff --git a/go.mod b/go.mod
index 74d5eb6..09acdad 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,7 @@ go 1.13
 require (
        github.com/BurntSushi/toml v1.1.0 // indirect
        github.com/emirpasic/gods v1.12.0
+       github.com/go-redis/redis/v8 v8.11.5
        github.com/golang/mock v1.3.1
        github.com/json-iterator/go v1.1.12
        github.com/patrickmn/go-cache v2.1.0+incompatible
@@ -12,13 +13,11 @@ require (
        github.com/satori/go.uuid v1.2.0
        github.com/sirupsen/logrus v1.4.0
        github.com/smartystreets/goconvey v1.6.4
-       github.com/stathat/consistent v1.0.0
-       github.com/stretchr/testify v1.3.0
+       github.com/stretchr/testify v1.5.1
        github.com/tidwall/gjson v1.13.0
        go.uber.org/atomic v1.5.1
        golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
        gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
        gopkg.in/natefinch/lumberjack.v2 v2.0.0
-       gopkg.in/yaml.v2 v2.4.0 // indirect
-       stathat.com/c/consistent v1.0.0 // indirect
+       stathat.com/c/consistent v1.0.0
 )
diff --git a/go.sum b/go.sum
index 543d1ab..1ec2f17 100644
--- a/go.sum
+++ b/go.sum
@@ -1,15 +1,46 @@
 github.com/BurntSushi/toml v1.1.0 
h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
 github.com/BurntSushi/toml v1.1.0/go.mod 
h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
+github.com/cespare/xxhash/v2 v2.1.2 
h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
+github.com/cespare/xxhash/v2 v2.1.2/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/chzyer/logex v1.1.10/go.mod 
h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
+github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod 
h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
+github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod 
h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f 
h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod 
h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 github.com/emirpasic/gods v1.12.0 
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
 github.com/emirpasic/gods v1.12.0/go.mod 
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
+github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9 
h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
+github.com/fsnotify/fsnotify v1.4.9/go.mod 
h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/go-redis/redis/v8 v8.11.5 
h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
+github.com/go-redis/redis/v8 v8.11.5/go.mod 
h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod 
h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
 github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
 github.com/golang/mock v1.3.1/go.mod 
h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
+github.com/golang/protobuf v1.2.0/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod 
h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod 
h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod 
h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod 
h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod 
h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.2/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod 
h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 
h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod 
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/google/go-cmp v0.3.0/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod 
h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 
h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
 github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod 
h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/hpcloud/tail v1.0.0/go.mod 
h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
+github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod 
h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/json-iterator/go v1.1.12 
h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
 github.com/json-iterator/go v1.1.12/go.mod 
h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/jtolds/gls v4.20.0+incompatible 
h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
@@ -25,6 +56,21 @@ github.com/modern-go/concurrent 
v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH
 github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v1.0.2 
h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
 github.com/modern-go/reflect2 v1.0.2/go.mod 
h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/nxadm/tail v1.4.4/go.mod 
h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
+github.com/nxadm/tail v1.4.8/go.mod 
h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
+github.com/onsi/ginkgo v1.6.0/go.mod 
h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod 
h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.16.4/go.mod 
h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
+github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
+github.com/onsi/ginkgo v1.16.5/go.mod 
h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
+github.com/onsi/ginkgo/v2 v2.0.0 
h1:CcuG/HvWNkkaqCUpJifQY8z7qEMBJya6aLPx6ftGyjQ=
+github.com/onsi/ginkgo/v2 v2.0.0/go.mod 
h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
+github.com/onsi/gomega v1.7.1/go.mod 
h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1/go.mod 
h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/onsi/gomega v1.17.0/go.mod 
h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
+github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
+github.com/onsi/gomega v1.18.1/go.mod 
h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
 github.com/patrickmn/go-cache v2.1.0+incompatible 
h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
 github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod 
h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
@@ -39,45 +85,95 @@ github.com/smartystreets/assertions 
v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod 
h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
 github.com/smartystreets/goconvey v1.6.4 
h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
 github.com/smartystreets/goconvey v1.6.4/go.mod 
h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
-github.com/stathat/consistent v1.0.0 
h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
-github.com/stathat/consistent v1.0.0/go.mod 
h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
 github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-github.com/stretchr/testify v1.3.0 
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.5.1 
h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
+github.com/stretchr/testify v1.5.1/go.mod 
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 github.com/tidwall/gjson v1.13.0 
h1:3TFY9yxOQShrvmjdM76K+jc66zJeT6D3/VFFYCGQf7M=
 github.com/tidwall/gjson v1.13.0/go.mod 
h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
 github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
 github.com/tidwall/match v1.1.1/go.mod 
h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
 github.com/tidwall/pretty v1.2.0 
h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
 github.com/tidwall/pretty v1.2.0/go.mod 
h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
+github.com/yuin/goldmark v1.2.1/go.mod 
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 go.uber.org/atomic v1.5.1 h1:rsqfU5vBkVknbhUGbAUwQKR2H4ItV8tjJ+6kJX4cxHM=
 go.uber.org/atomic v1.5.1/go.mod 
h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod 
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
-golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 
h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 
h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod 
h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/lint v0.0.0-20190930215403-16217165b5de 
h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
 golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod 
h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod 
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod 
h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 
h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
+golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod 
h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f 
h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
 golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod 
h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod 
h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod 
h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
-golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c 
h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
 golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e 
h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod 
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 
h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod 
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod 
h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod 
h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod 
h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod 
h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.23.0/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod 
h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 
h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod 
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c 
h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod 
h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod 
h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 
h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod 
h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 
h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod 
h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
 gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
diff --git a/internal/utils/namespace.go b/internal/utils/namespace.go
new file mode 100644
index 0000000..5e98706
--- /dev/null
+++ b/internal/utils/namespace.go
@@ -0,0 +1,24 @@
+package utils
+
+import "strings"
+
+const namespaceSeparator = "%"
+
+func WrapNamespace(namespace, resourceWithOutNamespace string) string {
+       if IsEmpty(namespace) || IsEmpty(resourceWithOutNamespace) {
+               return resourceWithOutNamespace
+       }
+
+       if isAlreadyWithNamespace(resourceWithOutNamespace, namespace) {
+               return resourceWithOutNamespace
+       }
+
+       return namespace + namespaceSeparator + resourceWithOutNamespace
+}
+
+func isAlreadyWithNamespace(resource, namespace string) bool {
+       if IsEmpty(namespace) || IsEmpty(resource) {
+               return false
+       }
+       return strings.Contains(resource, namespace+namespaceSeparator)
+}
diff --git a/internal/utils/string.go b/internal/utils/string.go
index f347397..a73b400 100644
--- a/internal/utils/string.go
+++ b/internal/utils/string.go
@@ -17,6 +17,8 @@ limitations under the License.
 
 package utils
 
+import "strings"
+
 // HashString hashes a string to a unique hashcode.
 func HashString(s string) int {
        val := []byte(s)
@@ -28,3 +30,7 @@ func HashString(s string) int {
 
        return int(h)
 }
+
+func IsEmpty(s string) bool {
+       return strings.TrimSpace(s) == ""
+}
diff --git a/primitive/result.go b/primitive/result.go
index 20d393a..18c4352 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -123,6 +123,8 @@ func (result *PullResult) String() string {
 
 func toMessages(messageExts []*MessageExt) []*Message {
        msgs := make([]*Message, 0)
-
+       for _, messageExt := range messageExts {
+               msgs = append(msgs, &messageExt.Message)
+       }
        return msgs
 }
diff --git a/rlog/log.go b/rlog/log.go
index b387ed7..e4070b7 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -28,16 +28,17 @@ import (
 )
 
 const (
-       LogKeyProducerGroup    = "producerGroup"
-       LogKeyConsumerGroup    = "consumerGroup"
-       LogKeyTopic            = "topic"
-       LogKeyMessageQueue     = "MessageQueue"
-       LogKeyUnderlayError    = "underlayError"
-       LogKeyBroker           = "broker"
-       LogKeyValueChangedFrom = "changedFrom"
-       LogKeyValueChangedTo   = "changeTo"
-       LogKeyPullRequest      = "PullRequest"
-       LogKeyTimeStamp        = "timestamp"
+       LogKeyProducerGroup        = "producerGroup"
+       LogKeyConsumerGroup        = "consumerGroup"
+       LogKeyTopic                = "topic"
+       LogKeyMessageQueue         = "MessageQueue"
+       LogKeyAllocateMessageQueue = "AllocateMessageQueue"
+       LogKeyUnderlayError        = "underlayError"
+       LogKeyBroker               = "broker"
+       LogKeyValueChangedFrom     = "changedFrom"
+       LogKeyValueChangedTo       = "changeTo"
+       LogKeyPullRequest          = "PullRequest"
+       LogKeyTimeStamp            = "timestamp"
 )
 
 type Logger interface {

Reply via email to