GitHub user Ssummer520 edited a discussion: 关于消费者负载均衡的问题 (顺序消息负载机制)

https://rocketmq.apache.org/zh/docs/featureBehavior/08consumerloadbalance

顺序消息负载机制
![WeChate8bd4c47823fcce3d83fcc9f5ea8f14c](https://github.com/user-attachments/assets/6d911e7d-b2f7-40eb-87ff-fae8100ade14)

当我启动两个client来消费顺序消息
Topic := "FIFOTopic"
        os.Setenv("mq.consoleAppender.enabled", "true")
        golang.ResetLogger()
        // new simpleConsumer instance
        simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
                Endpoint:      Endpoint,
                Credentials:   &credentials.SessionCredentials{},
                ConsumerGroup: "groupFIFO",
        },

这块配置都一致
唯一的区别是:其中一个 consumer 在 ack 之前调用 time.Sleep,模拟第一条消息尚未消费完成而阻塞、没有返回 ack 的情况。按照顺序消息负载机制,应该等第一个 consumer
消费成功之后,再轮询到第二个 consumer。但实际结果是:阻塞的 consumer 还未返回 ack,
另一个 consumer 就消费了剩下的全部顺序消息;第一个 consumer 上的消息触发重试机制后,也被重新投递到了第二个 consumer。
        // Endless receive loop for the simple consumer: fetch a batch, then ack
        // the messages one at a time.
        for {
                fmt.Println("start recevie message")
                // maxMessageNum and invisibleDuration are defined outside this snippet.
                mvs, err := simpleConsumer.Receive(context.TODO(), 
maxMessageNum, invisibleDuration)
                if err != nil {
                        // A failed receive is only printed; the loop retries immediately.
                        fmt.Println(err)
                        continue
                }
                for _, mv := range mvs {
                       // Delay the ack by 5 seconds to simulate a consumer that is still
                       // busy with the message (this is the "blocked" consumer in the report).
                       time.Sleep(time.Second *5)
                        err := simpleConsumer.Ack(context.TODO(), mv)
                        if err != nil {
                                // NOTE(review): an ack failure leaves the enclosing function
                                // without printing the error.
                                return
                        }
                        // Print the message body followed by the time of the ack.
                        fmt.Println(string(mv.GetBody()) + "  " + 
time.Now().Format(time.StampMilli))
                }
        }

生产者
// SendFIFO creates and starts a producer for "FIFOTopic", then synchronously
// sends ten messages that all carry the message group "fifo", printing every
// element of each send response and pausing 100ms between sends.
//
// A failure to create or start the producer, or to send a message, ends the
// process via log.Fatal. The producer is always stopped gracefully before a
// send failure is reported.
func SendFIFO() {
	topic := "FIFOTopic"
	producer, err := golang.NewProducer(&golang.Config{
		Endpoint:    Endpoint,
		Credentials: &credentials.SessionCredentials{},
	},
		golang.WithTopics(topic),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start producer
	if err = producer.Start(); err != nil {
		log.Fatal(err)
	}

	// The send loop runs inside a closure so that the deferred graceful stop
	// executes before log.Fatal is reached: log.Fatal exits through os.Exit,
	// which skips deferred calls of the function it is called from.
	err = func() error {
		// gracefully stop the producer once sending is done or has failed
		defer producer.GracefulStop()
		for i := 0; i < 10; i++ {
			msg := &golang.Message{
				Topic: topic,
				Body: []byte("this is a message groupFIFO: " +
					strconv.Itoa(i) + "  " + time.Now().Format(time.DateTime)),
			}
			// set keys and tag
			msg.SetKeys("a", "b")
			msg.SetTag("ab")
			// every message uses the same message group
			msg.SetMessageGroup("fifo")
			// send message in sync
			resp, err := producer.Send(context.TODO(), msg)
			if err != nil {
				return err
			}
			for _, r := range resp {
				fmt.Printf("%#v\n", r)
			}
			// wait a moment
			time.Sleep(time.Second / 10)
		}
		return nil
	}()
	if err != nil {
		log.Fatal(err)
	}
}

结果:
![image](https://github.com/user-attachments/assets/e0b7fbc1-cc88-4cb5-8c8e-08efc24aae03)


![image](https://github.com/user-attachments/assets/05a9b684-2e1f-4d97-8de9-384bcc169faa)

> 

GitHub link: https://github.com/apache/rocketmq-clients/discussions/812

----
This is an automatically sent email for dev@rocketmq.apache.org.
To unsubscribe, please send an email to: dev-unsubscr...@rocketmq.apache.org

Reply via email to