This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new dd3dbcc  [ISSUE #1178] feat:pull consumer support Assign subscription 
type
dd3dbcc is described below

commit dd3dbccb08087eaaa2e366066b97bbdc572b8911
Author: muyun.cyt <921148...@qq.com>
AuthorDate: Mon Dec 2 11:33:21 2024 +0800

    [ISSUE #1178] feat:pull consumer support Assign subscription type
    
    1.Pull Consumer Support Assign Subscription Type.
    2.add several apis for Assign Sub Type: 
SeekOffset/Assign/OffsetForTimestamp/GetTopicRouteInfo
---
 api.go                                     |  11 +++
 consumer/consumer.go                       |   8 ++
 consumer/mock_offset_store.go              |   4 +-
 consumer/pull_consumer.go                  | 125 ++++++++++++++++++++++++++++-
 consumer/statistics_test.go                |   4 +-
 errors/errors.go                           |   3 +
 examples/consumer/pull/poll_assign/main.go | 115 ++++++++++++++++++++++++++
 examples/consumer/tls/main.go              | 118 +++++++++++++--------------
 examples/producer/rpc/async/main.go        |   1 -
 examples/producer/tls/main.go              | 124 ++++++++++++++--------------
 10 files changed, 384 insertions(+), 129 deletions(-)

diff --git a/api.go b/api.go
index b0a203e..c4b5c08 100644
--- a/api.go
+++ b/api.go
@@ -82,6 +82,8 @@ func NewPushConsumer(opts ...consumer.Option) (PushConsumer, 
error) {
 type PullConsumer interface {
        // Start the PullConsumer for consuming message
        Start() error
+       // GetTopicRouteInfo returns the message queues of the given topic
+       GetTopicRouteInfo(topic string) ([]*primitive.MessageQueue, error)
 
        // Subscribe a topic for consuming
        Subscribe(topic string, selector consumer.MessageSelector) error
@@ -89,6 +91,9 @@ type PullConsumer interface {
        // Unsubscribe a topic
        Unsubscribe(topic string) error
 
+       // Assign assigns the given message queues of topic to this consumer
+       Assign(topic string, mqs []*primitive.MessageQueue) error
+
        // Shutdown the PullConsumer, all offset of MessageQueue will be commit 
to broker before process exit
        Shutdown() error
 
@@ -104,6 +109,12 @@ type PullConsumer interface {
        // PullFrom pull messages of queue from the offset to offset + numbers
        PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset 
int64, numbers int) (*primitive.PullResult, error)
 
+       // SeekOffset sets the offset the next pull of the given queue starts from (honored in Assign mode only)
+       SeekOffset(queue *primitive.MessageQueue, offset int64)
+
+       // OffsetForTimestamp get offset of specific queue with timestamp
+       OffsetForTimestamp(queue *primitive.MessageQueue, timestamp int64) 
(int64, error)
+
        // UpdateOffset updateOffset update offset of queue in mem
        UpdateOffset(queue *primitive.MessageQueue, offset int64) error
 
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 98eb17b..9e9bedb 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -316,6 +316,14 @@ func (dc *defaultConsumer) shutdown() error {
        return nil
 }
 
+// isRunning reports whether the consumer state is currently StateRunning.
+func (dc *defaultConsumer) isRunning() bool {
+       state := atomic.LoadInt32(&dc.state)
+       return state == int32(internal.StateRunning)
+}
+
+// isStopped reports whether the consumer state is currently StateShutdown.
+func (dc *defaultConsumer) isStopped() bool {
+       state := atomic.LoadInt32(&dc.state)
+       return state == int32(internal.StateShutdown)
+}
+
 func (dc *defaultConsumer) persistConsumerOffset() error {
        err := dc.makeSureStateOK()
        if err != nil {
diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
index 1d7c3bd..c122927 100644
--- a/consumer/mock_offset_store.go
+++ b/consumer/mock_offset_store.go
@@ -117,7 +117,7 @@ func (mr *MockOffsetStoreMockRecorder) update(mq, offset, 
increaseOnly interface
 }
 
 // getMQOffsetMap mocks base method
-func (m *MockOffsetStore) getMQOffsetMap(topic string) 
map[primitive.MessageQueue]int64  {
+func (m *MockOffsetStore) getMQOffsetMap(topic string) 
map[primitive.MessageQueue]int64 {
        m.ctrl.T.Helper()
        ret := m.ctrl.Call(m, "getMQOffsetMap", topic)
        ret0, _ := ret[0].(map[primitive.MessageQueue]int64)
@@ -125,7 +125,7 @@ func (m *MockOffsetStore) getMQOffsetMap(topic string) 
map[primitive.MessageQueu
 }
 
 // getMQOffsetMap indicates an expected call of getMQOffsetMap
-func (mr *MockOffsetStoreMockRecorder) getMQOffsetMap(topic string) 
*gomock.Call{
+func (mr *MockOffsetStoreMockRecorder) getMQOffsetMap(topic string) 
*gomock.Call {
        mr.mock.ctrl.T.Helper()
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getMQOffsetMap", 
reflect.TypeOf((*MockOffsetStore)(nil).getMQOffsetMap), topic)
 }
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index c66ffb7..3258779 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -64,6 +64,14 @@ func (cr *ConsumeRequest) GetPQ() *processQueue {
        return cr.processQueue
 }
 
+// SubscriptionType records which of Subscribe or Assign a pull consumer was
+// configured with; the two cannot be mixed on one consumer.
+type SubscriptionType int
+
+const (
+       // None is the zero value: neither Subscribe nor Assign has been called.
+       None SubscriptionType = iota
+       // Subscribe is set by the first Subscribe call on a consumer in state None.
+       Subscribe
+       // Assign is set by the first Assign call on a consumer in state None.
+       Assign
+)
+
 type defaultPullConsumer struct {
        *defaultConsumer
 
@@ -71,9 +79,11 @@ type defaultPullConsumer struct {
        selector          MessageSelector
        GroupName         string
        Model             MessageModel
+       SubType           SubscriptionType
        UnitMode          bool
        nextQueueSequence int64
        allocateQueues    []*primitive.MessageQueue
+       mq2seekOffset     sync.Map // key: *primitive.MessageQueue (as passed to SeekOffset), value: int64 seek offset
 
        done                chan struct{}
        closeOnce           sync.Once
@@ -116,6 +126,7 @@ func NewPullConsumer(options ...Option) 
(*defaultPullConsumer, error) {
                defaultConsumer:     dc,
                done:                make(chan struct{}, 1),
                consumeRequestCache: make(chan *ConsumeRequest, 4),
+               GroupName:           dc.option.GroupName,
        }
        dc.mqChanged = c.messageQueueChanged
        c.submitToConsume = c.consumeMessageConcurrently
@@ -123,11 +134,32 @@ func NewPullConsumer(options ...Option) 
(*defaultPullConsumer, error) {
        return c, nil
 }
 
+// GetTopicRouteInfo returns the message queues stored for topic (after
+// namespace wrapping) in topicSubscribeInfoTable. On a miss it asks the client
+// to update topic route info once and looks again; if the topic is still
+// absent it returns ErrRouteNotFound.
+func (pc *defaultPullConsumer) GetTopicRouteInfo(topic string) ([]*primitive.MessageQueue, error) {
+       key := utils.WrapNamespace(pc.option.Namespace, topic)
+       for attempt := 0; ; attempt++ {
+               if cached, ok := pc.defaultConsumer.topicSubscribeInfoTable.Load(key); ok {
+                       return cached.([]*primitive.MessageQueue), nil
+               }
+               // Second miss: the refresh below has already been tried.
+               if attempt > 0 {
+                       return nil, errors2.ErrRouteNotFound
+               }
+               pc.client.UpdateTopicRouteInfo()
+       }
+}
+
 func (pc *defaultPullConsumer) Subscribe(topic string, selector 
MessageSelector) error {
        if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
                atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
                return errors2.ErrStartTopic
        }
+       if pc.SubType == Assign {
+               return errors2.ErrSubscriptionType
+       }
+
+       if pc.SubType == None {
+               pc.SubType = Subscribe
+       }
        topic = utils.WrapNamespace(pc.option.Namespace, topic)
 
        data := buildSubscriptionData(topic, selector)
@@ -139,11 +171,53 @@ func (pc *defaultPullConsumer) Subscribe(topic string, 
selector MessageSelector)
 }
 
 func (pc *defaultPullConsumer) Unsubscribe(topic string) error {
+       if pc.SubType == Assign {
+               return errors2.ErrSubscriptionType
+       }
        topic = utils.WrapNamespace(pc.option.Namespace, topic)
        pc.subscriptionDataTable.Delete(topic)
        return nil
 }
 
+// Assign puts the consumer in Assign mode and replaces its queue list for
+// topic with mqs.
+//
+// It returns ErrStartTopic when the consumer failed to start or has been shut
+// down (the same guard Subscribe applies), and ErrSubscriptionType when the
+// consumer is already in Subscribe mode. The topic is namespace-wrapped, a
+// match-all TAG subscription is stored for it, and a rebalance is triggered
+// immediately if the consumer is running.
+func (pc *defaultPullConsumer) Assign(topic string, mqs []*primitive.MessageQueue) error {
+       state := atomic.LoadInt32(&pc.state)
+       if state == int32(internal.StateStartFailed) || state == int32(internal.StateShutdown) {
+               return errors2.ErrStartTopic
+       }
+
+       switch pc.SubType {
+       case Subscribe:
+               return errors2.ErrSubscriptionType
+       case None:
+               pc.SubType = Assign
+       }
+
+       topic = utils.WrapNamespace(pc.option.Namespace, topic)
+       data := buildSubscriptionData(topic, MessageSelector{TAG, _SubAll})
+       pc.topic = topic
+       pc.subscriptionDataTable.Store(topic, data)
+
+       // Keep the previous list only for the log line below.
+       oldQueues := pc.allocateQueues
+       pc.allocateQueues = mqs
+       rlog.Info("pull consumer assign new mqs", map[string]interface{}{
+               "topic":  topic,
+               "group":  pc.GroupName,
+               "oldMqs": oldQueues,
+               "newMqs": mqs,
+       })
+
+       // Before Start there is nothing to rebalance yet.
+       if pc.isRunning() {
+               pc.Rebalance()
+       }
+       return nil
+}
+
+// nextPullOffset returns the offset the next pull of mq should start from.
+//
+// Outside Assign mode, or when no seek is pending for mq, originOffset is
+// returned unchanged. A pending seek registered through SeekOffset is consumed
+// exactly once: it is removed from mq2seekOffset, passed to updateOffset and
+// returned.
+func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue, originOffset int64) int64 {
+       if pc.SubType != Assign {
+               return originOffset
+       }
+
+       value, exist := pc.mq2seekOffset.LoadAndDelete(mq)
+       if !exist && mq != nil {
+               // SeekOffset keys the map by the caller's pointer. The pointer handed in
+               // here may be a different instance describing the same queue, so fall
+               // back to comparing the queues by value.
+               pc.mq2seekOffset.Range(func(k, _ interface{}) bool {
+                       key, ok := k.(*primitive.MessageQueue)
+                       if !ok || key == nil || *key != *mq {
+                               return true
+                       }
+                       // LoadAndDelete keeps the take-once semantics atomic per key;
+                       // keep scanning if another goroutine consumed it first.
+                       value, exist = pc.mq2seekOffset.LoadAndDelete(k)
+                       return !exist
+               })
+       }
+       if !exist {
+               return originOffset
+       }
+
+       nextOffset := value.(int64)
+       if err := pc.updateOffset(mq, nextOffset); err != nil {
+               // The seek is still used for this pull; only the updateOffset call failed.
+               rlog.Warning("pull consumer update seek offset failed", map[string]interface{}{
+                       "mq":                     mq,
+                       "offset":                 nextOffset,
+                       rlog.LogKeyUnderlayError: err,
+               })
+       }
+       return nextOffset
+}
+
 func (pc *defaultPullConsumer) Start() error {
        var err error
        pc.once.Do(func() {
@@ -546,11 +620,34 @@ func (pc *defaultPullConsumer) GetWhere() string {
 }
 
 func (pc *defaultPullConsumer) Rebalance() {
-       pc.defaultConsumer.doBalance()
+       switch pc.SubType {
+       case Assign:
+               pc.RebalanceViaTopic()
+               break
+       case Subscribe:
+               pc.defaultConsumer.doBalance()
+               break
+       }
 }
 
 func (pc *defaultPullConsumer) RebalanceIfNotPaused() {
-       pc.defaultConsumer.doBalanceIfNotPaused()
+       switch pc.SubType {
+       case Assign:
+               pc.RebalanceViaTopic()
+               break
+       case Subscribe:
+               pc.defaultConsumer.doBalanceIfNotPaused()
+               break
+       }
+}
+
+// RebalanceViaTopic pushes the assigned queues (allocateQueues) for pc.topic
+// into the process queue table and logs the queue list when that call
+// reports a change.
+func (pc *defaultPullConsumer) RebalanceViaTopic() {
+       if !pc.defaultConsumer.updateProcessQueueTable(pc.topic, pc.allocateQueues) {
+               return
+       }
+       fields := map[string]interface{}{
+               rlog.LogKeyAllocateMessageQueue: pc.allocateQueues,
+       }
+       rlog.Info("PullConsumer rebalance result changed ", fields)
 }
 
 func (pc *defaultPullConsumer) GetConsumerRunningInfo(stack bool) 
*internal.ConsumerRunningInfo {
@@ -613,7 +710,23 @@ func (pc *defaultPullConsumer) ResetOffset(topic string, 
table map[primitive.Mes
 
 }
 
+// SeekOffset records offset as the pending seek for mq in mq2seekOffset and
+// logs the request. The entry is keyed by the mq pointer itself and is
+// picked up by nextPullOffset, which only honors it in Assign mode.
+func (pc *defaultPullConsumer) SeekOffset(mq *primitive.MessageQueue, offset int64) {
+       pc.mq2seekOffset.Store(mq, offset)
+
+       fields := map[string]interface{}{
+               "mq":     mq,
+               "offset": offset,
+       }
+       rlog.Info("pull consumer seek offset", fields)
+}
+
+// OffsetForTimestamp returns the offset of mq for the given timestamp by
+// delegating to searchOffsetByTimestamp.
+func (pc *defaultPullConsumer) OffsetForTimestamp(mq *primitive.MessageQueue, timestamp int64) (int64, error) {
+       offset, err := pc.searchOffsetByTimestamp(mq, timestamp)
+       return offset, err
+}
+
 func (pc *defaultPullConsumer) messageQueueChanged(topic string, mqAll, 
mqDivided []*primitive.MessageQueue) {
+       if pc.SubType == Assign {
+               return
+       }
+
        var allocateQueues []*primitive.MessageQueue
        pc.defaultConsumer.processQueueTable.Range(func(key, value interface{}) 
bool {
                mq := key.(primitive.MessageQueue)
@@ -734,6 +847,8 @@ func (pc *defaultPullConsumer) pullMessage(request 
*PullRequest) {
                        sleepTime = _PullDelayTimeWhenError
                        goto NEXT
                }
+
+               nextOffset := pc.nextPullOffset(request.mq, request.nextOffset)
                beginTime := time.Now()
                sd := v.(*internal.SubscriptionData)
 
@@ -743,7 +858,7 @@ func (pc *defaultPullConsumer) pullMessage(request 
*PullRequest) {
                        ConsumerGroup:        pc.consumerGroup,
                        Topic:                request.mq.Topic,
                        QueueId:              int32(request.mq.QueueId),
-                       QueueOffset:          request.nextOffset,
+                       QueueOffset:          nextOffset,
                        MaxMsgNums:           pc.option.PullBatchSize.Load(),
                        SysFlag:              sysFlag,
                        CommitOffset:         0,
@@ -880,5 +995,9 @@ func (pc *defaultPullConsumer) validate() error {
                return fmt.Errorf("consumerGroup can't equal [%s], please 
specify another one", internal.DefaultConsumerGroup)
        }
 
+       if pc.SubType == None {
+               return errors2.ErrBlankSubType
+       }
+
        return nil
 }
diff --git a/consumer/statistics_test.go b/consumer/statistics_test.go
index 930f0a3..7f29dd3 100644
--- a/consumer/statistics_test.go
+++ b/consumer/statistics_test.go
@@ -217,9 +217,9 @@ func TestNewStatsManager(t *testing.T) {
        stats := NewStatsManager()
 
        st := time.Now()
-       for  {
+       for {
                stats.increasePullTPS("rocketmq", "default", 1)
-               time.Sleep(500*time.Millisecond)
+               time.Sleep(500 * time.Millisecond)
                if time.Now().Sub(st) > 5*time.Minute {
                        break
                }
diff --git a/errors/errors.go b/errors/errors.go
index 2899506..03d0fb5 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -34,11 +34,14 @@ var (
        ErrCreated           = errors.New("consumer group has been created")
        ErrBrokerNotFound    = errors.New("broker can not found")
        ErrStartTopic        = errors.New("cannot subscribe topic since client 
either failed to start or has been shutdown.")
+       ErrSubscriptionType  = errors.New("subscribe type is not matched")
+       ErrBlankSubType      = errors.New("subscribe type should not be blank")
        ErrResponse          = errors.New("response error")
        ErrCompressLevel     = errors.New("unsupported compress level")
        ErrUnknownIP         = errors.New("unknown IP address")
        ErrService           = errors.New("service close is not running, please 
check")
        ErrTopicNotExist     = errors.New("topic not exist")
+       ErrRouteNotFound     = errors.New("topic route not found")
        ErrNotExisted        = errors.New("not existed")
        ErrNoNameserver      = errors.New("nameServerAddrs can't be empty.")
        ErrMultiIP           = errors.New("multiple IP addr does not support")
diff --git a/examples/consumer/pull/poll_assign/main.go 
b/examples/consumer/pull/poll_assign/main.go
new file mode 100644
index 0000000..838400a
--- /dev/null
+++ b/examples/consumer/pull/poll_assign/main.go
@@ -0,0 +1,115 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "log"
+       _ "net/http/pprof"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/v2"
+
+       "github.com/apache/rocketmq-client-go/v2/rlog"
+
+       "github.com/apache/rocketmq-client-go/v2/consumer"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+// Connection and subscription settings used by this example.
+const (
+       nameSrvAddr       = "http://127.0.0.1:9876"
+       accessKey         = "rocketmq"
+       secretKey         = "12345678"
+       topic             = "test-topic"
+       consumerGroupName = "testPullGroup"
+       tag               = "testPull"
+       namespace         = "ns"
+)
+
+var (
+       // pullConsumer is created in main and used by poll.
+       pullConsumer rocketmq.PullConsumer
+       // sleepTime is how long poll waits after a poll error.
+       sleepTime = 1 * time.Second
+)
+
+// main runs a pull consumer in Assign mode: it assigns an empty queue list so
+// the consumer can start, seeks every queue of the topic to the offset found
+// for ten minutes ago, assigns those queues and then polls forever.
+func main() {
+       rlog.SetLogLevel("info")
+       var nameSrv, err = primitive.NewNamesrvAddr(nameSrvAddr)
+       if err != nil {
+               log.Fatalf("NewNamesrvAddr err: %v", err)
+       }
+       pullConsumer, err = rocketmq.NewPullConsumer(
+               consumer.WithGroupName(consumerGroupName),
+               consumer.WithNameServer(nameSrv),
+               consumer.WithNamespace(namespace),
+               consumer.WithMaxReconsumeTimes(2),
+       )
+       if err != nil {
+               log.Fatalf("fail to new pullConsumer: %v", err)
+       }
+
+       // Assign an empty queue list first: this selects Assign mode so the
+       // consumer can start before the real queues are known.
+       err = pullConsumer.Assign(topic, nil)
+       if err != nil {
+               log.Fatalf("fail to Assign: %v", err)
+       }
+       err = pullConsumer.Start()
+       if err != nil {
+               log.Fatalf("fail to Start: %v", err)
+       }
+
+       mqs, err := pullConsumer.GetTopicRouteInfo(topic)
+       if err != nil {
+               log.Fatalf("fail to GetTopicRouteInfo: %v", err)
+       }
+
+       for _, mq := range mqs {
+               // The timestamp is in milliseconds, so step back with a Duration
+               // instead of subtracting a bare number.
+               tenMinutesAgo := time.Now().Add(-10 * time.Minute).UnixMilli()
+               offset, err := pullConsumer.OffsetForTimestamp(mq, tenMinutesAgo)
+               if err != nil {
+                       log.Fatalf("fail to get offset for timestamp: %v", err)
+               }
+               pullConsumer.SeekOffset(mq, offset)
+       }
+
+       err = pullConsumer.Assign(topic, mqs)
+       if err != nil {
+               log.Fatalf("fail to Assign: %v", err)
+       }
+
+       for {
+               poll()
+       }
+}
+
+// poll performs one Poll with a five second timeout. A "no new message"
+// result is logged and skipped; any other error is logged and followed by a
+// sleepTime pause. On success the request is printed and acknowledged with
+// ConsumeSuccess.
+func poll() {
+       cr, err := pullConsumer.Poll(context.TODO(), time.Second*5)
+       switch {
+       case consumer.IsNoNewMsgError(err):
+               log.Println("no new msg")
+               return
+       case err != nil:
+               log.Printf("[poll error] err=%v", err)
+               time.Sleep(sleepTime)
+               return
+       }
+
+       // todo LOGIC CODE HERE
+       log.Println("msgList: ", cr.GetMsgList())
+       log.Println("messageQueue: ", cr.GetMQ())
+       log.Println("processQueue: ", cr.GetPQ())
+       // pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeRetryLater)
+       pullConsumer.ACK(context.TODO(), cr, consumer.ConsumeSuccess)
+}
diff --git a/examples/consumer/tls/main.go b/examples/consumer/tls/main.go
index 248c837..fab61ab 100644
--- a/examples/consumer/tls/main.go
+++ b/examples/consumer/tls/main.go
@@ -1,59 +1,59 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package main
-
-import (
-       "context"
-       "fmt"
-       "os"
-       "time"
-
-       "github.com/apache/rocketmq-client-go/v2"
-       "github.com/apache/rocketmq-client-go/v2/consumer"
-       "github.com/apache/rocketmq-client-go/v2/primitive"
-)
-
-func main() {
-       c, _ := rocketmq.NewPushConsumer(
-               consumer.WithGroupName("testGroup"),
-               
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
-               consumer.WithTls(true),
-       )
-       err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx 
context.Context,
-               msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
-               for i := range msgs {
-                       fmt.Printf("subscribe callback: %v \n", msgs[i])
-               }
-
-               return consumer.ConsumeSuccess, nil
-       })
-       if err != nil {
-               fmt.Println(err.Error())
-       }
-       // Note: start after subscribe
-       err = c.Start()
-       if err != nil {
-               fmt.Println(err.Error())
-               os.Exit(-1)
-       }
-       time.Sleep(time.Hour)
-       err = c.Shutdown()
-       if err != nil {
-               fmt.Printf("shutdown Consumer error: %s", err.Error())
-       }
-}
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/v2"
+       "github.com/apache/rocketmq-client-go/v2/consumer"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+// main starts a TLS-enabled push consumer on topic "test", prints every
+// message it receives for one hour and then shuts the consumer down.
+func main() {
+       c, err := rocketmq.NewPushConsumer(
+               consumer.WithGroupName("testGroup"),
+               consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+               consumer.WithTls(true),
+       )
+       // Without this check a failed constructor would surface as a nil
+       // dereference on the Subscribe call below.
+       if err != nil {
+               fmt.Println(err.Error())
+               os.Exit(-1)
+       }
+       err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
+               msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+               for i := range msgs {
+                       fmt.Printf("subscribe callback: %v \n", msgs[i])
+               }
+
+               return consumer.ConsumeSuccess, nil
+       })
+       if err != nil {
+               fmt.Println(err.Error())
+       }
+       // Note: start after subscribe
+       err = c.Start()
+       if err != nil {
+               fmt.Println(err.Error())
+               os.Exit(-1)
+       }
+       time.Sleep(time.Hour)
+       err = c.Shutdown()
+       if err != nil {
+               fmt.Printf("shutdown Consumer error: %s", err.Error())
+       }
+}
diff --git a/examples/producer/rpc/async/main.go 
b/examples/producer/rpc/async/main.go
index c6676d3..c0ce57b 100644
--- a/examples/producer/rpc/async/main.go
+++ b/examples/producer/rpc/async/main.go
@@ -15,7 +15,6 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 */
 
-
 package main
 
 import (
diff --git a/examples/producer/tls/main.go b/examples/producer/tls/main.go
index c926c05..ddaf165 100644
--- a/examples/producer/tls/main.go
+++ b/examples/producer/tls/main.go
@@ -1,62 +1,62 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package main
-
-import (
-       "context"
-       "fmt"
-       "os"
-       "strconv"
-
-       "github.com/apache/rocketmq-client-go/v2"
-       "github.com/apache/rocketmq-client-go/v2/primitive"
-       "github.com/apache/rocketmq-client-go/v2/producer"
-)
-
-// Package main implements a simple producer to send message.
-func main() {
-       p, _ := rocketmq.NewProducer(
-               
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
-               producer.WithRetry(2),
-               producer.WithTls(true),
-       )
-       err := p.Start()
-       if err != nil {
-               fmt.Printf("start producer error: %s", err.Error())
-               os.Exit(1)
-       }
-       topic := "test"
-
-       for i := 0; i < 10; i++ {
-               msg := &primitive.Message{
-                       Topic: topic,
-                       Body:  []byte("Hello RocketMQ Go Client! " + 
strconv.Itoa(i)),
-               }
-               res, err := p.SendSync(context.Background(), msg)
-
-               if err != nil {
-                       fmt.Printf("send message error: %s\n", err)
-               } else {
-                       fmt.Printf("send message success: result=%s\n", 
res.String())
-               }
-       }
-       err = p.Shutdown()
-       if err != nil {
-               fmt.Printf("shutdown producer error: %s", err.Error())
-       }
-}
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "strconv"
+
+       "github.com/apache/rocketmq-client-go/v2"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/apache/rocketmq-client-go/v2/producer"
+)
+
+// main implements a simple TLS-enabled producer: it starts the producer,
+// sends ten messages to topic "test" synchronously, printing each result,
+// and then shuts the producer down.
+func main() {
+       p, err := rocketmq.NewProducer(
+               producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+               producer.WithRetry(2),
+               producer.WithTls(true),
+       )
+       // Without this check a failed constructor would surface as a nil
+       // dereference on the Start call below.
+       if err != nil {
+               fmt.Printf("create producer error: %s", err.Error())
+               os.Exit(1)
+       }
+       err = p.Start()
+       if err != nil {
+               fmt.Printf("start producer error: %s", err.Error())
+               os.Exit(1)
+       }
+       topic := "test"
+
+       for i := 0; i < 10; i++ {
+               msg := &primitive.Message{
+                       Topic: topic,
+                       Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
+               }
+               res, err := p.SendSync(context.Background(), msg)
+
+               if err != nil {
+                       fmt.Printf("send message error: %s\n", err)
+               } else {
+                       fmt.Printf("send message success: result=%s\n", res.String())
+               }
+       }
+       err = p.Shutdown()
+       if err != nil {
+               fmt.Printf("shutdown producer error: %s", err.Error())
+       }
+}

Reply via email to