majintao opened a new issue, #1129:
URL: https://github.com/apache/rocketmq-client-go/issues/1129

   `package rocket_mq
   
   import (
        "context"
        "fmt"
   
        "github.com/apache/rocketmq-client-go/v2"
        "github.com/apache/rocketmq-client-go/v2/consumer"
        "github.com/apache/rocketmq-client-go/v2/primitive"
        "github.com/apache/skywalking-go/toolkit/trace"
   )
   
   // ProcFunc handles the body of one consumed message. It receives the context
   // handed to the subscription callback and the raw message body; a non-nil
   // return value marks the message as failed.
   type ProcFunc func(ctx context.Context, body []byte) error
   
   // ConsumerProcessor bundles the settings for one topic subscription with the
   // push consumer that Start creates from them.
   type ConsumerProcessor struct {
        // opts is forwarded unchanged to rocketmq.NewPushConsumer by Start.
        opts     []consumer.Option
        // selector is passed to Subscribe. Nothing in the visible code assigns
        // it, so it is the zero value unless set elsewhere.
        selector consumer.MessageSelector
        // topic is the topic Start subscribes to.
        topic    string
        // procFunc is called with the body of every received message.
        procFunc ProcFunc
        // consumer is assigned by Start once the push consumer has been
        // created; the constructor leaves it unset.
        consumer rocketmq.PushConsumer
   }
   
   func NewConsumerProcessor(topic string, procFunc ProcFunc, opts 
...consumer.Option) *ConsumerProcessor {
        return &ConsumerProcessor{
                opts:     opts,
                topic:    topic,
                procFunc: procFunc,
        }
   }
   
   func (cp *ConsumerProcessor) Start() error {
        // 启动消费
        c, err := rocketmq.NewPushConsumer(cp.opts...)
        if err != nil {
                return err
        }
   
        cp.consumer = c
   
        c.Subscribe(cp.topic, cp.selector, func(ctx context.Context, messages 
...*primitive.MessageExt) (
                consumer.ConsumeResult, error) {
                // 处理消息
                for _, message := range messages {
                        err := cp.process(ctx, message)
                        if err != nil {
                                fmt.Printf("messageId:%s", message.MsgId)
                                return consumer.ConsumeRetryLater, nil
                        }
                }
                return consumer.ConsumeSuccess, nil
        })
   
        err = c.Start()
        if err != nil {
                return err
        }
   
        return nil
   }
   
   func (cp *ConsumerProcessor) process(ctx context.Context, message 
*primitive.MessageExt) error {
        // 添加链路追踪支持
        _, err := trace.CreateLocalSpan("")
        if err != nil {
        }
        defer trace.StopSpan()
   
        err = cp.procFunc(ctx, message.Body)
        if err != nil {
                return err
        }
   
        return nil
   }
   
   // Stop shuts down the push consumer. It does nothing when no consumer has
   // been created yet (Start was never called, or failed before creating one),
   // instead of panicking on the unset field. Stop has no return value, so a
   // Shutdown error is printed rather than silently dropped.
   func (cp *ConsumerProcessor) Stop() {
        if cp.consumer == nil {
                return
        }

        // Shut the consumer down.
        if err := cp.consumer.Shutdown(); err != nil {
                fmt.Printf("shutdown consumer: %v\n", err)
        }
   }
   
   `
   
   
   `
   topic := "test_topic"
        // The handler logs the body and always fails, so every delivery is
        // reported to the processor as an error. The error returned by Start
        // is checked so that a consumer that never came up is visible.
        if err := rocket_mq.NewConsumerProcessor(topic, func(c context.Context, msg []byte) error {
                xflog.Infof("msg:%v", string(msg))
                return errors.New("111")
        }, consumer.WithRetry(-1), consumer.WithAutoCommit(false), consumer.WithGroupName("abc"),
                consumer.WithNameServer([]string{"10.34.22.164:9876", "10.34.22.167:9876"})).Start(); err != nil {
                xflog.Infof("start consumer: %v", err)
        }
   `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to