This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 613ba90  [ISSUE #744] check multiple topics in one batch
613ba90 is described below

commit 613ba90f0c1fc04dccd8b84c966a4e9eb3ccda86
Author: Oreoreoreo <[email protected]>
AuthorDate: Wed Dec 1 08:37:26 2021 +0800

    [ISSUE #744] check multiple topics in one batch
    
    Co-authored-by: maoruilei3120 <[email protected]>
---
 errors/errors.go          |  1 +
 producer/producer.go      |  8 ++++++++
 producer/producer_test.go | 34 ++++++++++++++++++++++++++++++++++
 3 files changed, 43 insertions(+)

diff --git a/errors/errors.go b/errors/errors.go
index 793fcda..43b49ca 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -47,4 +47,5 @@ var (
        ErrMessageEmpty      = errors.New("message is nil")
        ErrNotRunning        = errors.New("producer not started")
        ErrPullConsumer      = errors.New("pull consumer has not supported")
+       ErrMultipleTopics    = errors.New("the topic of the messages in one 
batch should be the same")
 )
diff --git a/producer/producer.go b/producer/producer.go
index 2ab2445..226eedb 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -103,6 +103,14 @@ func (p *defaultProducer) checkMsg(msgs 
...*primitive.Message) error {
        if len(msgs[0].Topic) == 0 {
                return errors2.ErrTopicEmpty
        }
+
+       topic := msgs[0].Topic
+       for _, msg := range msgs {
+               if msg.Topic != topic {
+                       return errors2.ErrMultipleTopics
+               }
+       }
+
        return nil
 }
 
diff --git a/producer/producer_test.go b/producer/producer_test.go
index b6ec84d..a7c15c1 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -310,3 +310,37 @@ func TestSyncWithNamespace(t *testing.T) {
        assert.Equal(t, expectedResp, resp)
        assert.Equal(t, namespaceTopic, msg.Topic)
 }
+
+func TestBatchSendDifferentTopics(t *testing.T) {
+       p, _ := NewDefaultProducer(
+               
WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
+               WithRetry(2),
+               WithQueueSelector(NewManualQueueSelector()),
+       )
+
+       ctrl := gomock.NewController(t)
+       defer ctrl.Finish()
+       client := internal.NewMockRMQClient(ctrl)
+       p.client = client
+
+       client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+       client.EXPECT().Start().Return()
+       err := p.Start()
+       assert.Nil(t, err)
+
+       ctx := context.Background()
+       msgToA := &primitive.Message{
+               Topic: "topic-A",
+               Body:  []byte("this is a message body"),
+       }
+
+       msgToB := &primitive.Message{
+               Topic: "topic-B",
+               Body:  []byte("this is a message body"),
+       }
+
+       resp, err := p.SendSync(ctx, []*primitive.Message{msgToA, msgToB}...)
+       assert.Nil(t, resp)
+       assert.NotNil(t, err)
+       assert.Equal(t, err, errors.ErrMultipleTopics)
+}

Reply via email to