joeCarf commented on code in PR #7513: URL: https://github.com/apache/rocketmq/pull/7513#discussion_r1391967244
########## proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java: ########## @@ -63,17 +65,20 @@ public ClusterMessageService(TopicRouteService topicRouteService, MQClientAPIFac public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, List<Message> msgList, SendMessageRequestHeader requestHeader, long timeoutMillis) { CompletableFuture<List<SendResult>> future; + Message message; if (msgList.size() == 1) { - future = this.mqClientAPIFactory.getClient().sendMessageAsync( - messageQueue.getBrokerAddr(), - messageQueue.getBrokerName(), msgList.get(0), requestHeader, timeoutMillis) - .thenApply(Lists::newArrayList); + message = msgList.get(0); } else { - future = this.mqClientAPIFactory.getClient().sendMessageAsync( - messageQueue.getBrokerAddr(), - messageQueue.getBrokerName(), msgList, requestHeader, timeoutMillis) - .thenApply(Lists::newArrayList); + requestHeader.setBatch(true); + message = MessageBatch.generateFromList(msgList); + MessageClientIDSetter.setUniqID(message); + ((MessageBatch) message).fillBody(); } + future = this.mqClientAPIFactory.getClient().sendMessageAsync( Review Comment: 这个部分的改动似乎没有必要,原有的写法我觉得没什么问题 ########## proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java: ########## @@ -90,22 +90,17 @@ public LocalMessageService(BrokerController brokerController, ChannelManager cha @Override public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, List<Message> msgList, SendMessageRequestHeader requestHeader, long timeoutMillis) { - byte[] body; - String messageId; - if (msgList.size() > 1) { - requestHeader.setBatch(true); - MessageBatch msgBatch = MessageBatch.generateFromList(msgList); - MessageClientIDSetter.setUniqID(msgBatch); - body = msgBatch.encode(); - msgBatch.setBody(body); - messageId = MessageClientIDSetter.getUniqID(msgBatch); + Message message; Review Comment: 判断条件变了一下,带来什么优化了吗 ########## client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java: ########## @@ -152,7 +150,9 @@ public CompletableFuture<SendResult> sendMessageAsync( long timeoutMillis ) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2); + + int code = requestHeader.isBatch() ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2; Review Comment: code使用RequestCode类型,可读性会更好 ########## proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java: ########## @@ -63,17 +65,20 @@ public ClusterMessageService(TopicRouteService topicRouteService, MQClientAPIFac public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, List<Message> msgList, SendMessageRequestHeader requestHeader, long timeoutMillis) { CompletableFuture<List<SendResult>> future; + Message message; if (msgList.size() == 1) { - future = this.mqClientAPIFactory.getClient().sendMessageAsync( - messageQueue.getBrokerAddr(), - messageQueue.getBrokerName(), msgList.get(0), requestHeader, timeoutMillis) - .thenApply(Lists::newArrayList); + message = msgList.get(0); } else { - future = this.mqClientAPIFactory.getClient().sendMessageAsync( - messageQueue.getBrokerAddr(), - messageQueue.getBrokerName(), msgList, requestHeader, timeoutMillis) - .thenApply(Lists::newArrayList); + requestHeader.setBatch(true); + message = MessageBatch.generateFromList(msgList); + MessageClientIDSetter.setUniqID(message); + ((MessageBatch) message).fillBody(); } + future = this.mqClientAPIFactory.getClient().sendMessageAsync( Review Comment: 这个部分的改动似乎没有必要,原有的写法我觉得没什么问题 ########## client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java: ########## @@ -152,7 +150,9 @@ public CompletableFuture<SendResult> sendMessageAsync( long timeoutMillis ) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2); + + int code = requestHeader.isBatch() ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2; Review Comment: code使用RequestCode类型,可读性会更好 ########## proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java: ########## @@ -90,22 +90,17 @@ public LocalMessageService(BrokerController brokerController, ChannelManager cha @Override public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, List<Message> msgList, SendMessageRequestHeader requestHeader, long timeoutMillis) { - byte[] body; - String messageId; - if (msgList.size() > 1) { - requestHeader.setBatch(true); - MessageBatch msgBatch = MessageBatch.generateFromList(msgList); - MessageClientIDSetter.setUniqID(msgBatch); - body = msgBatch.encode(); - msgBatch.setBody(body); - messageId = MessageClientIDSetter.getUniqID(msgBatch); + Message message; Review Comment: 判断条件变了一下,带来什么优化了吗 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org