This is an automated email from the ASF dual-hosted git repository.
cserwen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a15c777 [ISSUE #1056] fix: When the consumer and producer have the
same topic, the consumer fails to update the topic.
a15c777 is described below
commit a15c7771e84b00a33ba6f054d8c31d85654e2dac
Author: lvxiao <[email protected]>
AuthorDate: Mon Jun 19 17:51:11 2023 +0800
[ISSUE #1056] fix: When the consumer and producer have the same topic, the
consumer fails to update the topic.
---
internal/client.go | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/internal/client.go b/internal/client.go
index d479b0c..1321da9 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -686,19 +686,17 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
}
func (c *rmqClient) UpdateTopicRouteInfo() {
+ allTopics := make(map[string]bool, 0)
publishTopicSet := make(map[string]bool, 0)
c.producerMap.Range(func(key, value interface{}) bool {
producer := value.(InnerProducer)
list := producer.PublishTopicList()
for idx := range list {
publishTopicSet[list[idx]] = true
+ allTopics[list[idx]] = true
}
return true
})
- for topic := range publishTopicSet {
- data, changed, _ := c.GetNameSrv().UpdateTopicRouteInfo(topic)
- c.UpdatePublishInfo(topic, data, changed)
- }
subscribedTopicSet := make(map[string]bool, 0)
c.consumerMap.Range(func(key, value interface{}) bool {
@@ -706,13 +704,22 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
list := consumer.SubscriptionDataList()
for idx := range list {
subscribedTopicSet[list[idx].Topic] = true
+ allTopics[list[idx].Topic] = true
}
return true
})
- for topic := range subscribedTopicSet {
+ for topic := range allTopics {
data, changed, _ := c.GetNameSrv().UpdateTopicRouteInfo(topic)
- c.updateSubscribeInfo(topic, data, changed)
+
+ if publishTopicSet[topic] {
+ c.UpdatePublishInfo(topic, data, changed)
+ }
+
+ if subscribedTopicSet[topic] {
+ c.updateSubscribeInfo(topic, data, changed)
+ }
+
}
}