IguoChan opened a new issue #699:
URL: https://github.com/apache/rocketmq-client-go/issues/699
**Describe the bug**
In HA cluster mode, when the master broker node is down, my client can still
send messages successfully but can no longer consume them. The error log shows
`pull message from broker error`, and the map printed in the log contains the
IP of the downed node.
**Reason**
My code looks like this:
``` golang
func init() {
	defaultProducer, err = rocketmq.NewProducer(
		producer.WithNameServer(addrs),
		producer.WithCredentials(primitive.Credentials{
			AccessKey: cfg.AccessKey,
			SecretKey: cfg.SecretKey,
		}),
	)
	...
	defaultPushConsumer, err = rocketmq.NewPushConsumer(
		consumer.WithNameServer(addrs),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: cfg.AccessKey,
			SecretKey: cfg.SecretKey,
		}),
		consumer.WithNamespace(cfg.InstanceID),
		consumer.WithGroupName(groupID),
		consumer.WithConsumerModel(consumer.Clustering),
	)
	...
}
```
I found that calling `NewProducer` creates a new `srvs` object (via
`internal.NewNamesrv`) and a `producer.client` object:
``` golang
func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
	...
	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	...
	producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)
	...
	return producer, nil
}
```
When I then call `NewPushConsumer`, its call to
`internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)` reuses
`producer.client` as `dc.client`, because `client.ClientID()` returns the same
client ID. As a result, `dc.client.namesrvs` is the namesrv created earlier for
the producer, while `dc.namesrv` is the one created just now:
``` golang
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
	...
	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	if !defaultOpts.Credentials.IsEmpty() {
		srvs.SetCredentials(defaultOpts.Credentials)
	}
	defaultOpts.Namesrv = srvs
	if defaultOpts.Namespace != "" {
		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
	}
	dc := &defaultConsumer{
		client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
		consumerGroup:  defaultOpts.GroupName,
		cType:          _PushConsume,
		state:          int32(internal.StateCreateJust),
		prCh:           make(chan PullRequest, 4),
		model:          defaultOpts.ConsumerModel,
		consumeOrderly: defaultOpts.ConsumeOrderly,
		fromWhere:      defaultOpts.FromWhere,
		allocate:       defaultOpts.Strategy,
		option:         defaultOpts,
		namesrv:        srvs,
	}
	...
	p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)
	return p, nil
}
```
So, when the master broker node goes down, `cleanOfflineBroker` runs against
the namesrv created earlier (the one held by the shared client). It cannot
clean the `brokerAddressesMap` of the namesrv created later.
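To make the failure mode concrete, here is a minimal, self-contained sketch of it. It is not the library's code: `registry`, `client`, `getOrNewClient`, and `cleanOffline` are hypothetical stand-ins for the namesrv object, the shared client, `GetOrNewRocketMQClient`, and `cleanOfflineBroker`, and the addresses and client ID are made up.

``` golang
package main

import "fmt"

// registry is a hypothetical stand-in for a namesrv object:
// it caches broker name -> broker address.
type registry struct {
	brokerAddrs map[string]string
}

func newRegistry() *registry {
	return &registry{brokerAddrs: map[string]string{"broker-a": "10.0.0.1:10911"}}
}

// client is a hypothetical stand-in for the shared client. It keeps the
// registry that was passed in when the client was first created.
type client struct {
	namesrv *registry
}

// cleanOffline removes a broker, but only from the registry this client holds.
func (c *client) cleanOffline(broker string) {
	delete(c.namesrv.brokerAddrs, broker)
}

// clients caches one client per client ID.
var clients = map[string]*client{}

// getOrNewClient returns the cached client for id if one exists.
// In that case the reg argument is silently ignored.
func getOrNewClient(id string, reg *registry) *client {
	if c, ok := clients[id]; ok {
		return c
	}
	c := &client{namesrv: reg}
	clients[id] = c
	return c
}

func main() {
	// The producer is created first, with its own registry.
	producerReg := newRegistry()
	producerClient := getOrNewClient("10.0.0.9@1234", producerReg)

	// The consumer builds a second registry but gets the cached client back.
	consumerReg := newRegistry()
	consumerClient := getOrNewClient("10.0.0.9@1234", consumerReg)

	// The broker goes down; the shared client cleans only its own registry.
	producerClient.cleanOffline("broker-a")

	fmt.Println("same client:", producerClient == consumerClient) // same client: true
	_, stale := consumerReg.brokerAddrs["broker-a"]
	fmt.Println("consumer registry still lists broker-a:", stale) // consumer registry still lists broker-a: true
}
```

In this model the consumer keeps resolving the dead broker from its own registry, which matches the `pull message from broker error` symptom described above.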
So I changed my code as shown below. Giving the producer and the push consumer
different instance names makes each call to `internal.GetOrNewRocketMQClient`
create its own client, which avoids the bug.
``` golang
func init() {
	defaultProducer, err = rocketmq.NewProducer(
		producer.WithNameServer(addrs),
		producer.WithCredentials(primitive.Credentials{
			AccessKey: cfg.AccessKey,
			SecretKey: cfg.SecretKey,
		}),
		producer.WithInstanceName("aiot_priducer"), // add
	)
	...
	defaultPushConsumer, err = rocketmq.NewPushConsumer(
		consumer.WithNameServer(addrs),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: cfg.AccessKey,
			SecretKey: cfg.SecretKey,
		}),
		consumer.WithNamespace(cfg.InstanceID),
		consumer.WithGroupName(groupID),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithInstance("aiot_consumer"), // add
	)
	...
}
```
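Why this workaround helps can be sketched with a small model of a client cache keyed by client ID. The ID format used here (IP, instance name, optional unit name) is an assumption for illustration only; `buildClientID`, `sharedClient`, and `getOrNewShared` are hypothetical names, not the library's API.

``` golang
package main

import "fmt"

// buildClientID is an assumed, simplified model of how a client ID could be
// derived from the host IP, the instance name, and an optional unit name.
func buildClientID(ip, instanceName, unitName string) string {
	id := ip + "@" + instanceName
	if unitName != "" {
		id += "@" + unitName
	}
	return id
}

// sharedClient is a hypothetical stand-in for the cached client.
type sharedClient struct {
	id string
}

// sharedCache holds one client per client ID.
var sharedCache = map[string]*sharedClient{}

// getOrNewShared returns the cached client for id, creating it on first use.
func getOrNewShared(id string) *sharedClient {
	if c, ok := sharedCache[id]; ok {
		return c
	}
	c := &sharedClient{id: id}
	sharedCache[id] = c
	return c
}

func main() {
	// With default settings both components compute the same ID,
	// so they end up sharing one client.
	a := getOrNewShared(buildClientID("10.0.0.9", "DEFAULT", ""))
	b := getOrNewShared(buildClientID("10.0.0.9", "DEFAULT", ""))
	fmt.Println("defaults share a client:", a == b) // defaults share a client: true

	// With distinct instance names the IDs differ, so each gets its own client.
	p := getOrNewShared(buildClientID("10.0.0.9", "producer-instance", ""))
	c := getOrNewShared(buildClientID("10.0.0.9", "consumer-instance", ""))
	fmt.Println("distinct names share a client:", p == c) // distinct names share a client: false
}
```

With separate clients, each one holds the namesrv that was created alongside it, so its cleanup acts on the map its owner actually reads.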
My question is: why does the code create only one client when instanceName and
UnitName are not set, yet create a new namesrv on every call to
`NewPushConsumer` and `NewProducer`? `client.namesrvs` calls
`cleanOfflineBroker` to remove brokers that have gone down, but it cannot clean
the `brokerAddressesMap` of a `namesrvs` created later.