This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5262358140 [ISSUE #7699] Refector NamespaceRpcHook (#7769)
5262358140 is described below

commit 5262358140bcf7b283754a71dd16c2a5c6dbf821
Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com>
AuthorDate: Tue Jan 23 13:56:26 2024 +0800

    [ISSUE #7699] Refector NamespaceRpcHook (#7769)
    
    * [ISSUE #7699] Refector NamespaceRpcHook
    
    * fix
---
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  6 +++--
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  6 +++--
 .../client/impl/mqclient/MQClientAPIExt.java       |  6 +++--
 .../rocketmq/client/rpchook/NamespaceRpcHook.java  | 13 ++++------
 .../client/rpchook/NamespaceRpcHookTest.java       |  8 +++----
 .../java/org/apache/rocketmq/common/MixAll.java    |  2 ++
 .../protocol/header/LockBatchMqRequestHeader.java  | 28 ++++++++++++++++++++++
 .../header/UnlockBatchMqRequestHeader.java         | 28 ++++++++++++++++++++++
 8 files changed, 78 insertions(+), 19 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 3827beb5b6..01745a2b79 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -102,11 +102,13 @@ import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
@@ -906,7 +908,7 @@ public class BrokerOuterAPI {
         final LockBatchRequestBody requestBody,
         final long timeoutMillis,
         final LockCallback callback) throws RemotingException, InterruptedException {
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new LockBatchMqRequestHeader());
 
         request.setBody(requestBody.encode());
         this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@@ -945,7 +947,7 @@ public class BrokerOuterAPI {
         final UnlockBatchRequestBody requestBody,
         final long timeoutMillis,
         final UnlockCallback callback) throws RemotingException, InterruptedException {
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new UnlockBatchMqRequestHeader());
 
         request.setBody(requestBody.encode());
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index f46dbe3124..1b4b3878c6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -173,6 +173,7 @@ import org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetTopicsByClusterRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
@@ -195,6 +196,7 @@ import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
@@ -1613,7 +1615,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
         final String addr,
         final LockBatchRequestBody requestBody,
         final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new LockBatchMqRequestHeader());
 
         request.setBody(requestBody.encode());
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -1637,7 +1639,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
         final long timeoutMillis,
         final boolean oneway
     ) throws RemotingException, MQBrokerException, InterruptedException {
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new UnlockBatchMqRequestHeader());
 
         request.setBody(requestBody.encode());
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index 3d8625937c..b97e00c577 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -67,6 +67,7 @@ import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
@@ -77,6 +78,7 @@ import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
 
@@ -543,7 +545,7 @@ public class MQClientAPIExt extends MQClientAPIImpl {
 
     public CompletableFuture<Set<MessageQueue>> lockBatchMQWithFuture(String brokerAddr,
         LockBatchRequestBody requestBody, long timeoutMillis) {
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new LockBatchMqRequestHeader());
         request.setBody(requestBody.encode());
         return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> {
             CompletableFuture<Set<MessageQueue>> future0 = new CompletableFuture<>();
@@ -565,7 +567,7 @@ public class MQClientAPIExt extends MQClientAPIImpl {
     public CompletableFuture<Void> unlockBatchMQOneway(String brokerAddr,
         UnlockBatchRequestBody requestBody, long timeoutMillis) {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new UnlockBatchMqRequestHeader());
         request.setBody(requestBody.encode());
         try {
             this.getRemotingClient().invokeOneway(brokerAddr, request, timeoutMillis);
diff --git a/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java b/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java
index 7deee0a9f3..0178b2ca91 100644
--- a/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java
+++ b/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java
@@ -19,10 +19,9 @@ package org.apache.rocketmq.client.rpchook;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.remoting.rpc.RpcRequestHeader;
 
 public class NamespaceRpcHook implements RPCHook {
     private final ClientConfig clientConfig;
@@ -33,13 +32,9 @@ public class NamespaceRpcHook implements RPCHook {
 
     @Override
     public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
-        CommandCustomHeader customHeader = request.readCustomHeader();
-        if (customHeader instanceof RpcRequestHeader) {
-            RpcRequestHeader requestHeader = (RpcRequestHeader) customHeader;
-            if (StringUtils.isNotEmpty(clientConfig.getNamespaceV2())) {
-                requestHeader.setNamespaced(true);
-                requestHeader.setNamespace(clientConfig.getNamespaceV2());
-            }
+        if (StringUtils.isNotEmpty(clientConfig.getNamespaceV2())) {
+            request.addExtField(MixAll.RPC_REQUEST_HEADER_NAMESPACED_FIELD, "true");
+            request.addExtField(MixAll.RPC_REQUEST_HEADER_NAMESPACE_FIELD, clientConfig.getNamespaceV2());
         }
     }
 
diff --git a/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java b/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java
index 1551ce0935..385c2ceec0 100644
--- a/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.client.rpchook;
 
 import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
@@ -39,8 +40,8 @@ public class NamespaceRpcHookTest {
         PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
         namespaceRpcHook.doBeforeRequest("", request);
-        assertThat(pullMessageRequestHeader.getNamespaced()).isTrue();
-        assertThat(pullMessageRequestHeader.getNamespace()).isEqualTo(namespace);
+        assertThat(request.getExtFields().get(MixAll.RPC_REQUEST_HEADER_NAMESPACED_FIELD)).isEqualTo("true");
+        assertThat(request.getExtFields().get(MixAll.RPC_REQUEST_HEADER_NAMESPACE_FIELD)).isEqualTo(namespace);
     }
 
     @Test
@@ -50,7 +51,6 @@ public class NamespaceRpcHookTest {
         PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
         namespaceRpcHook.doBeforeRequest("", request);
-        assertThat(pullMessageRequestHeader.getNamespaced()).isNull();
-        assertThat(pullMessageRequestHeader.getNamespace()).isNull();
+        assertThat(request.getExtFields()).isNull();
     }
 }
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index c11eb377b9..cdcc54cd92 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -108,6 +108,8 @@ public class MixAll {
     public static final String ROCKETMQ_ZONE_MODE_PROPERTY = "rocketmq.zone.mode";
     public static final String ZONE_NAME = "__ZONE_NAME";
     public static final String ZONE_MODE = "__ZONE_MODE";
+    public final static String RPC_REQUEST_HEADER_NAMESPACED_FIELD = "nsd";
+    public final static String RPC_REQUEST_HEADER_NAMESPACE_FIELD = "ns";
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
     public static final String LOGICAL_QUEUE_MOCK_BROKER_PREFIX = "__syslo__";
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/LockBatchMqRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/LockBatchMqRequestHeader.java
new file mode 100644
index 0000000000..3484fa7d3e
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/LockBatchMqRequestHeader.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.header;
+
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.rpc.RpcRequestHeader;
+
+public class LockBatchMqRequestHeader extends RpcRequestHeader {
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnlockBatchMqRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnlockBatchMqRequestHeader.java
new file mode 100644
index 0000000000..e7a44f2f8b
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnlockBatchMqRequestHeader.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.header;
+
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.rpc.RpcRequestHeader;
+
+public class UnlockBatchMqRequestHeader extends RpcRequestHeader {
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+}

Reply via email to