drpmma commented on code in PR #6568:
URL: https://github.com/apache/rocketmq/pull/6568#discussion_r1307024124


##########
proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java:
##########
@@ -102,11 +108,19 @@ public CompletableFuture<List<SendResult>> 
sendMessage(ProxyContext ctx, QueueSe
                         if 
(SendStatus.SEND_OK.equals(sendResult.getSendStatus()) &&
                             tranType == 
MessageSysFlag.TRANSACTION_PREPARED_TYPE &&
                             
StringUtils.isNotBlank(sendResult.getTransactionId())) {
-                            fillTransactionData(ctx, producerGroup, 
messageQueue, sendResult, messageList);
+                            fillTransactionData(ctx, producerGroup, 
finalMessageQueue, sendResult, messageList);
                         }
                     }
                     return sendResultList;
-                }, this.executor);
+                }, this.executor)
+                    .whenComplete((result, exception) -> {
+                        endTimestamp.set(System.currentTimeMillis());

Review Comment:
   There is no need to use AtomicLong for `endTimestamp`, just a local variable 
would be fine



##########
proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java:
##########
@@ -44,8 +47,9 @@ public class MessageQueueSelector {
     private final Map<String, AddressableMessageQueue> brokerNameQueueMap = 
new ConcurrentHashMap<>();
     private final AtomicInteger queueIndex;
     private final AtomicInteger brokerIndex;
+    private TopicRouteService topicRouteService;
 
-    public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean 
read) {
+    public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, 
TopicRouteService topicRouteService, boolean read) {

Review Comment:
   Why not pass MqFaultStrategy instead of TopicRouteService as 
TopicRouteService is not actually used in this class.



##########
client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java:
##########
@@ -65,68 +140,99 @@ public void remove(final String name) {
     @Override
     public String pickOneAtLeast() {
         final Enumeration<FaultItem> elements = this.faultItemTable.elements();
-        List<FaultItem> tmpList = new LinkedList<>();
+        List<FaultItem> tmpList = new LinkedList<FaultItem>();
         while (elements.hasMoreElements()) {
             final FaultItem faultItem = elements.nextElement();
             tmpList.add(faultItem);
         }
+
         if (!tmpList.isEmpty()) {
-            Collections.sort(tmpList);
-            final int half = tmpList.size() / 2;
-            if (half <= 0) {
-                return tmpList.get(0).getName();
-            } else {
-                final int i = this.randomItem.incrementAndGet() % half;
-                return tmpList.get(i).getName();
+            Collections.shuffle(tmpList);
+            //Collections.sort(tmpList);

Review Comment:
   annotation could be removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to