joeCarf commented on code in PR #7513:
URL: https://github.com/apache/rocketmq/pull/7513#discussion_r1391967244


##########
proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java:
##########
@@ -63,17 +65,20 @@ public ClusterMessageService(TopicRouteService topicRouteService, MQClientAPIFac
     public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
         List<Message> msgList, SendMessageRequestHeader requestHeader, long timeoutMillis) {
         CompletableFuture<List<SendResult>> future;
+        Message message;
         if (msgList.size() == 1) {
-            future = this.mqClientAPIFactory.getClient().sendMessageAsync(
-                    messageQueue.getBrokerAddr(),
-                    messageQueue.getBrokerName(), msgList.get(0), requestHeader, timeoutMillis)
-                .thenApply(Lists::newArrayList);
+            message = msgList.get(0);
         } else {
-            future = this.mqClientAPIFactory.getClient().sendMessageAsync(
-                    messageQueue.getBrokerAddr(),
-                    messageQueue.getBrokerName(), msgList, requestHeader, timeoutMillis)
-                .thenApply(Lists::newArrayList);
+            requestHeader.setBatch(true);
+            message = MessageBatch.generateFromList(msgList);
+            MessageClientIDSetter.setUniqID(message);
+            ((MessageBatch) message).fillBody();
         }
+        future = this.mqClientAPIFactory.getClient().sendMessageAsync(

Review Comment:
   This change seems unnecessary; I don't see any problem with the original code.



##########
proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java:
##########
@@ -90,22 +90,17 @@ public LocalMessageService(BrokerController brokerController, ChannelManager cha
     @Override
     public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
         List<Message> msgList, SendMessageRequestHeader requestHeader, long timeoutMillis) {
-        byte[] body;
-        String messageId;
-        if (msgList.size() > 1) {
-            requestHeader.setBatch(true);
-            MessageBatch msgBatch = MessageBatch.generateFromList(msgList);
-            MessageClientIDSetter.setUniqID(msgBatch);
-            body = msgBatch.encode();
-            msgBatch.setBody(body);
-            messageId = MessageClientIDSetter.getUniqID(msgBatch);
+        Message message;

Review Comment:
   The condition has changed slightly. Does that bring any improvement?



##########
client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java:
##########
@@ -152,7 +150,9 @@ public CompletableFuture<SendResult> sendMessageAsync(
         long timeoutMillis
     ) {
         SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
+
+        int code = requestHeader.isBatch() ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2;

Review Comment:
   Declaring code with the RequestCode type would be more readable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.