drpmma commented on code in PR #6568: URL: https://github.com/apache/rocketmq/pull/6568#discussion_r1307024124
########## proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java: ########## @@ -102,11 +108,19 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSe if (SendStatus.SEND_OK.equals(sendResult.getSendStatus()) && tranType == MessageSysFlag.TRANSACTION_PREPARED_TYPE && StringUtils.isNotBlank(sendResult.getTransactionId())) { - fillTransactionData(ctx, producerGroup, messageQueue, sendResult, messageList); + fillTransactionData(ctx, producerGroup, finalMessageQueue, sendResult, messageList); } } return sendResultList; - }, this.executor); + }, this.executor) + .whenComplete((result, exception) -> { + endTimestamp.set(System.currentTimeMillis()); Review Comment: There is no need to use AtomicLong for `endTimestamp`; a local variable would be fine. ########## proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java: ########## @@ -44,8 +47,9 @@ public class MessageQueueSelector { private final Map<String, AddressableMessageQueue> brokerNameQueueMap = new ConcurrentHashMap<>(); private final AtomicInteger queueIndex; private final AtomicInteger brokerIndex; + private TopicRouteService topicRouteService; - public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read) { + public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, TopicRouteService topicRouteService, boolean read) { Review Comment: Why not pass MqFaultStrategy instead of TopicRouteService, since TopicRouteService is not actually used in this class?
########## client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java: ########## @@ -65,68 +140,99 @@ public void remove(final String name) { @Override public String pickOneAtLeast() { final Enumeration<FaultItem> elements = this.faultItemTable.elements(); - List<FaultItem> tmpList = new LinkedList<>(); + List<FaultItem> tmpList = new LinkedList<FaultItem>(); while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); tmpList.add(faultItem); } + if (!tmpList.isEmpty()) { - Collections.sort(tmpList); - final int half = tmpList.size() / 2; - if (half <= 0) { - return tmpList.get(0).getName(); - } else { - final int i = this.randomItem.incrementAndGet() % half; - return tmpList.get(i).getName(); + Collections.shuffle(tmpList); + //Collections.sort(tmpList); Review Comment: This commented-out code could be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org