This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3968c186a5 [ISSUE #7231] Fix: proxy client language error (#7200)
3968c186a5 is described below

commit 3968c186a59db96701ade8c343bc6a5d31ee2d24
Author: weihubeats <we...@apache.org>
AuthorDate: Fri Oct 20 14:49:00 2023 +0800

    [ISSUE #7231] Fix: proxy client language error (#7200)
    
    * Adding null does not update
    
    * add langeuga code
    
    * add langeuga code
    
    * add langeuga code
    
    * add langeuga code
    
    * add langeuga code
    
    * Rerun ci
    
    * Rerun ci
    
    * Rerun ci
    
    * remove redundant package imports
    
    * redundant line
    
    * modify the parameter passed as proxyContext to language
    
    * format
---
 .../rocketmq/proxy/service/message/LocalMessageService.java  | 12 ++++++------
 .../rocketmq/proxy/service/message/LocalRemotingCommand.java |  8 ++++++--
 .../org/apache/rocketmq/remoting/protocol/LanguageCode.java  | 11 +++++++++++
 3 files changed, 23 insertions(+), 8 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index ca7dcc9eb0..aaa688fee6 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -104,7 +104,7 @@ public class LocalMessageService implements MessageService {
             body = message.getBody();
             messageId = MessageClientIDSetter.getUniqID(message);
         }
-        RemotingCommand request = 
LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, 
requestHeader);
+        RemotingCommand request = 
LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, 
requestHeader, ctx.getLanguage());
         request.setBody(body);
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         SimpleChannel channel = channelManager.createInvocationChannel(ctx);
@@ -162,7 +162,7 @@ public class LocalMessageService implements MessageService {
         ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) {
         SimpleChannel channel = channelManager.createChannel(ctx);
         ChannelHandlerContext channelHandlerContext = 
channel.getChannelHandlerContext();
-        RemotingCommand command = 
LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, 
requestHeader);
+        RemotingCommand command = 
LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, 
requestHeader, ctx.getLanguage());
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {
             RemotingCommand response = 
brokerController.getSendMessageProcessor()
@@ -181,7 +181,7 @@ public class LocalMessageService implements MessageService {
         CompletableFuture<Void> future = new CompletableFuture<>();
         SimpleChannel channel = channelManager.createChannel(ctx);
         ChannelHandlerContext channelHandlerContext = 
channel.getChannelHandlerContext();
-        RemotingCommand command = 
LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, 
requestHeader);
+        RemotingCommand command = 
LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, 
requestHeader, ctx.getLanguage());
         try {
             brokerController.getEndTransactionProcessor()
                 .processRequest(channelHandlerContext, command);
@@ -196,7 +196,7 @@ public class LocalMessageService implements MessageService {
     public CompletableFuture<PopResult> popMessage(ProxyContext ctx, 
AddressableMessageQueue messageQueue,
         PopMessageRequestHeader requestHeader, long timeoutMillis) {
         requestHeader.setBornTime(System.currentTimeMillis());
-        RemotingCommand request = 
LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, 
requestHeader);
+        RemotingCommand request = 
LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, 
requestHeader, ctx.getLanguage());
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         SimpleChannel channel = channelManager.createInvocationChannel(ctx);
         InvocationContext invocationContext = new InvocationContext(future);
@@ -307,7 +307,7 @@ public class LocalMessageService implements MessageService {
         ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) {
         SimpleChannel channel = channelManager.createChannel(ctx);
         ChannelHandlerContext channelHandlerContext = 
channel.getChannelHandlerContext();
-        RemotingCommand command = 
LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
 requestHeader);
+        RemotingCommand command = 
LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
 requestHeader, ctx.getLanguage());
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {
             RemotingCommand response = 
brokerController.getChangeInvisibleTimeProcessor()
@@ -346,7 +346,7 @@ public class LocalMessageService implements MessageService {
         AckMessageRequestHeader requestHeader, long timeoutMillis) {
         SimpleChannel channel = channelManager.createChannel(ctx);
         ChannelHandlerContext channelHandlerContext = 
channel.getChannelHandlerContext();
-        RemotingCommand command = 
LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, 
requestHeader);
+        RemotingCommand command = 
LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, 
requestHeader, ctx.getLanguage());
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {
             RemotingCommand response = 
brokerController.getAckMessageProcessor()
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
index 73048dbbc2..915cafcd57 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
@@ -16,16 +16,19 @@
  */
 package org.apache.rocketmq.proxy.service.message;
 
-import java.util.HashMap;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import java.util.HashMap;
+
 public class LocalRemotingCommand extends RemotingCommand {
 
-    public static LocalRemotingCommand createRequestCommand(int code, 
CommandCustomHeader customHeader) {
+    public static LocalRemotingCommand createRequestCommand(int code, 
CommandCustomHeader customHeader, String language) {
         LocalRemotingCommand cmd = new LocalRemotingCommand();
         cmd.setCode(code);
+        cmd.setLanguage(LanguageCode.getCode(language));
         cmd.writeCustomHeader(customHeader);
         cmd.setExtFields(new HashMap<>());
         setCmdVersion(cmd);
@@ -37,4 +40,5 @@ public class LocalRemotingCommand extends RemotingCommand {
         Class<? extends CommandCustomHeader> classHeader) throws 
RemotingCommandException {
         return classHeader.cast(readCustomHeader());
     }
+    
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
index 19280f9967..2df9fbf027 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
@@ -17,6 +17,11 @@
 
 package org.apache.rocketmq.remoting.protocol;
 
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 public enum LanguageCode {
     JAVA((byte) 0),
     CPP((byte) 1),
@@ -50,4 +55,10 @@ public enum LanguageCode {
     public byte getCode() {
         return code;
     }
+    
+    private static final Map<String, LanguageCode> MAP = 
Arrays.stream(LanguageCode.values()).collect(Collectors.toMap(LanguageCode::name,
 Function.identity()));
+
+    public static LanguageCode getCode(String language) {
+        return MAP.get(language);
+    }
 }

Reply via email to