IguoChan opened a new issue #699:
URL: https://github.com/apache/rocketmq-client-go/issues/699


**Describe the bug**
In HA cluster mode, when the master broker node goes down, my client still sends messages successfully but cannot consume them. The log shows the error `pull message from broker error`, and the broker address map printed in the log still contains the IP of the node that went down.
   
**Reason**
I checked my code, which looks like this:
```go
func init() {
	defaultProducer, err = rocketmq.NewProducer(
		producer.WithNameServer(addrs),
		producer.WithCredentials(primitive.Credentials{
			AccessKey: cfg.AccessKey,
			SecretKey: cfg.SecretKey,
		}),
	)
	...
	defaultPushConsumer, err = rocketmq.NewPushConsumer(
		consumer.WithNameServer(addrs),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: cfg.AccessKey,
			SecretKey: cfg.SecretKey,
		}),
		consumer.WithNamespace(cfg.InstanceID),
		consumer.WithGroupName(groupID),
		consumer.WithConsumerModel(consumer.Clustering),
	)
	...
}
```
I found that calling `NewProducer` creates a new `srvs` object and a new `producer.client` object:
```go
func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
	...
	// A new namesrv object is created for every producer.
	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	...
	// The client is looked up (or created) by client ID.
	producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)
	...

	return producer, nil
}
```
Then, when I call `NewPushConsumer`, its call to `internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)` reuses `producer.client` as `dc.client`, because `client.ClientID()` returns the same client ID. As a result, `dc.client.namesrvs` is the namesrv object created earlier by the producer, while `dc.namesrv` is the one created here:
```go
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
	...
	// A new namesrv object is created for every consumer.
	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	if !defaultOpts.Credentials.IsEmpty() {
		srvs.SetCredentials(defaultOpts.Credentials)
	}
	defaultOpts.Namesrv = srvs

	if defaultOpts.Namespace != "" {
		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
	}

	// If the client ID matches the producer's, GetOrNewRocketMQClient returns the
	// producer's client, so dc.client.namesrvs is the producer's namesrv while
	// dc.namesrv is the srvs created above.
	dc := &defaultConsumer{
		client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
		consumerGroup:  defaultOpts.GroupName,
		cType:          _PushConsume,
		state:          int32(internal.StateCreateJust),
		prCh:           make(chan PullRequest, 4),
		model:          defaultOpts.ConsumerModel,
		consumeOrderly: defaultOpts.ConsumeOrderly,
		fromWhere:      defaultOpts.FromWhere,
		allocate:       defaultOpts.Strategy,
		option:         defaultOpts,
		namesrv:        srvs,
	}

	...

	p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)

	return p, nil
}
```
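To make the sharing concrete, here is a minimal, simplified sketch of a get-or-create client cache keyed by client ID. The types `namesrv` and `rmqClient` and the function `getOrNewClient` are hypothetical stand-ins, not the real `internal` package API; the sketch only assumes, as described above, that clients are cached by client ID and that the namesrv passed by the first caller is the one the shared client keeps.

```go
package main

import (
	"fmt"
	"sync"
)

// namesrv is a hypothetical stand-in for the handle that NewProducer and
// NewPushConsumer each create via internal.NewNamesrv.
type namesrv struct {
	name string
}

// rmqClient is a hypothetical stand-in for the shared RocketMQ client.
type rmqClient struct {
	id       string
	namesrvs *namesrv
}

var (
	clientMu  sync.Mutex
	clientMap = map[string]*rmqClient{}
)

// getOrNewClient returns the cached client for id, or creates one bound to ns.
// When a client already exists for id, ns is ignored.
func getOrNewClient(id string, ns *namesrv) *rmqClient {
	clientMu.Lock()
	defer clientMu.Unlock()
	if c, ok := clientMap[id]; ok {
		return c
	}
	c := &rmqClient{id: id, namesrvs: ns}
	clientMap[id] = c
	return c
}

func main() {
	producerSrvs := &namesrv{name: "producer-namesrv"}
	consumerSrvs := &namesrv{name: "consumer-namesrv"}

	// Hypothetical shared ID, as when no instance name is set.
	pc := getOrNewClient("10.0.0.1@DEFAULT", producerSrvs)
	cc := getOrNewClient("10.0.0.1@DEFAULT", consumerSrvs)

	fmt.Println(pc == cc)         // true: one shared client
	fmt.Println(cc.namesrvs.name) // producer-namesrv, not consumer-namesrv
}
```

The consumer still holds `consumerSrvs` separately (as `dc.namesrv` does), but the client only ever sees the producer's namesrv.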
So when the master broker node goes down, `cleanOfflineBroker` runs on the namesrv held by the shared client, i.e. the one created first, and it cannot clean the `brokerAddressesMap` of the namesrv created later by the consumer.
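The cleanup side can be sketched the same way. This assumes, as described above, that `cleanOfflineBroker` only walks the `brokerAddressesMap` of the namesrv it is called on. The `namesrv` type, `newNamesrv`, and this `cleanOfflineBroker` are simplified hypothetical versions (not the library's code), and the broker addresses are made up.

```go
package main

import "fmt"

// namesrv is a hypothetical, simplified handle holding
// broker name -> (broker ID -> address).
type namesrv struct {
	brokerAddressesMap map[string]map[int64]string
}

func newNamesrv() *namesrv {
	return &namesrv{brokerAddressesMap: map[string]map[int64]string{}}
}

// cleanOfflineBroker drops every address not in the online set and removes
// brokers left with no addresses. It only touches the receiver's own map.
func (s *namesrv) cleanOfflineBroker(online map[string]bool) {
	for name, addrs := range s.brokerAddressesMap {
		for id, addr := range addrs {
			if !online[addr] {
				delete(addrs, id) // deleting during range is safe in Go
			}
		}
		if len(addrs) == 0 {
			delete(s.brokerAddressesMap, name)
		}
	}
}

func main() {
	producerSrvs := newNamesrv() // held by the shared client
	consumerSrvs := newNamesrv() // held by the push consumer as dc.namesrv
	for _, s := range []*namesrv{producerSrvs, consumerSrvs} {
		s.brokerAddressesMap["broker-a"] = map[int64]string{
			0: "192.168.0.1:10911", // master, about to go down
			1: "192.168.0.2:10911", // slave
		}
	}

	// Only the slave is still online; only the client's namesrv is cleaned.
	online := map[string]bool{"192.168.0.2:10911": true}
	producerSrvs.cleanOfflineBroker(online)

	fmt.Println(len(producerSrvs.brokerAddressesMap["broker-a"])) // 1
	fmt.Println(len(consumerSrvs.brokerAddressesMap["broker-a"])) // 2: stale master address kept
}
```

The consumer's map still points at the dead master, which matches the `pull message from broker error` symptom.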
   
So I changed my code as shown below. By giving the producer and the push consumer different instance names, each call to `internal.GetOrNewRocketMQClient` creates its own client, which avoids this bug.
```go
func init() {
	defaultProducer, err = rocketmq.NewProducer(
		producer.WithNameServer(addrs),
		producer.WithCredentials(primitive.Credentials{
			AccessKey: cfg.AccessKey,
			SecretKey: cfg.SecretKey,
		}),
		producer.WithInstanceName("aiot_producer"), // added: distinct instance name
	)
	...
	defaultPushConsumer, err = rocketmq.NewPushConsumer(
		consumer.WithNameServer(addrs),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: cfg.AccessKey,
			SecretKey: cfg.SecretKey,
		}),
		consumer.WithNamespace(cfg.InstanceID),
		consumer.WithGroupName(groupID),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithInstance("aiot_consumer"), // added: distinct instance name
	)
	...
}
```
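The workaround works because the instance name is part of the client ID. A minimal sketch, assuming the ID has the form `clientIP@instanceName`, with `@unitName` appended when a unit name is set, and that the default instance name is `DEFAULT`; both are assumptions for illustration, and `buildClientID` is a hypothetical helper, not a library function.

```go
package main

import "fmt"

// buildClientID is a hypothetical helper mirroring the assumed ID format
// clientIP@instanceName[@unitName].
func buildClientID(clientIP, instanceName, unitName string) string {
	id := clientIP + "@" + instanceName
	if unitName != "" {
		id += "@" + unitName
	}
	return id
}

func main() {
	ip := "10.0.0.1" // hypothetical client IP

	// With defaults, producer and consumer produce the same ID, so one client is shared.
	fmt.Println(buildClientID(ip, "DEFAULT", "") == buildClientID(ip, "DEFAULT", "")) // true

	// Distinct instance names give distinct IDs, so each gets its own client.
	fmt.Println(buildClientID(ip, "aiot_producer", "")) // 10.0.0.1@aiot_producer
	fmt.Println(buildClientID(ip, "aiot_consumer", "")) // 10.0.0.1@aiot_consumer
}
```

With separate clients, each client's namesrv is the same one its producer or consumer uses, so the offline-broker cleanup reaches the map the consumer actually pulls from.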
   
My question is: why does the code create only one client when `instanceName` and `UnitName` are not set, yet create a new namesrv object every time we call `NewPushConsumer` or `NewProducer`? `client.namesrvs` calls `cleanOfflineBroker` to clean up brokers when one goes down, but it cannot clean the `brokerAddressesMap` of the `namesrvs` created later.

