This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ae76839 feat: support lite pull consumer (#881)
ae76839 is described below
commit ae76839caba1c718c5b6569682dbc4a931f3f40d
Author: Nick <[email protected]>
AuthorDate: Sat Aug 13 14:05:06 2022 +0800
feat: support lite pull consumer (#881)
* feat: support lite pull consumer
* remove the real ip.
* go.mod require go 1.13,so remove old versions.
* 1.no need to persist offset,rocketmq client do it when start;2.add pprof.
* Update .travis.yml
Co-authored-by: dinglei <[email protected]>
---
.travis.yml | 4 +-
admin/admin.go | 21 +-
api.go | 79 +---
consumer/consumer.go | 77 ++--
consumer/option.go | 2 +
consumer/pull_consumer.go | 738 ++++++++++++++++++++++++++-----
consumer/strategy.go | 2 +-
examples/consumer/pull/main.go | 72 ---
examples/consumer/pull/poll/main.go | 102 +++++
examples/consumer/pull/pull/main.go | 143 ++++++
examples/consumer/pull/pull_from/main.go | 177 ++++++++
go.mod | 7 +-
go.sum | 106 ++++-
internal/utils/namespace.go | 24 +
internal/utils/string.go | 6 +
primitive/result.go | 4 +-
rlog/log.go | 21 +-
17 files changed, 1293 insertions(+), 292 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index e267cee..a3ea9e8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,9 +1,9 @@
language: go
go:
- - "1.11.x"
- - "1.12.x"
- "1.13.x"
+ - "1.16.x"
+ - "1.18.x"
go_import_path: github.com/apache/rocketmq-client-go/v2
env:
global:
diff --git a/admin/admin.go b/admin/admin.go
index 8169dcb..20e1794 100644
--- a/admin/admin.go
+++ b/admin/admin.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
+ "github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
@@ -35,6 +36,7 @@ type Admin interface {
//TODO
//TopicList(ctx context.Context, mq *primitive.MessageQueue)
(*remote.RemotingCommand, error)
//GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand,
error)
+ FetchPublishMessageQueues(ctx context.Context, topic string)
([]*primitive.MessageQueue, error)
Close() error
}
@@ -61,6 +63,19 @@ func WithResolver(resolver primitive.NsResolver) AdminOption
{
}
}
+func WithCredentials(c primitive.Credentials) AdminOption {
+ return func(options *adminOptions) {
+ options.ClientOptions.Credentials = c
+ }
+}
+
+// WithNamespace set the namespace of admin
+func WithNamespace(namespace string) AdminOption {
+ return func(options *adminOptions) {
+ options.ClientOptions.Namespace = namespace
+ }
+}
+
type admin struct {
cli internal.RMQClient
@@ -70,7 +85,7 @@ type admin struct {
}
// NewAdmin initialize admin
-func NewAdmin(opts ...AdminOption) (Admin, error) {
+func NewAdmin(opts ...AdminOption) (*admin, error) {
defaultOpts := defaultAdminOptions()
for _, opt := range opts {
opt(defaultOpts)
@@ -202,6 +217,10 @@ func (a *admin) DeleteTopic(ctx context.Context, opts
...OptionDelete) error {
return nil
}
+func (a *admin) FetchPublishMessageQueues(ctx context.Context, topic string)
([]*primitive.MessageQueue, error) {
+ return
a.cli.GetNameSrv().FetchPublishMessageQueues(utils.WrapNamespace(a.opts.Namespace,
topic))
+}
+
func (a *admin) Close() error {
a.closeOnce.Do(func() {
a.cli.Shutdown()
diff --git a/api.go b/api.go
index 6a39c28..449f754 100644
--- a/api.go
+++ b/api.go
@@ -22,7 +22,6 @@ import (
"time"
"github.com/apache/rocketmq-client-go/v2/consumer"
- "github.com/apache/rocketmq-client-go/v2/errors"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
@@ -81,67 +80,37 @@ type PullConsumer interface {
// Start the PullConsumer for consuming message
Start() error
- // Shutdown the PullConsumer, all offset of MessageQueue will be commit
to broker before process exit
- Shutdown() error
-
// Subscribe a topic for consuming
Subscribe(topic string, selector consumer.MessageSelector) error
// Unsubscribe a topic
Unsubscribe(topic string) error
- // MessageQueues get MessageQueue list about for a given topic. This
method will issue a remote call to the server
- // if it does not already have any MessageQueue about the given topic.
- MessageQueues(topic string) []primitive.MessageQueue
-
- // Pull message for the topic specified. It is an error to not have
subscribed to any topics before pull for message
- //
- // Specified numbers of messages is returned if message greater that
numbers, and the offset will auto forward.
- // It means that if you meeting messages consuming failed, you should
process failed messages by yourself.
- Pull(ctx context.Context, topic string, numbers int)
(*primitive.PullResult, error)
-
- // Pull message for the topic specified from a specified MessageQueue
and offset. It is an error to not have
- // subscribed to any topics before pull for message. the method will
not affect the offset recorded
- //
- // Specified numbers of messages is returned.
- PullFrom(ctx context.Context, mq primitive.MessageQueue, offset int64,
numbers int) (*primitive.PullResult, error)
-
- // Lookup offset for the given message queue by timestamp. The returned
offset for the message queue is the
- // earliest offset whose timestamp is greater than or equal to the
given timestamp in the corresponding message
- // queue.
- //
- // Timestamp must be millisecond level, if you want to lookup the
earliest offset of the mq, you could set the
- // timestamp 0, and if you want to the latest offset the mq, you could
set the timestamp math.MaxInt64.
- Lookup(ctx context.Context, mq primitive.MessageQueue, timestamp int64)
(int64, error)
-
- // Commit the offset of specified mqs to broker, if auto-commit is
disable, you must commit the offset manually.
- Commit(ctx context.Context, mqs ...primitive.MessageQueue) (int64,
error)
-
- // CommittedOffset return the offset of specified Message
- CommittedOffset(mq primitive.MessageQueue) (int64, error)
-
- // Seek set offset of the mq, if you wanna re-consuming your message
form one position, the method may help you.
- // if you want re-consuming from one time, you cloud Lookup() then seek
it.
- Seek(mq primitive.MessageQueue, offset int64) error
-
- // Pause consuming for specified MessageQueues, after pause, client
will not fetch any message from the specified
- // message queues
- //
- // Note that this method does not affect message queue subscription. In
particular, it does not cause a group
- // rebalance.
- //
- // if a MessageQueue belong a topic that has not been subscribed, an
error will be returned
- //Pause(mqs ...primitive.MessageQueue) error
-
- // Resume specified message queues which have been paused with Pause,
if a MessageQueue that not paused,
- // it will be ignored. if not subscribed, an error will be returned
- //Resume(mqs ...primitive.MessageQueue) error
+ // Shutdown the PullConsumer, all offset of MessageQueue will be commit
to broker before process exit
+ Shutdown() error
+
+ // Poll messages with timeout.
+ Poll(ctx context.Context, timeout time.Duration)
(*consumer.ConsumeRequest, error)
+
+ //ACK ACK
+ ACK(ctx context.Context, cr *consumer.ConsumeRequest, consumeResult
consumer.ConsumeResult)
+
+ // Pull message of topic, selector indicate which queue to pull.
+ Pull(ctx context.Context, numbers int) (*primitive.PullResult, error)
+
+ // PullFrom pull messages of queue from the offset to offset + numbers
+ PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset
int64, numbers int) (*primitive.PullResult, error)
+
+ // UpdateOffset updateOffset update offset of queue in mem
+ UpdateOffset(queue *primitive.MessageQueue, offset int64) error
+
+ // PersistOffset persist all offset in mem.
+ PersistOffset(ctx context.Context, topic string) error
+
+ // CurrentOffset return the current offset of queue in mem.
+ CurrentOffset(queue *primitive.MessageQueue) (int64, error)
}
-// The PullConsumer has not implemented completely, if you want have an
experience of PullConsumer, you could use
-// consumer.NewPullConsumer(...), but it may changed in the future.
-//
-// The PullConsumer will be supported in next release
func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
- return nil, errors.ErrPullConsumer
+ return consumer.NewPullConsumer(opts...)
}
diff --git a/consumer/consumer.go b/consumer/consumer.go
index b605659..0d13764 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -269,6 +269,7 @@ type defaultConsumer struct {
}
func (dc *defaultConsumer) start() error {
+ dc.consumerGroup = utils.WrapNamespace(dc.option.Namespace,
dc.consumerGroup)
if dc.model == Clustering {
// set retry topic
retryTopic := internal.GetRetryTopic(dc.consumerGroup)
@@ -694,53 +695,51 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic
string, mqs []*primitiv
return true
})
- if dc.cType == _PushConsume {
- for item := range mqSet {
- // BUG: the mq will send to channel, if not copy once,
the next iter will modify the mq in the channel.
- mq := item
+ for item := range mqSet {
+ // BUG: the mq will send to channel, if not copy once, the next
iter will modify the mq in the channel.
+ mq := item
+ _, exist := dc.processQueueTable.Load(mq)
+ if exist {
+ continue
+ }
+ if dc.consumeOrderly && !dc.lock(&mq) {
+ rlog.Warning("do defaultConsumer, add a new mq failed,
because lock failed", map[string]interface{}{
+ rlog.LogKeyConsumerGroup: dc.consumerGroup,
+ rlog.LogKeyMessageQueue: mq.String(),
+ })
+ continue
+ }
+ dc.storage.remove(&mq)
+ nextOffset, err := dc.computePullFromWhereWithException(&mq)
+
+ if nextOffset >= 0 && err == nil {
_, exist := dc.processQueueTable.Load(mq)
if exist {
- continue
- }
- if dc.consumeOrderly && !dc.lock(&mq) {
- rlog.Warning("do defaultConsumer, add a new mq
failed, because lock failed", map[string]interface{}{
+ rlog.Debug("updateProcessQueueTable do
defaultConsumer, mq already exist", map[string]interface{}{
rlog.LogKeyConsumerGroup:
dc.consumerGroup,
rlog.LogKeyMessageQueue: mq.String(),
})
- continue
- }
- dc.storage.remove(&mq)
- nextOffset, err :=
dc.computePullFromWhereWithException(&mq)
-
- if nextOffset >= 0 && err == nil {
- _, exist := dc.processQueueTable.Load(mq)
- if exist {
- rlog.Debug("do defaultConsumer, mq
already exist", map[string]interface{}{
- rlog.LogKeyConsumerGroup:
dc.consumerGroup,
- rlog.LogKeyMessageQueue:
mq.String(),
- })
- } else {
- rlog.Debug("do defaultConsumer, add a
new mq", map[string]interface{}{
- rlog.LogKeyConsumerGroup:
dc.consumerGroup,
- rlog.LogKeyMessageQueue:
mq.String(),
- })
- pq := newProcessQueue(dc.consumeOrderly)
- dc.processQueueTable.Store(mq, pq)
- pr := PullRequest{
- consumerGroup: dc.consumerGroup,
- mq: &mq,
- pq: pq,
- nextOffset: nextOffset,
- }
- dc.prCh <- pr
- changed = true
- }
} else {
- rlog.Warning("do defaultConsumer, add a new mq
failed", map[string]interface{}{
+ rlog.Debug("updateProcessQueueTable do
defaultConsumer, add a new mq", map[string]interface{}{
rlog.LogKeyConsumerGroup:
dc.consumerGroup,
rlog.LogKeyMessageQueue: mq.String(),
})
+ pq := newProcessQueue(dc.consumeOrderly)
+ dc.processQueueTable.Store(mq, pq)
+ pr := PullRequest{
+ consumerGroup: dc.consumerGroup,
+ mq: &mq,
+ pq: pq,
+ nextOffset: nextOffset,
+ }
+ dc.prCh <- pr
+ changed = true
}
+ } else {
+ rlog.Warning("do defaultConsumer, add a new mq failed",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup: dc.consumerGroup,
+ rlog.LogKeyMessageQueue: mq.String(),
+ })
}
}
@@ -760,9 +759,6 @@ func (dc *defaultConsumer) computePullFromWhere(mq
*primitive.MessageQueue) int6
}
func (dc *defaultConsumer) computePullFromWhereWithException(mq
*primitive.MessageQueue) (int64, error) {
- if dc.cType == _PullConsume {
- return 0, nil
- }
result := int64(-1)
lastOffset, err := dc.storage.readWithException(mq, _ReadFromStore)
if err != nil {
@@ -898,6 +894,7 @@ func (dc *defaultConsumer) processPullResult(mq
*primitive.MessageQueue, result
// TODO: add filter message hook
for _, msg := range msgListFilterAgain {
+ msg.Queue = mq
traFlag, _ :=
strconv.ParseBool(msg.GetProperty(primitive.PropertyTransactionPrepared))
if traFlag {
msg.TransactionId =
msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
diff --git a/consumer/option.go b/consumer/option.go
index 3b035a3..ca6c8d5 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -134,6 +134,8 @@ func defaultPullConsumerOptions() consumerOptions {
opts := consumerOptions{
ClientOptions: internal.DefaultClientOptions(),
Resolver: primitive.NewHttpResolver("DEFAULT"),
+ ConsumerModel: Clustering,
+ Strategy: AllocateByAveragely,
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 4699601..1ca4881 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -20,10 +20,17 @@ package consumer
import (
"context"
"fmt"
+ "math"
+ "runtime/pprof"
+ "strconv"
+ "strings"
"sync"
"sync/atomic"
+ "time"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+ "github.com/apache/rocketmq-client-go/v2/internal/remote"
+ "github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/pkg/errors"
@@ -32,43 +39,38 @@ import (
"github.com/apache/rocketmq-client-go/v2/rlog"
)
-type PullConsumer interface {
- // Start
- Start()
+// ErrNoNewMsg returns a "no new message found". Occurs only when no new
message found from broker
+var ErrNoNewMsg = errors.New("no new message found")
- // Shutdown refuse all new pull operation, finish all submitted.
- Shutdown()
-
- // Pull pull message of topic, selector indicate which queue to pull.
- Pull(ctx context.Context, topic string, selector MessageSelector,
numbers int) (*primitive.PullResult, error)
-
- // PullFrom pull messages of queue from the offset to offset + numbers
- PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset
int64, numbers int) (*primitive.PullResult, error)
-
- // updateOffset update offset of queue in mem
- UpdateOffset(queue *primitive.MessageQueue, offset int64) error
-
- // PersistOffset persist all offset in mem.
- PersistOffset(ctx context.Context) error
+func IsNoNewMsgError(err error) bool {
+ return err == ErrNoNewMsg
+}
- // CurrentOffset return the current offset of queue in mem.
- CurrentOffset(queue *primitive.MessageQueue) (int64, error)
+type ConsumeRequest struct {
+ messageQueue *primitive.MessageQueue
+ processQueue *processQueue
+ msgList []*primitive.MessageExt
}
-var (
- queueCounterTable sync.Map
-)
+func (cr *ConsumeRequest) GetMsgList() []*primitive.MessageExt {
+ return cr.msgList
+}
type defaultPullConsumer struct {
*defaultConsumer
- option consumerOptions
- client internal.RMQClient
- GroupName string
- Model MessageModel
- UnitMode bool
-
- interceptor primitive.Interceptor
+ topic string
+ selector MessageSelector
+ GroupName string
+ Model MessageModel
+ UnitMode bool
+ nextQueueSequence int64
+ allocateQueues []*primitive.MessageQueue
+
+ done chan struct{}
+ closeOnce sync.Once
+ consumeRequestCache chan *ConsumeRequest
+ submitToConsume func(*processQueue, *primitive.MessageQueue)
}
func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
@@ -85,12 +87,13 @@ func NewPullConsumer(options ...Option)
(*defaultPullConsumer, error) {
defaultOpts.Namesrv = srvs
dc := &defaultConsumer{
client:
internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
- consumerGroup: defaultOpts.GroupName,
+ consumerGroup: utils.WrapNamespace(defaultOpts.Namespace,
defaultOpts.GroupName),
cType: _PullConsume,
state: int32(internal.StateCreateJust),
prCh: make(chan PullRequest, 4),
model: defaultOpts.ConsumerModel,
option: defaultOpts,
+ allocate: defaultOpts.Strategy,
}
if dc.client == nil {
return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
@@ -98,80 +101,265 @@ func NewPullConsumer(options ...Option)
(*defaultPullConsumer, error) {
defaultOpts.Namesrv = dc.client.GetNameSrv()
c := &defaultPullConsumer{
- defaultConsumer: dc,
+ defaultConsumer: dc,
+ done: make(chan struct{}, 1),
+ consumeRequestCache: make(chan *ConsumeRequest, 4),
}
+ dc.mqChanged = c.messageQueueChanged
+ c.submitToConsume = c.consumeMessageCurrently
return c, nil
}
-func (c *defaultPullConsumer) Start() error {
- atomic.StoreInt32(&c.state, int32(internal.StateRunning))
+func (pc *defaultPullConsumer) Subscribe(topic string, selector
MessageSelector) error {
+ if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
+ atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
+ return errors2.ErrStartTopic
+ }
+ topic = utils.WrapNamespace(pc.option.Namespace, topic)
+
+ data := buildSubscriptionData(topic, selector)
+ pc.subscriptionDataTable.Store(topic, data)
+ pc.topic = topic
+ pc.selector = selector
+
+ return nil
+}
+
+func (pc *defaultPullConsumer) Unsubscribe(topic string) error {
+ topic = utils.WrapNamespace(pc.option.Namespace, topic)
+ pc.subscriptionDataTable.Delete(topic)
+ return nil
+}
+
+func (pc *defaultPullConsumer) Start() error {
+ atomic.StoreInt32(&pc.state, int32(internal.StateRunning))
var err error
- c.once.Do(func() {
- err = c.start()
+ pc.once.Do(func() {
+ consumerGroupWithNs := utils.WrapNamespace(pc.option.Namespace,
pc.consumerGroup)
+ err =
pc.defaultConsumer.client.RegisterConsumer(consumerGroupWithNs, pc)
if err != nil {
+ rlog.Error("defaultPullConsumer the consumer group has
been created, specify another one", map[string]interface{}{
+ rlog.LogKeyConsumerGroup: pc.consumerGroup,
+ })
+ err = errors2.ErrCreated
return
}
+ err = pc.start()
+ if err != nil {
+ return
+ }
+
+ go func() {
+ for {
+ select {
+ case pr := <-pc.prCh:
+ go func() {
+ pc.pullMessage(&pr)
+ }()
+ case <-pc.done:
+ rlog.Info("defaultPullConsumer close
PullRequest listener.", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
pc.consumerGroup,
+ })
+ return
+ }
+ }
+ }()
})
+ pc.client.UpdateTopicRouteInfo()
+ _, exist := pc.topicSubscribeInfoTable.Load(pc.topic)
+ if !exist {
+ err = pc.Shutdown()
+ if err != nil {
+ rlog.Error("defaultPullConsumer.Shutdown . route info
not found, it may not exist", map[string]interface{}{
+ rlog.LogKeyTopic: pc.topic,
+ rlog.LogKeyUnderlayError: err,
+ })
+ }
+ return fmt.Errorf("the topic=%s route info not found, it may
not exist", pc.topic)
+ }
+ pc.client.CheckClientInBroker()
+ pc.client.SendHeartbeatToAllBrokerWithLock()
+ pc.client.RebalanceImmediately()
+
return err
}
-func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector
MessageSelector, numbers int) (*primitive.PullResult, error) {
- mq := c.getNextQueueOf(topic)
- if mq == nil {
- return nil, fmt.Errorf("prepard to pull topic: %s, but no queue
is founded", topic)
+func (pc *defaultPullConsumer) Poll(ctx context.Context, timeout
time.Duration) (*ConsumeRequest, error) {
+ ctx, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ select {
+ case <-ctx.Done():
+ return nil, ErrNoNewMsg
+ case cr := <-pc.consumeRequestCache:
+ if len(cr.GetMsgList()) == 0 {
+ return nil, ErrNoNewMsg
+ }
+ return cr, nil
}
+}
- data := buildSubscriptionData(mq.Topic, selector)
- result, err := c.pull(context.Background(), mq, data,
c.nextOffsetOf(mq), numbers)
-
- if err != nil {
- return nil, err
+func (pc *defaultPullConsumer) ACK(ctx context.Context, cr *ConsumeRequest,
result ConsumeResult) {
+ if cr == nil {
+ return
+ }
+ pq := cr.processQueue
+ mq := cr.messageQueue
+ msgList := cr.msgList
+ if len(msgList) == 0 || pq == nil || mq == nil {
+ return
+ }
+RETRY:
+ if pq.IsDroppd() {
+ rlog.Info("defaultPullConsumer the message queue not be able to
consume, because it was dropped", map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq.String(),
+ rlog.LogKeyConsumerGroup: pc.consumerGroup,
+ })
+ return
}
- c.processPullResult(mq, result, data)
- return result, nil
-}
+ pc.resetRetryAndNamespace(msgList)
-func (c *defaultPullConsumer) getNextQueueOf(topic string)
*primitive.MessageQueue {
- queues, err :=
c.defaultConsumer.client.GetNameSrv().FetchSubscribeMessageQueues(topic)
- if err != nil && len(queues) > 0 {
- rlog.Error("get next mq error", map[string]interface{}{
- rlog.LogKeyTopic: topic,
- rlog.LogKeyUnderlayError: err.Error(),
- })
- return nil
+ msgCtx := &primitive.ConsumeMessageContext{
+ Properties: make(map[string]string),
+ ConsumerGroup: pc.consumerGroup,
+ MQ: mq,
+ Msgs: msgList,
}
- var index int64
- v, exist := queueCounterTable.Load(topic)
- if !exist {
- index = -1
- queueCounterTable.Store(topic, int64(0))
+ ctx = primitive.WithConsumerCtx(ctx, msgCtx)
+ ctx = primitive.WithMethod(ctx, primitive.ConsumerPull)
+ concurrentCtx := primitive.NewConsumeConcurrentlyContext()
+ concurrentCtx.MQ = *mq
+ ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)
+
+ if result == ConsumeSuccess {
+ msgCtx.Properties[primitive.PropCtxType] =
string(primitive.SuccessReturn)
+ } else {
+ msgCtx.Properties[primitive.PropCtxType] =
string(primitive.FailedReturn)
+ }
+
+ if !pq.IsDroppd() {
+ msgBackFailed := make([]*primitive.MessageExt, 0)
+ msgBackSucceed := make([]*primitive.MessageExt, 0)
+ if result == ConsumeSuccess {
+ pc.stat.increaseConsumeOKTPS(pc.consumerGroup,
mq.Topic, len(msgList))
+ msgBackSucceed = msgList
+ } else {
+ pc.stat.increaseConsumeFailedTPS(pc.consumerGroup,
mq.Topic, len(msgList))
+ if pc.model == BroadCasting {
+ for i := 0; i < len(msgList); i++ {
+ rlog.Warning("defaultPullConsumer
BROADCASTING, the message consume failed, drop it", map[string]interface{}{
+ "message": msgList[i],
+ })
+ }
+ } else {
+ for i := 0; i < len(msgList); i++ {
+ msg := msgList[i]
+ if pc.sendMessageBack(mq.BrokerName,
msg, concurrentCtx.DelayLevelWhenNextConsume) {
+ msgBackSucceed =
append(msgBackSucceed, msg)
+ } else {
+ msg.ReconsumeTimes += 1
+ msgBackFailed =
append(msgBackFailed, msg)
+ }
+ }
+ }
+ }
+
+ offset := pq.removeMessage(msgBackSucceed...)
+
+ if offset >= 0 && !pq.IsDroppd() {
+ pc.storage.update(mq, int64(offset), true)
+ }
+ if len(msgBackFailed) > 0 {
+ msgList = msgBackFailed
+ time.Sleep(5 * time.Second)
+ goto RETRY
+ }
} else {
- index = v.(int64)
+ rlog.Warning("defaultPullConsumer processQueue is dropped
without process consume result.", map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq,
+ "message": msgList,
+ })
}
- return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
}
-// SubscribeWithChan ack manually
-func (c *defaultPullConsumer) SubscribeWithChan(topic, selector
MessageSelector) (chan *primitive.Message, error) {
- return nil, nil
+// resetRetryAndNamespace modify retry message.
+func (pc *defaultPullConsumer) resetRetryAndNamespace(msgList
[]*primitive.MessageExt) {
+ groupTopic := internal.RetryGroupTopicPrefix + pc.consumerGroup
+ beginTime := time.Now()
+ for idx := range msgList {
+ msg := msgList[idx]
+ retryTopic := msg.GetProperty(primitive.PropertyRetryTopic)
+ if retryTopic == "" && groupTopic == msg.Topic {
+ msg.Topic = retryTopic
+ }
+ msgList[idx].WithProperty(primitive.PropertyConsumeStartTime,
strconv.FormatInt(
+ beginTime.UnixNano()/int64(time.Millisecond), 10))
+ }
}
-// SubscribeWithFunc ack automatic
-func (c *defaultPullConsumer) SubscribeWithFunc(topic, selector
MessageSelector,
- f func(msg *primitive.Message) ConsumeResult) error {
- return nil
+func (pc *defaultPullConsumer) Pull(ctx context.Context, numbers int)
(*primitive.PullResult, error) {
+ mq := pc.getNextQueueOf(pc.topic)
+ if mq == nil {
+ return nil, fmt.Errorf("prepare to pull topic: %s, but no queue
is founded", pc.topic)
+ }
+
+ data := buildSubscriptionData(mq.Topic, pc.selector)
+ nextOffset, err := pc.nextOffsetOf(mq)
+ if err != nil {
+ return nil, err
+ }
+
+ result, err := pc.pull(context.Background(), mq, data, nextOffset,
numbers)
+ if err != nil {
+ return nil, err
+ }
+
+ pc.processPullResult(mq, result, data)
+ return result, nil
}
-func (c *defaultPullConsumer) ACK(msg *primitive.Message, result
ConsumeResult) {
+func (pc *defaultPullConsumer) getNextQueueOf(topic string)
*primitive.MessageQueue {
+ var queues []*primitive.MessageQueue
+ var err error
+ if len(pc.allocateQueues) == 0 {
+ topic = utils.WrapNamespace(pc.option.Namespace, topic)
+ queues, err =
pc.defaultConsumer.client.GetNameSrv().FetchSubscribeMessageQueues(topic)
+ if err != nil {
+ rlog.Error("get next mq error", map[string]interface{}{
+ rlog.LogKeyTopic: topic,
+ rlog.LogKeyUnderlayError: err.Error(),
+ })
+ return nil
+ }
+
+ if len(queues) == 0 {
+ rlog.Warning("defaultPullConsumer.getNextQueueOf len is
0", map[string]interface{}{
+ rlog.LogKeyTopic: topic,
+ })
+ return nil
+ }
+ } else {
+ queues = pc.allocateQueues
+ }
+ index := int(atomic.LoadInt64(&pc.nextQueueSequence)) % len(queues)
+ atomic.AddInt64(&pc.nextQueueSequence, 1)
+
+ nextQueue := queues[index]
+ rlog.Info("defaultPullConsumer.getNextQueueOf", map[string]interface{}{
+ rlog.LogKeyTopic: topic,
+ rlog.LogKeyConsumerGroup: pc.consumerGroup,
+ rlog.LogKeyMessageQueue: queues,
+ rlog.LogKeyAllocateMessageQueue: nextQueue.String(),
+ })
+ return nextQueue
}
-func (dc *defaultConsumer) checkPull(ctx context.Context, mq
*primitive.MessageQueue, offset int64, numbers int) error {
- err := dc.makeSureStateOK()
+func (pc *defaultPullConsumer) checkPull(mq *primitive.MessageQueue, offset
int64, numbers int) error {
+ err := pc.makeSureStateOK()
if err != nil {
return err
}
@@ -192,67 +380,415 @@ func (dc *defaultConsumer) checkPull(ctx
context.Context, mq *primitive.MessageQ
// TODO: add timeout limit
// TODO: add hook
-func (c *defaultPullConsumer) pull(ctx context.Context, mq
*primitive.MessageQueue, data *internal.SubscriptionData,
+func (pc *defaultPullConsumer) pull(ctx context.Context, mq
*primitive.MessageQueue, data *internal.SubscriptionData,
offset int64, numbers int) (*primitive.PullResult, error) {
- if err := c.checkPull(ctx, mq, offset, numbers); err != nil {
+ mq.Topic = utils.WrapNamespace(pc.option.Namespace, mq.Topic)
+ pc.consumerGroup = utils.WrapNamespace(pc.option.Namespace,
pc.consumerGroup)
+
+ if err := pc.checkPull(mq, offset, numbers); err != nil {
return nil, err
}
- c.subscriptionAutomatically(mq.Topic)
+ pc.subscriptionAutomatically(mq.Topic)
sysFlag := buildSysFlag(false, true, true, false)
- pullResp, err := c.pullInner(ctx, mq, data, offset, numbers, sysFlag, 0)
+ pullResp, err := pc.pullInner(ctx, mq, data, offset, numbers, sysFlag,
0)
if err != nil {
return nil, err
}
- c.processPullResult(mq, pullResp, data)
+ pc.processPullResult(mq, pullResp, data)
return pullResp, err
}
-func (c *defaultPullConsumer) makeSureStateOK() error {
- if atomic.LoadInt32(&c.state) != int32(internal.StateRunning) {
- return fmt.Errorf("the consumer state is [%d], not running",
c.state)
- }
- return nil
-}
-
-func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue)
int64 {
- result, _ := c.computePullFromWhereWithException(queue)
- return result
+func (pc *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue)
(int64, error) {
+ return pc.computePullFromWhereWithException(queue)
}
// PullFrom pull messages of queue from the offset to offset + numbers
-func (c *defaultPullConsumer) PullFrom(ctx context.Context, queue
*primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult,
error) {
- if err := c.checkPull(ctx, queue, offset, numbers); err != nil {
+func (pc *defaultPullConsumer) PullFrom(ctx context.Context, queue
*primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult,
error) {
+ if err := pc.checkPull(queue, offset, numbers); err != nil {
return nil, err
}
- selector := MessageSelector{}
- data := buildSubscriptionData(queue.Topic, selector)
+ data := buildSubscriptionData(queue.Topic, pc.selector)
- return c.pull(ctx, queue, data, offset, numbers)
+ return pc.pull(ctx, queue, data, offset, numbers)
}
-// updateOffset update offset of queue in mem
-func (c *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue,
offset int64) error {
- return c.updateOffset(queue, offset)
+// UpdateOffset updateOffset update offset of queue in mem
+func (pc *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue,
offset int64) error {
+ return pc.updateOffset(queue, offset)
}
// PersistOffset persist all offset in mem.
-func (c *defaultPullConsumer) PersistOffset(ctx context.Context) error {
- return c.persistConsumerOffset()
+func (pc *defaultPullConsumer) PersistOffset(ctx context.Context, topic
string) error {
+ return pc.persistConsumerOffset()
}
// CurrentOffset return the current offset of queue in mem.
-func (c *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue)
(int64, error) {
- v := c.queryOffset(queue)
+func (pc *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue)
(int64, error) {
+ v := pc.queryOffset(queue)
return v, nil
}
// Shutdown close defaultConsumer, refuse new request.
-func (c *defaultPullConsumer) Shutdown() error {
- return c.defaultConsumer.shutdown()
+func (pc *defaultPullConsumer) Shutdown() error {
+ var err error
+ pc.closeOnce.Do(func() {
+ close(pc.done)
+
+ pc.client.UnregisterConsumer(pc.consumerGroup)
+ err = pc.defaultConsumer.shutdown()
+ })
+
+ return err
+}
+
+func (pc *defaultPullConsumer) PersistConsumerOffset() error {
+ return pc.defaultConsumer.persistConsumerOffset()
+}
+
+func (pc *defaultPullConsumer) UpdateTopicSubscribeInfo(topic string, mqs
[]*primitive.MessageQueue) {
+ pc.defaultConsumer.updateTopicSubscribeInfo(topic, mqs)
+}
+
+func (pc *defaultPullConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
+ return pc.defaultConsumer.isSubscribeTopicNeedUpdate(topic)
+}
+
+func (pc *defaultPullConsumer) SubscriptionDataList()
[]*internal.SubscriptionData {
+ return pc.defaultConsumer.SubscriptionDataList()
+}
+
+func (pc *defaultPullConsumer) IsUnitMode() bool {
+ return pc.unitMode
+}
+
+func (pc *defaultPullConsumer) GetcType() string {
+ return string(pc.cType)
+}
+
+func (pc *defaultPullConsumer) GetModel() string {
+ return pc.model.String()
+}
+
+func (pc *defaultPullConsumer) GetWhere() string {
+ switch pc.fromWhere {
+ case ConsumeFromLastOffset:
+ return "CONSUME_FROM_LAST_OFFSET"
+ case ConsumeFromFirstOffset:
+ return "CONSUME_FROM_FIRST_OFFSET"
+ case ConsumeFromTimestamp:
+ return "CONSUME_FROM_TIMESTAMP"
+ default:
+ return "UNKOWN"
+ }
+
+}
+
+func (pc *defaultPullConsumer) Rebalance() {
+ pc.defaultConsumer.doBalance()
+}
+
+func (pc *defaultPullConsumer) RebalanceIfNotPaused() {
+ pc.defaultConsumer.doBalanceIfNotPaused()
+}
+
+func (pc *defaultPullConsumer) GetConsumerRunningInfo(stack bool)
*internal.ConsumerRunningInfo {
+ info := internal.NewConsumerRunningInfo()
+
+ pc.subscriptionDataTable.Range(func(key, value interface{}) bool {
+ topic := key.(string)
+ info.SubscriptionData[value.(*internal.SubscriptionData)] = true
+ status := internal.ConsumeStatus{
+ PullRT: pc.stat.getPullRT(pc.consumerGroup,
topic).avgpt,
+ PullTPS: pc.stat.getPullTPS(pc.consumerGroup,
topic).tps,
+ ConsumeRT:
pc.stat.getConsumeRT(pc.consumerGroup, topic).avgpt,
+ ConsumeOKTPS:
pc.stat.getConsumeOKTPS(pc.consumerGroup, topic).tps,
+ ConsumeFailedTPS:
pc.stat.getConsumeFailedTPS(pc.consumerGroup, topic).tps,
+ ConsumeFailedMsgs:
pc.stat.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" +
pc.consumerGroup).sum,
+ }
+ info.StatusTable[topic] = status
+ return true
+ })
+
+ pc.processQueueTable.Range(func(key, value interface{}) bool {
+ mq := key.(primitive.MessageQueue)
+ pq := value.(*processQueue)
+ pInfo := pq.currentInfo()
+ pInfo.CommitOffset, _ = pc.storage.readWithException(&mq,
_ReadMemoryThenStore)
+ info.MQTable[mq] = pInfo
+ return true
+ })
+
+ if stack {
+ var buffer strings.Builder
+
+ err := pprof.Lookup("goroutine").WriteTo(&buffer, 2)
+ if err != nil {
+ rlog.Error("error when get stack ",
map[string]interface{}{
+ "error": err,
+ })
+ } else {
+ info.JStack = buffer.String()
+ }
+ }
+
+ nsAddr := ""
+ for _, value := range pc.client.GetNameSrv().AddrList() {
+ nsAddr += fmt.Sprintf("%s;", value)
+ }
+ info.Properties[internal.PropNameServerAddr] = nsAddr
+ info.Properties[internal.PropConsumeType] = string(pc.cType)
+ info.Properties[internal.PropConsumeOrderly] =
strconv.FormatBool(pc.consumeOrderly)
+ info.Properties[internal.PropThreadPoolCoreSize] = "-1"
+ info.Properties[internal.PropConsumerStartTimestamp] =
strconv.FormatInt(pc.consumerStartTimestamp, 10)
+ return info
+}
+
+func (pc *defaultPullConsumer) ConsumeMessageDirectly(msg
*primitive.MessageExt, brokerName string)
*internal.ConsumeMessageDirectlyResult {
+ return nil
+}
+
+func (pc *defaultPullConsumer) ResetOffset(topic string, table
map[primitive.MessageQueue]int64) {
+
+}
+
+func (pc *defaultPullConsumer) messageQueueChanged(topic string, mqAll,
mqDivided []*primitive.MessageQueue) {
+ var allocateQueues []*primitive.MessageQueue
+ pc.defaultConsumer.processQueueTable.Range(func(key, value interface{})
bool {
+ mq := key.(primitive.MessageQueue)
+ allocateQueues = append(allocateQueues, &mq)
+ return true
+ })
+ pc.allocateQueues = allocateQueues
+ pc.defaultConsumer.client.SendHeartbeatToAllBrokerWithLock()
+}
+
+func (pc *defaultPullConsumer) sendMessageBack(brokerName string, msg
*primitive.MessageExt, delayLevel int) bool {
+ var brokerAddr string
+ if len(brokerName) != 0 {
+ brokerAddr =
pc.defaultConsumer.client.GetNameSrv().FindBrokerAddrByName(brokerName)
+ } else {
+ brokerAddr = msg.StoreHost
+ }
+ _, err := pc.client.InvokeSync(context.Background(), brokerAddr,
pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
+ return err == nil
+}
+
+func (pc *defaultPullConsumer) buildSendBackRequest(msg *primitive.MessageExt,
delayLevel int) *remote.RemotingCommand {
+ req := &internal.ConsumerSendMsgBackRequestHeader{
+ Group: pc.consumerGroup,
+ OriginTopic: msg.Topic,
+ Offset: msg.CommitLogOffset,
+ DelayLevel: delayLevel,
+ OriginMsgId: msg.MsgId,
+ MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
+ }
+
+ return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req,
nil)
+}
+
+func (pc *defaultPullConsumer) getMaxReconsumeTimes() int32 {
+ if pc.option.MaxReconsumeTimes == -1 {
+ return 16
+ } else {
+ return pc.option.MaxReconsumeTimes
+ }
+}
+
+func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
+ rlog.Debug("defaultPullConsumer start a new Pull Message task for
PullRequest", map[string]interface{}{
+ rlog.LogKeyPullRequest: request.String(),
+ })
+ var sleepTime time.Duration
+ pq := request.pq
+ go primitive.WithRecover(func() {
+ for {
+ select {
+ case <-pc.done:
+ rlog.Info("defaultPullConsumer close
pullMessage.", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
pc.consumerGroup,
+ })
+ return
+ default:
+ pc.submitToConsume(request.pq, request.mq)
+ if request.pq.IsDroppd() {
+ rlog.Info("defaultPullConsumer quit
pullMessage for dropped queue.", map[string]interface{}{
+ rlog.LogKeyConsumerGroup:
pc.consumerGroup,
+ })
+ return
+ }
+ }
+ }
+ })
+ for {
+ NEXT:
+ select {
+ case <-pc.done:
+ rlog.Info("defaultPullConsumer close message handle.",
map[string]interface{}{
+ rlog.LogKeyConsumerGroup: pc.consumerGroup,
+ })
+ return
+ default:
+ }
+
+ if pq.IsDroppd() {
+ rlog.Debug("defaultPullConsumer the request was
dropped, so stop task", map[string]interface{}{
+ rlog.LogKeyPullRequest: request.String(),
+ })
+ return
+ }
+ if sleepTime > 0 {
+ rlog.Debug(fmt.Sprintf("defaultPullConsumer pull
MessageQueue: %d sleep %d ms for mq: %v", request.mq.QueueId,
sleepTime/time.Millisecond, request.mq), nil)
+ time.Sleep(sleepTime)
+ }
+ // reset time
+ sleepTime = pc.option.PullInterval
+ pq.lastPullTime.Store(time.Now())
+ err := pc.makeSureStateOK()
+ if err != nil {
+ rlog.Warning("defaultPullConsumer state error",
map[string]interface{}{
+ rlog.LogKeyUnderlayError: err.Error(),
+ })
+ sleepTime = _PullDelayTimeWhenError
+ goto NEXT
+ }
+
+ if pc.pause {
+ rlog.Debug(fmt.Sprintf("defaultPullConsumer [%s] of
[%s] was paused, execute pull request [%s] later",
+ pc.option.InstanceName, pc.consumerGroup,
request.String()), nil)
+ sleepTime = _PullDelayTimeWhenSuspend
+ goto NEXT
+ }
+
+ v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
+ if !exist {
+ rlog.Info("defaultPullConsumer find the consumer's
subscription failed", map[string]interface{}{
+ rlog.LogKeyPullRequest: request.String(),
+ })
+ sleepTime = _PullDelayTimeWhenError
+ goto NEXT
+ }
+ beginTime := time.Now()
+ sd := v.(*internal.SubscriptionData)
+
+ sysFlag := buildSysFlag(false, true, true, false)
+
+ pullRequest := &internal.PullMessageRequestHeader{
+ ConsumerGroup: pc.consumerGroup,
+ Topic: request.mq.Topic,
+ QueueId: int32(request.mq.QueueId),
+ QueueOffset: request.nextOffset,
+ MaxMsgNums: pc.option.PullBatchSize,
+ SysFlag: sysFlag,
+ CommitOffset: 0,
+ SubExpression: sd.SubString,
+ ExpressionType: string(TAG),
+ SuspendTimeoutMillis: 20 * time.Second,
+ }
+
+ brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)
+ if brokerResult == nil {
+ rlog.Warning("defaultPullConsumer no broker found for
mq", map[string]interface{}{
+ rlog.LogKeyPullRequest: request.mq.String(),
+ })
+ sleepTime = _PullDelayTimeWhenError
+ goto NEXT
+ }
+
+ if brokerResult.Slave {
+ pullRequest.SysFlag =
clearCommitOffsetFlag(pullRequest.SysFlag)
+ }
+
+ result, err := pc.client.PullMessage(context.Background(),
brokerResult.BrokerAddr, pullRequest)
+ if err != nil {
+ rlog.Warning("defaultPullConsumer pull message from
broker error", map[string]interface{}{
+ rlog.LogKeyBroker:
brokerResult.BrokerAddr,
+ rlog.LogKeyUnderlayError: err.Error(),
+ })
+ sleepTime = _PullDelayTimeWhenError
+ goto NEXT
+ }
+
+ if result.Status == primitive.PullBrokerTimeout {
+ rlog.Warning("defaultPullConsumer pull broker timeout",
map[string]interface{}{
+ rlog.LogKeyBroker: brokerResult.BrokerAddr,
+ })
+ sleepTime = _PullDelayTimeWhenError
+ goto NEXT
+ }
+
+ pc.processPullResult(request.mq, result, sd)
+
+ switch result.Status {
+ case primitive.PullFound:
+ rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found
messages.", request.mq.Topic, request.mq.QueueId), nil)
+ prevRequestOffset := request.nextOffset
+ request.nextOffset = result.NextBeginOffset
+
+ rt := time.Now().Sub(beginTime) / time.Millisecond
+ pc.stat.increasePullRT(pc.consumerGroup,
request.mq.Topic, int64(rt))
+
+ msgFounded := result.GetMessageExts()
+ firstMsgOffset := int64(math.MaxInt64)
+ if len(msgFounded) != 0 {
+ firstMsgOffset = msgFounded[0].QueueOffset
+ pc.stat.increasePullTPS(pc.consumerGroup,
request.mq.Topic, len(msgFounded))
+ pq.putMessage(msgFounded...)
+ }
+ if result.NextBeginOffset < prevRequestOffset ||
firstMsgOffset < prevRequestOffset {
+ rlog.Warning("[BUG] pull message result maybe
data wrong", map[string]interface{}{
+ "nextBeginOffset":
result.NextBeginOffset,
+ "firstMsgOffset": firstMsgOffset,
+ "prevRequestOffset": prevRequestOffset,
+ })
+ }
+ case primitive.PullNoNewMsg, primitive.PullNoMsgMatched:
+ request.nextOffset = result.NextBeginOffset
+ pc.correctTagsOffset(request)
+ case primitive.PullOffsetIllegal:
+ rlog.Warning("defaultPullConsumer the pull request
offset illegal", map[string]interface{}{
+ rlog.LogKeyPullRequest: request.String(),
+ "result": result.String(),
+ })
+ request.nextOffset = result.NextBeginOffset
+ pq.WithDropped(true)
+ time.Sleep(10 * time.Second)
+ pc.storage.update(request.mq, request.nextOffset, false)
+
pc.storage.persist([]*primitive.MessageQueue{request.mq})
+ pc.processQueueTable.Delete(*request.mq)
+ rlog.Warning(fmt.Sprintf("defaultPullConsumer fix the
pull request offset: %s", request.String()), nil)
+ default:
+ rlog.Warning(fmt.Sprintf("defaultPullConsumer unknown
pull status: %v", result.Status), nil)
+ sleepTime = _PullDelayTimeWhenError
+ }
+ }
+}
+
+func (pc *defaultPullConsumer) correctTagsOffset(pr *PullRequest) {
+ if pr.pq.cachedMsgCount.Load() <= 0 {
+ pc.storage.update(pr.mq, pr.nextOffset, true)
+ }
+}
+
+func (pc *defaultPullConsumer) consumeMessageCurrently(pq *processQueue, mq
*primitive.MessageQueue) {
+ msgList := pq.getMessages()
+ if msgList == nil {
+ return
+ }
+ cr := &ConsumeRequest{
+ messageQueue: mq,
+ processQueue: pq,
+ msgList: msgList,
+ }
+
+ select {
+ case <-pq.closeChan:
+ return
+ case pc.consumeRequestCache <- cr:
+ }
}
diff --git a/consumer/strategy.go b/consumer/strategy.go
index 4a07928..2af48de 100644
--- a/consumer/strategy.go
+++ b/consumer/strategy.go
@@ -20,7 +20,7 @@ package consumer
import (
"strings"
- "github.com/stathat/consistent"
+ "stathat.com/c/consistent"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/apache/rocketmq-client-go/v2/primitive"
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
deleted file mode 100644
index 5b5819e..0000000
--- a/examples/consumer/pull/main.go
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package main
-
-import (
- "context"
- "fmt"
- "github.com/apache/rocketmq-client-go/v2/errors"
- "time"
-
- "github.com/apache/rocketmq-client-go/v2"
- "github.com/apache/rocketmq-client-go/v2/consumer"
- "github.com/apache/rocketmq-client-go/v2/primitive"
- "github.com/apache/rocketmq-client-go/v2/rlog"
-)
-
-func main() {
- c, err := rocketmq.NewPullConsumer(
- consumer.WithGroupName("testGroup"),
-
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
- )
- if err != nil {
- rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err),
nil)
- }
- err = c.Start()
- if err != nil {
- rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err),
nil)
- }
-
- ctx := context.Background()
- queue := primitive.MessageQueue{
- Topic: "TopicTest",
- BrokerName: "", // replace with your broker name. otherwise,
pull will failed.
- QueueId: 0,
- }
-
- offset := int64(0)
- for {
- resp, err := c.PullFrom(ctx, queue, offset, 10)
- if err != nil {
- if err == errors.ErrRequestTimeout {
- fmt.Printf("timeout \n")
- time.Sleep(1 * time.Second)
- continue
- }
- fmt.Printf("unexpectable err: %v \n", err)
- return
- }
- if resp.Status == primitive.PullFound {
- fmt.Printf("pull message success. nextOffset: %d \n",
resp.NextBeginOffset)
- for _, msg := range resp.GetMessageExts() {
- fmt.Printf("pull msg: %v \n", msg)
- }
- }
- offset = resp.NextBeginOffset
- }
-}
diff --git a/examples/consumer/pull/poll/main.go
b/examples/consumer/pull/poll/main.go
new file mode 100644
index 0000000..b6c783b
--- /dev/null
+++ b/examples/consumer/pull/poll/main.go
@@ -0,0 +1,102 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "log"
+ "net/http"
+ _ "net/http/pprof"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/v2"
+
+ "github.com/apache/rocketmq-client-go/v2/rlog"
+
+ "github.com/apache/rocketmq-client-go/v2/consumer"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+const (
+ nameSrvAddr = "http://127.0.0.1:9876"
+ accessKey = "rocketmq"
+ secretKey = "12345678"
+ topic = "test-topic"
+ consumerGroupName = "testPullGroup"
+ tag = "testPull"
+ namespace = "ns"
+)
+
+var pullConsumer rocketmq.PullConsumer
+var sleepTime = 1 * time.Second
+
+func main() {
+ go func() {
+ log.Println(http.ListenAndServe("localhost:6060", nil))
+ }()
+ rlog.SetLogLevel("info")
+ var nameSrv, err = primitive.NewNamesrvAddr(nameSrvAddr)
+ if err != nil {
+ log.Fatalf("NewNamesrvAddr err: %v", err)
+ }
+ pullConsumer, err = rocketmq.NewPullConsumer(
+ consumer.WithGroupName(consumerGroupName),
+ consumer.WithNameServer(nameSrv),
+ consumer.WithCredentials(primitive.Credentials{
+ AccessKey: accessKey,
+ SecretKey: secretKey,
+ }),
+ consumer.WithNamespace(namespace),
+ consumer.WithMaxReconsumeTimes(2),
+ )
+ if err != nil {
+ log.Fatalf("fail to new pullConsumer: %v", err)
+ }
+ selector := consumer.MessageSelector{
+ Type: consumer.TAG,
+ Expression: tag,
+ }
+ err = pullConsumer.Subscribe(topic, selector)
+ if err != nil {
+ log.Fatalf("fail to Subscribe: %v", err)
+ }
+ err = pullConsumer.Start()
+ if err != nil {
+ log.Fatalf("fail to Start: %v", err)
+ }
+
+ for {
+ poll()
+ }
+}
+
+func poll() {
+ cr, err := pullConsumer.Poll(context.TODO(), time.Second*5)
+ if consumer.IsNoNewMsgError(err) {
+ return
+ }
+ if err != nil {
+ log.Printf("[poll error] err=%v", err)
+ time.Sleep(sleepTime)
+ return
+ }
+ // todo LOGIC CODE HERE
+ log.Println("msgList: ", cr.GetMsgList())
+ // pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeRetryLater)
+ pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeSuccess)
+}
diff --git a/examples/consumer/pull/pull/main.go
b/examples/consumer/pull/pull/main.go
new file mode 100644
index 0000000..0b231a7
--- /dev/null
+++ b/examples/consumer/pull/pull/main.go
@@ -0,0 +1,143 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "log"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/v2"
+
+ "github.com/apache/rocketmq-client-go/v2/rlog"
+
+ "github.com/apache/rocketmq-client-go/v2/consumer"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+const (
+ nameSrvAddr = "http://127.0.0.1:9876"
+ accessKey = "rocketmq"
+ secretKey = "12345678"
+ topic = "test-topic"
+ consumerGroupName = "testPullGroup"
+ tag = "testPull"
+ namespace = "ns"
+)
+
+var pullConsumer rocketmq.PullConsumer
+var sleepTime = 1 * time.Second
+
+const refreshPersistOffsetDuration = time.Second * 5
+
+func main() {
+ rlog.SetLogLevel("info")
+ var nameSrv, err = primitive.NewNamesrvAddr(nameSrvAddr)
+ if err != nil {
+ log.Fatalf("NewNamesrvAddr err: %v", err)
+ }
+ pullConsumer, err = rocketmq.NewPullConsumer(
+ consumer.WithGroupName(consumerGroupName),
+ consumer.WithNameServer(nameSrv),
+ consumer.WithCredentials(primitive.Credentials{
+ AccessKey: accessKey,
+ SecretKey: secretKey,
+ }),
+ consumer.WithNamespace(namespace),
+ consumer.WithMaxReconsumeTimes(2),
+ )
+ if err != nil {
+ log.Fatalf("fail to new pullConsumer: %v", err)
+ }
+ selector := consumer.MessageSelector{
+ Type: consumer.TAG,
+ Expression: tag,
+ }
+ err = pullConsumer.Subscribe(topic, selector)
+ if err != nil {
+ log.Fatalf("fail to Subscribe: %v", err)
+ }
+ err = pullConsumer.Start()
+ if err != nil {
+ log.Fatalf("fail to Start: %v", err)
+ }
+
+ timer := time.NewTimer(refreshPersistOffsetDuration)
+ go func() {
+ for ; true; <-timer.C {
+ err = pullConsumer.PersistOffset(context.TODO(), topic)
+ if err != nil {
+ log.Printf("[pullConsumer.PersistOffset]
err=%v", err)
+ }
+ timer.Reset(refreshPersistOffsetDuration)
+ }
+ }()
+
+ for i := 0; i <= 4; i++ {
+ go func() {
+ for {
+ pull()
+ }
+ }()
+ }
+ // make current thread hold to see pull result. TODO should update here
+ time.Sleep(10000 * time.Second)
+}
+
+func pull() {
+ resp, err := pullConsumer.Pull(context.TODO(), 1)
+ if err != nil {
+ log.Printf("[pull error] err=%v", err)
+ time.Sleep(sleepTime)
+ return
+ }
+ switch resp.Status {
+ case primitive.PullFound:
+ log.Printf("[pull message successfully] MinOffset:%d,
MaxOffset:%d, nextOffset: %d, len:%d\n", resp.MinOffset, resp.MaxOffset,
resp.NextBeginOffset, len(resp.GetMessages()))
+ var queue *primitive.MessageQueue
+ if len(resp.GetMessages()) <= 0 {
+ return
+ }
+ for _, msg := range resp.GetMessageExts() {
+ // todo LOGIC CODE HERE
+ queue = msg.Queue
+ //log.Println(msg.Queue, msg.QueueOffset,
msg.GetKeys(), msg.MsgId, string(msg.Body))
+ log.Println(msg)
+ }
+ // update offset
+ err = pullConsumer.UpdateOffset(queue, resp.NextBeginOffset)
+ if err != nil {
+ log.Printf("[pullConsumer.UpdateOffset] err=%v", err)
+ }
+
+ case primitive.PullNoNewMsg, primitive.PullNoMsgMatched:
+ log.Printf("[no pull message] next = %d\n",
resp.NextBeginOffset)
+ time.Sleep(sleepTime)
+ return
+ case primitive.PullBrokerTimeout:
+ log.Printf("[pull broker timeout] next = %d\n",
resp.NextBeginOffset)
+
+ time.Sleep(sleepTime)
+ return
+ case primitive.PullOffsetIllegal:
+ log.Printf("[pull offset illegal] next = %d\n",
resp.NextBeginOffset)
+ return
+ default:
+ log.Printf("[pull error] next = %d\n", resp.NextBeginOffset)
+ }
+}
diff --git a/examples/consumer/pull/pull_from/main.go
b/examples/consumer/pull/pull_from/main.go
new file mode 100644
index 0000000..62445f5
--- /dev/null
+++ b/examples/consumer/pull/pull_from/main.go
@@ -0,0 +1,177 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "log"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/v2"
+
+ "github.com/apache/rocketmq-client-go/v2/admin"
+ "github.com/apache/rocketmq-client-go/v2/consumer"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+ "github.com/go-redis/redis/v8"
+)
+
+const (
+ nameSrvAddr = "http://127.0.0.1:9876"
+ accessKey = "rocketmq"
+ secretKey = "12345678"
+ topic = "test-topic"
+ consumerGroupName = "testPullFromGroup"
+ tag = "testPullFrom"
+ namespace = "ns"
+)
+
+var ctx = context.Background()
+var client = redis.NewClient(&redis.Options{
+ Addr: "127.0.0.1:6379",
+ Password: "", // no password set
+ DB: 0, // use default DB
+})
+
+func main() {
+ var nameSrv, err = primitive.NewNamesrvAddr(nameSrvAddr)
+ if err != nil {
+ log.Fatalf("NewNamesrvAddr err: %v", err)
+ }
+ pullConsumer, err := rocketmq.NewPullConsumer(
+ consumer.WithGroupName(consumerGroupName),
+ consumer.WithNameServer(nameSrv),
+ consumer.WithCredentials(primitive.Credentials{
+ AccessKey: accessKey,
+ SecretKey: secretKey,
+ }),
+ consumer.WithNamespace(namespace),
+ )
+ if err != nil {
+ log.Fatalf("fail to new pullConsumer: %v", err)
+ }
+
+ selector := consumer.MessageSelector{
+ Type: consumer.TAG,
+ Expression: tag,
+ }
+ err = pullConsumer.Subscribe(topic, selector)
+ if err != nil {
+ log.Fatalf("Subscribe error: %s\n", err)
+ }
+ err = pullConsumer.Start()
+ if err != nil {
+ log.Fatalf("fail to new pullConsumer: %v", err)
+ }
+
+ nameSrvAddr := []string{nameSrvAddr}
+ mqAdmin, err := admin.NewAdmin(
+
admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)),
+ admin.WithCredentials(primitive.Credentials{
+ AccessKey: accessKey,
+ SecretKey: secretKey,
+ }),
+ admin.WithNamespace(namespace),
+ )
+ if err != nil {
+ log.Fatalf("fail to new admin: %v", err)
+ }
+
+ messageQueues, err := mqAdmin.FetchPublishMessageQueues(ctx, topic)
+ if err != nil {
+ log.Fatalf("fetch messahe queue error: %s\n", err)
+ }
+ var sleepTime = 1 * time.Second
+
+ for _, queue := range messageQueues {
+ offset := getOffset(client, consumerGroupName, topic,
queue.QueueId)
+ if offset < 0 { // default consume from offset 0
+ offset = 0
+ }
+ go func(queue *primitive.MessageQueue) {
+ for {
+ resp, err := pullConsumer.PullFrom(ctx, queue,
offset, 1) // pull one message per time for make sure easily ACK
+ if err != nil {
+ log.Printf("[pull error] topic=%s,
queue id=%d, broker=%s, offset=%d\n", topic, queue.QueueId, queue.BrokerName,
offset)
+ time.Sleep(sleepTime)
+ continue
+ }
+ switch resp.Status {
+ case primitive.PullFound:
+ log.Printf("[pull message successfully]
queue id = %d, nextOffset: %d \n", queue.QueueId, resp.NextBeginOffset)
+ for _, msg := range
resp.GetMessageExts() {
+ // todo LOGIC CODE HERE
+
+ log.Println(msg.GetKeys(),
msg.MsgId, string(msg.Body))
+
+ // save offset to redis
+ err = ackOffset(client,
consumerGroupName, topic, queue.QueueId, resp.NextBeginOffset)
+ if err != nil {
+ //todo ack error logic
+ }
+
+ // set offset for next pull
+ offset = resp.NextBeginOffset
+ }
+ case primitive.PullNoNewMsg,
primitive.PullNoMsgMatched:
+ log.Printf("[no pull message] topic=%s,
queue id=%d, broker=%s, offset=%d, next = %d\n", queue.Topic, queue.QueueId,
queue.BrokerName, offset, resp.NextBeginOffset)
+ time.Sleep(sleepTime)
+ offset = resp.NextBeginOffset
+ continue
+ case primitive.PullBrokerTimeout:
+ log.Printf("[pull broker timeout]
topic=%s, queue id=%d, broker=%s, offset=%d, next = %d\n", queue.Topic,
queue.QueueId, queue.BrokerName, offset, resp.NextBeginOffset)
+
+ time.Sleep(sleepTime)
+ continue
+ case primitive.PullOffsetIllegal:
+ log.Printf("[pull offset illegal]
topic=%s, queue id=%d, broker=%s, offset=%d, next = %d\n", queue.Topic,
queue.QueueId, queue.BrokerName, offset, resp.NextBeginOffset)
+ offset = resp.NextBeginOffset
+ continue
+ default:
+ log.Printf("[pull error] topic=%s,
queue id=%d, broker=%s, offset=%d, next = %d\n", queue.Topic, queue.QueueId,
queue.BrokerName, offset, resp.NextBeginOffset)
+ }
+ }
+ }(queue)
+ }
+ // make current thread hold to see pull result. TODO should update here
+ time.Sleep(10000 * time.Second)
+}
+
+func ackOffset(redis *redis.Client, consumerGroupName string, topic string,
queueId int, consumedOffset int64) error {
+ var key = fmt.Sprintf("rmq-%s-%s-%d", consumerGroupName, topic, queueId)
+ err := redis.Set(ctx, key, consumedOffset, 0).Err()
+ if err != nil {
+ log.Printf("set redis error, key:%s, %v\n", key, err)
+ return err
+ }
+ return nil
+}
+
+func getOffset(redisCli *redis.Client, consumerGroupName string, topic string,
queueId int) int64 {
+ var key = fmt.Sprintf("rmq-%s-%s-%d", consumerGroupName, topic, queueId)
+ offset, err := redisCli.Get(ctx, key).Int64()
+ if err == redis.Nil {
+ return 0
+ } else if err != nil {
+ log.Printf("get redis error, key:%s, %v\n", key, err)
+ //todo Your own logic. like get from db.
+ return 0
+ } else {
+ return offset
+ }
+}
diff --git a/go.mod b/go.mod
index 74d5eb6..09acdad 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,7 @@ go 1.13
require (
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/emirpasic/gods v1.12.0
+ github.com/go-redis/redis/v8 v8.11.5
github.com/golang/mock v1.3.1
github.com/json-iterator/go v1.1.12
github.com/patrickmn/go-cache v2.1.0+incompatible
@@ -12,13 +13,11 @@ require (
github.com/satori/go.uuid v1.2.0
github.com/sirupsen/logrus v1.4.0
github.com/smartystreets/goconvey v1.6.4
- github.com/stathat/consistent v1.0.0
- github.com/stretchr/testify v1.3.0
+ github.com/stretchr/testify v1.5.1
github.com/tidwall/gjson v1.13.0
go.uber.org/atomic v1.5.1
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
- gopkg.in/yaml.v2 v2.4.0 // indirect
- stathat.com/c/consistent v1.0.0 // indirect
+ stathat.com/c/consistent v1.0.0
)
diff --git a/go.sum b/go.sum
index 543d1ab..1ec2f17 100644
--- a/go.sum
+++ b/go.sum
@@ -1,15 +1,46 @@
github.com/BurntSushi/toml v1.1.0
h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod
h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
+github.com/cespare/xxhash/v2 v2.1.2
h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
+github.com/cespare/xxhash/v2 v2.1.2/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/chzyer/logex v1.1.10/go.mod
h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
+github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod
h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
+github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod
h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod
h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/emirpasic/gods v1.12.0
h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod
h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
+github.com/fsnotify/fsnotify v1.4.7/go.mod
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9
h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
+github.com/fsnotify/fsnotify v1.4.9/go.mod
h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/go-redis/redis/v8 v8.11.5
h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
+github.com/go-redis/redis/v8 v8.11.5/go.mod
h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod
h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod
h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
+github.com/golang/protobuf v1.2.0/go.mod
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod
h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod
h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod
h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod
h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod
h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.2/go.mod
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod
h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2
h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/google/go-cmp v0.3.0/go.mod
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod
h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1
h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod
h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/hpcloud/tail v1.0.0/go.mod
h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
+github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod
h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/json-iterator/go v1.1.12
h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod
h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jtolds/gls v4.20.0+incompatible
h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
@@ -25,6 +56,21 @@ github.com/modern-go/concurrent
v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2
h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod
h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/nxadm/tail v1.4.4/go.mod
h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
+github.com/nxadm/tail v1.4.8/go.mod
h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
+github.com/onsi/ginkgo v1.6.0/go.mod
h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod
h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.16.4/go.mod
h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
+github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
+github.com/onsi/ginkgo v1.16.5/go.mod
h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
+github.com/onsi/ginkgo/v2 v2.0.0
h1:CcuG/HvWNkkaqCUpJifQY8z7qEMBJya6aLPx6ftGyjQ=
+github.com/onsi/ginkgo/v2 v2.0.0/go.mod
h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
+github.com/onsi/gomega v1.7.1/go.mod
h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1/go.mod
h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/onsi/gomega v1.17.0/go.mod
h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
+github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
+github.com/onsi/gomega v1.18.1/go.mod
h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/patrickmn/go-cache v2.1.0+incompatible
h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod
h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
@@ -39,45 +85,95 @@ github.com/smartystreets/assertions
v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod
h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4
h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod
h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
-github.com/stathat/consistent v1.0.0
h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
-github.com/stathat/consistent v1.0.0/go.mod
h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-github.com/stretchr/testify v1.3.0
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.5.1
h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
+github.com/stretchr/testify v1.5.1/go.mod
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/tidwall/gjson v1.13.0
h1:3TFY9yxOQShrvmjdM76K+jc66zJeT6D3/VFFYCGQf7M=
github.com/tidwall/gjson v1.13.0/go.mod
h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod
h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0
h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod
h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
+github.com/yuin/goldmark v1.2.1/go.mod
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/atomic v1.5.1 h1:rsqfU5vBkVknbhUGbAUwQKR2H4ItV8tjJ+6kJX4cxHM=
go.uber.org/atomic v1.5.1/go.mod
h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
-golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod
h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de
h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod
h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod
h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210428140749-89ef3d95e781
h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
+golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod
h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f
h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod
h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod
h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod
h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
-golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c
h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e
h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod
h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod
h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod
h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod
h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.23.0/go.mod
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod
h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0
h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod
h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod
h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/natefinch/lumberjack.v2 v2.0.0
h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod
h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod
h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
diff --git a/internal/utils/namespace.go b/internal/utils/namespace.go
new file mode 100644
index 0000000..5e98706
--- /dev/null
+++ b/internal/utils/namespace.go
@@ -0,0 +1,24 @@
+package utils
+
+import "strings"
+
+const namespaceSeparator = "%"
+
+func WrapNamespace(namespace, resourceWithOutNamespace string) string {
+ if IsEmpty(namespace) || IsEmpty(resourceWithOutNamespace) {
+ return resourceWithOutNamespace
+ }
+
+ if isAlreadyWithNamespace(resourceWithOutNamespace, namespace) {
+ return resourceWithOutNamespace
+ }
+
+ return namespace + namespaceSeparator + resourceWithOutNamespace
+}
+
+func isAlreadyWithNamespace(resource, namespace string) bool {
+ if IsEmpty(namespace) || IsEmpty(resource) {
+ return false
+ }
+ return strings.Contains(resource, namespace+namespaceSeparator)
+}
diff --git a/internal/utils/string.go b/internal/utils/string.go
index f347397..a73b400 100644
--- a/internal/utils/string.go
+++ b/internal/utils/string.go
@@ -17,6 +17,8 @@ limitations under the License.
package utils
+import "strings"
+
// HashString hashes a string to a unique hashcode.
func HashString(s string) int {
val := []byte(s)
@@ -28,3 +30,7 @@ func HashString(s string) int {
return int(h)
}
+
+func IsEmpty(s string) bool {
+ return strings.TrimSpace(s) == ""
+}
diff --git a/primitive/result.go b/primitive/result.go
index 20d393a..18c4352 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -123,6 +123,8 @@ func (result *PullResult) String() string {
func toMessages(messageExts []*MessageExt) []*Message {
msgs := make([]*Message, 0)
-
+ for _, messageExt := range messageExts {
+ msgs = append(msgs, &messageExt.Message)
+ }
return msgs
}
diff --git a/rlog/log.go b/rlog/log.go
index b387ed7..e4070b7 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -28,16 +28,17 @@ import (
)
const (
- LogKeyProducerGroup = "producerGroup"
- LogKeyConsumerGroup = "consumerGroup"
- LogKeyTopic = "topic"
- LogKeyMessageQueue = "MessageQueue"
- LogKeyUnderlayError = "underlayError"
- LogKeyBroker = "broker"
- LogKeyValueChangedFrom = "changedFrom"
- LogKeyValueChangedTo = "changeTo"
- LogKeyPullRequest = "PullRequest"
- LogKeyTimeStamp = "timestamp"
+ LogKeyProducerGroup = "producerGroup"
+ LogKeyConsumerGroup = "consumerGroup"
+ LogKeyTopic = "topic"
+ LogKeyMessageQueue = "MessageQueue"
+ LogKeyAllocateMessageQueue = "AllocateMessageQueue"
+ LogKeyUnderlayError = "underlayError"
+ LogKeyBroker = "broker"
+ LogKeyValueChangedFrom = "changedFrom"
+ LogKeyValueChangedTo = "changeTo"
+ LogKeyPullRequest = "PullRequest"
+ LogKeyTimeStamp = "timestamp"
)
type Logger interface {