majintao opened a new issue, #1129: URL: https://github.com/apache/rocketmq-client-go/issues/1129
```go
package rocket_mq

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/skywalking-go/toolkit/trace"
)

type ProcFunc func(context.Context, []byte) error

type ConsumerProcessor struct {
	opts     []consumer.Option
	selector consumer.MessageSelector
	topic    string
	procFunc ProcFunc
	consumer rocketmq.PushConsumer
}

func NewConsumerProcessor(topic string, procFunc ProcFunc, opts ...consumer.Option) *ConsumerProcessor {
	return &ConsumerProcessor{
		opts:     opts,
		topic:    topic,
		procFunc: procFunc,
	}
}

func (cp *ConsumerProcessor) Start() error {
	// 启动消费
	c, err := rocketmq.NewPushConsumer(cp.opts...)
	if err != nil {
		return err
	}
	cp.consumer = c
	c.Subscribe(cp.topic, cp.selector, func(ctx context.Context, messages ...*primitive.MessageExt) (
		consumer.ConsumeResult, error) {
		// 处理消息
		for _, message := range messages {
			err := cp.process(ctx, message)
			if err != nil {
				fmt.Printf("messageId:%s", message.MsgId)
				return consumer.ConsumeRetryLater, nil
			}
		}
		return consumer.ConsumeSuccess, nil
	})
	err = c.Start()
	if err != nil {
		return err
	}
	return nil
}

func (cp *ConsumerProcessor) process(ctx context.Context, message *primitive.MessageExt) error {
	// 添加链路追踪支持
	_, err := trace.CreateLocalSpan("")
	if err != nil {
	}
	defer trace.StopSpan()
	err = cp.procFunc(ctx, message.Body)
	if err != nil {
		return err
	}
	return nil
}

func (cp *ConsumerProcessor) Stop() {
	// 关闭消费
	cp.consumer.Shutdown()
}
```

```go
topic := "test_topic"
rocket_mq.NewConsumerProcessor(topic, func(c context.Context, msg []byte) error {
	xflog.Infof("msg:%v", string(msg))
	return errors.New("111")
},
	consumer.WithRetry(-1),
	consumer.WithAutoCommit(false),
	consumer.WithGroupName("abc"),
	consumer.WithNameServer([]string{"10.34.22.164:9876", "10.34.22.167:9876"})).Start()
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org