This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit cfb9daa75f2a49d2f205d223de9bf7331c002620 Author: Aaron Ai <[email protected]> AuthorDate: Wed Nov 2 11:11:48 2022 +0800 Reserve index of load balancer when topic route is updated --- .../java/impl/consumer/SimpleConsumerImpl.java | 39 +++++++++++----------- .../impl/consumer/SubscriptionLoadBalancer.java | 10 +++++- .../client/java/impl/producer/ProducerImpl.java | 28 +++++++++------- .../java/impl/producer/PublishingLoadBalancer.java | 10 +++++- 4 files changed, 53 insertions(+), 34 deletions(-) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java index 1b565dd..310df59 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java @@ -27,7 +27,6 @@ import com.google.common.math.IntMath; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -68,7 +67,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer { private final AtomicInteger topicIndex; private final Map<String /* topic */, FilterExpression> subscriptionExpressions; - private final ConcurrentMap<String /* topic */, SubscriptionLoadBalancer> subTopicRouteDataResultCache; + private final ConcurrentMap<String /* topic */, SubscriptionLoadBalancer> subscriptionRouteDataCache; public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration, Map<String, FilterExpression> subscriptionExpressions) { @@ -82,7 +81,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer { this.topicIndex = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)); this.subscriptionExpressions = subscriptionExpressions; - this.subTopicRouteDataResultCache = new ConcurrentHashMap<>(); + this.subscriptionRouteDataCache = new ConcurrentHashMap<>(); } @Override @@ -191,7 +190,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer { } final String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size())); final FilterExpression filterExpression = copy.get(topic); - final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionTopicRouteResult(topic); + final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionLoadBalancer(topic); final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> { final MessageQueueImpl mq = result.takeMessageQueue(); final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, @@ -298,25 +297,25 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer { return simpleSubscriptionSettings; } + private SubscriptionLoadBalancer updateSubscriptionLoadBalancer(String topic, TopicRouteData topicRouteData) { + SubscriptionLoadBalancer subscriptionLoadBalancer = subscriptionRouteDataCache.get(topic); + subscriptionLoadBalancer = null == subscriptionLoadBalancer ? new SubscriptionLoadBalancer(topicRouteData) : + subscriptionLoadBalancer.update(topicRouteData); + subscriptionRouteDataCache.put(topic, subscriptionLoadBalancer); + return subscriptionLoadBalancer; + } + + @Override public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) { - final SubscriptionLoadBalancer subscriptionLoadBalancer = - new SubscriptionLoadBalancer(topicRouteData); - subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer); + updateSubscriptionLoadBalancer(topic, topicRouteData); } - private ListenableFuture<SubscriptionLoadBalancer> getSubscriptionTopicRouteResult(final String topic) { - SettableFuture<SubscriptionLoadBalancer> future0 = SettableFuture.create(); - final SubscriptionLoadBalancer result = subTopicRouteDataResultCache.get(topic); - if (null != result) { - future0.set(result); - return future0; + private ListenableFuture<SubscriptionLoadBalancer> getSubscriptionLoadBalancer(final String topic) { + final SubscriptionLoadBalancer loadBalancer = subscriptionRouteDataCache.get(topic); + if (null != loadBalancer) { + return Futures.immediateFuture(loadBalancer); } - final ListenableFuture<TopicRouteData> future = getRouteData(topic); - return Futures.transform(future, topicRouteDataResult -> { - final SubscriptionLoadBalancer subscriptionLoadBalancer = - new SubscriptionLoadBalancer(topicRouteDataResult); - subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer); - return subscriptionLoadBalancer; - }, MoreExecutors.directExecutor()); + return Futures.transform(getRouteData(topic), topicRouteData -> updateSubscriptionLoadBalancer(topic, + topicRouteData), MoreExecutors.directExecutor()); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java index 012d441..109855d 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java @@ -41,7 +41,11 @@ public class SubscriptionLoadBalancer { private final ImmutableList<MessageQueueImpl> messageQueues; public SubscriptionLoadBalancer(TopicRouteData topicRouteData) { - this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)); + this(new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)), topicRouteData); + } + + private SubscriptionLoadBalancer(AtomicInteger index, TopicRouteData topicRouteData) { + this.index = index; final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream() .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isReadable() && Utilities.MASTER_BROKER_ID == mq.getBroker().getId()) @@ -52,6 +56,10 @@ public class SubscriptionLoadBalancer { this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build(); } + SubscriptionLoadBalancer update(TopicRouteData topicRouteData) { + return new SubscriptionLoadBalancer(index, topicRouteData); + } + public MessageQueueImpl takeMessageQueue() { final int next = index.getAndIncrement(); return messageQueues.get(IntMath.mod(next, messageQueues.size())); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java index 307af8a..70cb6aa 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java @@ -398,7 +398,7 @@ class ProducerImpl extends ClientImpl implements Producer { this.topics.add(topic); // Get publishing topic route. - final ListenableFuture<PublishingLoadBalancer> routeFuture = getPublishingTopicRouteResult(topic); + final ListenableFuture<PublishingLoadBalancer> routeFuture = getPublishingLoadBalancer(topic); return Futures.transformAsync(routeFuture, result -> { // Prepare the candidate message queue(s) for retry-sending in advance. final List<MessageQueueImpl> candidates = null == messageGroup ? takeMessageQueues(result) : @@ -541,21 +541,25 @@ class ProducerImpl extends ClientImpl implements Producer { }, clientCallbackExecutor); } + private PublishingLoadBalancer updatePublishingLoadBalancer(String topic, TopicRouteData topicRouteData) { + PublishingLoadBalancer publishingLoadBalancer = publishingRouteDataCache.get(topic); + publishingLoadBalancer = null == publishingLoadBalancer ? new PublishingLoadBalancer(topicRouteData) : + publishingLoadBalancer.update(topicRouteData); + publishingRouteDataCache.put(topic, publishingLoadBalancer); + return publishingLoadBalancer; + } + @Override public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) { - final PublishingLoadBalancer publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData); - publishingRouteDataCache.put(topic, publishingLoadBalancer); + updatePublishingLoadBalancer(topic, topicRouteData); } - private ListenableFuture<PublishingLoadBalancer> getPublishingTopicRouteResult(final String topic) { - final PublishingLoadBalancer result = publishingRouteDataCache.get(topic); - if (null != result) { - return Futures.immediateFuture(result); - } - return Futures.transformAsync(getRouteData(topic), topicRouteDataResult -> { - final PublishingLoadBalancer loadBalancer = new PublishingLoadBalancer(topicRouteDataResult); - publishingRouteDataCache.put(topic, loadBalancer); + private ListenableFuture<PublishingLoadBalancer> getPublishingLoadBalancer(final String topic) { + final PublishingLoadBalancer loadBalancer = publishingRouteDataCache.get(topic); + if (null != loadBalancer) { return Futures.immediateFuture(loadBalancer); - }, MoreExecutors.directExecutor()); + } + return Futures.transform(getRouteData(topic), topicRouteData -> updatePublishingLoadBalancer(topic, + topicRouteData), MoreExecutors.directExecutor()); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java index feb9616..1ba9e52 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java @@ -50,7 +50,11 @@ public class PublishingLoadBalancer { private final ImmutableList<MessageQueueImpl> messageQueues; public PublishingLoadBalancer(TopicRouteData topicRouteData) { - this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)); + this(new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)), topicRouteData); + } + + private PublishingLoadBalancer(AtomicInteger index, TopicRouteData topicRouteData) { + this.index = index; final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream() .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isWritable() && Utilities.MASTER_BROKER_ID == mq.getBroker().getId()) @@ -61,6 +65,10 @@ public class PublishingLoadBalancer { this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build(); } + PublishingLoadBalancer update(TopicRouteData topicRouteData) { + return new PublishingLoadBalancer(index, topicRouteData); + } + public MessageQueueImpl takeMessageQueueByMessageGroup(String messageGroup) { final long hashCode = Hashing.sipHash24().hashBytes(messageGroup.getBytes(StandardCharsets.UTF_8)).asLong(); final int index = LongMath.mod(hashCode, messageQueues.size());
