This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 3968c186a5 [ISSUE #7231] Fix: proxy client language error (#7200) 3968c186a5 is described below commit 3968c186a59db96701ade8c343bc6a5d31ee2d24 Author: weihubeats <we...@apache.org> AuthorDate: Fri Oct 20 14:49:00 2023 +0800 [ISSUE #7231] Fix: proxy client language error (#7200) * Adding null does not update * add langeuga code * add langeuga code * add langeuga code * add langeuga code * add langeuga code * Rerun ci * Rerun ci * Rerun ci * remove redundant package imports * redundant line * modify the parameter passed as proxyContext to language * format --- .../rocketmq/proxy/service/message/LocalMessageService.java | 12 ++++++------ .../rocketmq/proxy/service/message/LocalRemotingCommand.java | 8 ++++++-- .../org/apache/rocketmq/remoting/protocol/LanguageCode.java | 11 +++++++++++ 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index ca7dcc9eb0..aaa688fee6 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -104,7 +104,7 @@ public class LocalMessageService implements MessageService { body = message.getBody(); messageId = MessageClientIDSetter.getUniqID(message); } - RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); + RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader, ctx.getLanguage()); request.setBody(body); CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); SimpleChannel channel = channelManager.createInvocationChannel(ctx); @@ -162,7 +162,7 @@ public class LocalMessageService implements MessageService { ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) { SimpleChannel channel = channelManager.createChannel(ctx); ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); - RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader, ctx.getLanguage()); CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); try { RemotingCommand response = brokerController.getSendMessageProcessor() @@ -181,7 +181,7 @@ public class LocalMessageService implements MessageService { CompletableFuture<Void> future = new CompletableFuture<>(); SimpleChannel channel = channelManager.createChannel(ctx); ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); - RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader, ctx.getLanguage()); try { brokerController.getEndTransactionProcessor() .processRequest(channelHandlerContext, command); @@ -196,7 +196,7 @@ public class LocalMessageService implements MessageService { public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMessageQueue messageQueue, PopMessageRequestHeader requestHeader, long timeoutMillis) { requestHeader.setBornTime(System.currentTimeMillis()); - RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader); + RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader, ctx.getLanguage()); CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); SimpleChannel channel = channelManager.createInvocationChannel(ctx); InvocationContext invocationContext = new InvocationContext(future); @@ -307,7 +307,7 @@ public class LocalMessageService implements MessageService { ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) { SimpleChannel channel = channelManager.createChannel(ctx); ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); - RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader, ctx.getLanguage()); CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); try { RemotingCommand response = brokerController.getChangeInvisibleTimeProcessor() @@ -346,7 +346,7 @@ public class LocalMessageService implements MessageService { AckMessageRequestHeader requestHeader, long timeoutMillis) { SimpleChannel channel = channelManager.createChannel(ctx); ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext(); - RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader); + RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader, ctx.getLanguage()); CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); try { RemotingCommand response = brokerController.getAckMessageProcessor() diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java index 73048dbbc2..915cafcd57 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java @@ -16,16 +16,19 @@ */ package org.apache.rocketmq.proxy.service.message; -import java.util.HashMap; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.util.HashMap; + public class LocalRemotingCommand extends RemotingCommand { - public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) { + public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader, String language) { LocalRemotingCommand cmd = new LocalRemotingCommand(); cmd.setCode(code); + cmd.setLanguage(LanguageCode.getCode(language)); cmd.writeCustomHeader(customHeader); cmd.setExtFields(new HashMap<>()); setCmdVersion(cmd); @@ -37,4 +40,5 @@ public class LocalRemotingCommand extends RemotingCommand { Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException { return classHeader.cast(readCustomHeader()); } + } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java index 19280f9967..2df9fbf027 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java @@ -17,6 +17,11 @@ package org.apache.rocketmq.remoting.protocol; +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + public enum LanguageCode { JAVA((byte) 0), CPP((byte) 1), @@ -50,4 +55,10 @@ public enum LanguageCode { public byte getCode() { return code; } + + private static final Map<String, LanguageCode> MAP = Arrays.stream(LanguageCode.values()).collect(Collectors.toMap(LanguageCode::name, Function.identity())); + + public static LanguageCode getCode(String language) { + return MAP.get(language); + } }