This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 5262358140 [ISSUE #7699] Refector NamespaceRpcHook (#7769) 5262358140 is described below commit 5262358140bcf7b283754a71dd16c2a5c6dbf821 Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com> AuthorDate: Tue Jan 23 13:56:26 2024 +0800 [ISSUE #7699] Refector NamespaceRpcHook (#7769) * [ISSUE #7699] Refector NamespaceRpcHook * fix --- .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 6 +++-- .../rocketmq/client/impl/MQClientAPIImpl.java | 6 +++-- .../client/impl/mqclient/MQClientAPIExt.java | 6 +++-- .../rocketmq/client/rpchook/NamespaceRpcHook.java | 13 ++++------ .../client/rpchook/NamespaceRpcHookTest.java | 8 +++---- .../java/org/apache/rocketmq/common/MixAll.java | 2 ++ .../protocol/header/LockBatchMqRequestHeader.java | 28 ++++++++++++++++++++++ .../header/UnlockBatchMqRequestHeader.java | 28 ++++++++++++++++++++++ 8 files changed, 78 insertions(+), 19 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 3827beb5b6..01745a2b79 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -102,11 +102,13 @@ import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; @@ -906,7 +908,7 @@ public class BrokerOuterAPI { final LockBatchRequestBody requestBody, final long timeoutMillis, final LockCallback callback) throws RemotingException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new LockBatchMqRequestHeader()); request.setBody(requestBody.encode()); this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @@ -945,7 +947,7 @@ public class BrokerOuterAPI { final UnlockBatchRequestBody requestBody, final long timeoutMillis, final UnlockCallback callback) throws RemotingException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new UnlockBatchMqRequestHeader()); request.setBody(requestBody.encode()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index f46dbe3124..1b4b3878c6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -173,6 +173,7 @@ import org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetTopicsByClusterRequestHeader; import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; @@ -195,6 +196,7 @@ import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader; @@ -1613,7 +1615,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { final String addr, final LockBatchRequestBody requestBody, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new LockBatchMqRequestHeader()); request.setBody(requestBody.encode()); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), @@ -1637,7 +1639,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { final long timeoutMillis, final boolean oneway ) throws RemotingException, MQBrokerException, InterruptedException { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new UnlockBatchMqRequestHeader()); request.setBody(requestBody.encode()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java index 3d8625937c..b97e00c577 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java @@ -67,6 +67,7 @@ import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader; import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader; import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader; import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; @@ -77,6 +78,7 @@ import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2; +import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData; @@ -543,7 +545,7 @@ public class MQClientAPIExt extends MQClientAPIImpl { public CompletableFuture<Set<MessageQueue>> lockBatchMQWithFuture(String brokerAddr, LockBatchRequestBody requestBody, long timeoutMillis) { - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new LockBatchMqRequestHeader()); request.setBody(requestBody.encode()); return this.getRemotingClient().invoke(brokerAddr, request, timeoutMillis).thenCompose(response -> { CompletableFuture<Set<MessageQueue>> future0 = new CompletableFuture<>(); @@ -565,7 +567,7 @@ public class MQClientAPIExt extends MQClientAPIImpl { public CompletableFuture<Void> unlockBatchMQOneway(String brokerAddr, UnlockBatchRequestBody requestBody, long timeoutMillis) { CompletableFuture<Void> future = new CompletableFuture<>(); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new UnlockBatchMqRequestHeader()); request.setBody(requestBody.encode()); try { this.getRemotingClient().invokeOneway(brokerAddr, request, timeoutMillis); diff --git a/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java b/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java index 7deee0a9f3..0178b2ca91 100644 --- a/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java +++ b/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java @@ -19,10 +19,9 @@ package org.apache.rocketmq.client.rpchook; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.ClientConfig; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; public class NamespaceRpcHook implements RPCHook { private final ClientConfig clientConfig; @@ -33,13 +32,9 @@ public class NamespaceRpcHook implements RPCHook { @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { - CommandCustomHeader customHeader = request.readCustomHeader(); - if (customHeader instanceof RpcRequestHeader) { - RpcRequestHeader requestHeader = (RpcRequestHeader) customHeader; - if (StringUtils.isNotEmpty(clientConfig.getNamespaceV2())) { - requestHeader.setNamespaced(true); - requestHeader.setNamespace(clientConfig.getNamespaceV2()); - } + if (StringUtils.isNotEmpty(clientConfig.getNamespaceV2())) { + request.addExtField(MixAll.RPC_REQUEST_HEADER_NAMESPACED_FIELD, "true"); + request.addExtField(MixAll.RPC_REQUEST_HEADER_NAMESPACE_FIELD, clientConfig.getNamespaceV2()); } } diff --git a/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java b/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java index 1551ce0935..385c2ceec0 100644 --- a/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.client.rpchook; import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader; @@ -39,8 +40,8 @@ public class NamespaceRpcHookTest { PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); namespaceRpcHook.doBeforeRequest("", request); - assertThat(pullMessageRequestHeader.getNamespaced()).isTrue(); - assertThat(pullMessageRequestHeader.getNamespace()).isEqualTo(namespace); + assertThat(request.getExtFields().get(MixAll.RPC_REQUEST_HEADER_NAMESPACED_FIELD)).isEqualTo("true"); + assertThat(request.getExtFields().get(MixAll.RPC_REQUEST_HEADER_NAMESPACE_FIELD)).isEqualTo(namespace); } @Test @@ -50,7 +51,6 @@ public class NamespaceRpcHookTest { PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader); namespaceRpcHook.doBeforeRequest("", request); - assertThat(pullMessageRequestHeader.getNamespaced()).isNull(); - assertThat(pullMessageRequestHeader.getNamespace()).isNull(); + assertThat(request.getExtFields()).isNull(); } } \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index c11eb377b9..cdcc54cd92 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -108,6 +108,8 @@ public class MixAll { public static final String ROCKETMQ_ZONE_MODE_PROPERTY = "rocketmq.zone.mode"; public static final String ZONE_NAME = "__ZONE_NAME"; public static final String ZONE_MODE = "__ZONE_MODE"; + public final static String RPC_REQUEST_HEADER_NAMESPACED_FIELD = "nsd"; + public final static String RPC_REQUEST_HEADER_NAMESPACE_FIELD = "ns"; private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); public static final String LOGICAL_QUEUE_MOCK_BROKER_PREFIX = "__syslo__"; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/LockBatchMqRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/LockBatchMqRequestHeader.java new file mode 100644 index 0000000000..3484fa7d3e --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/LockBatchMqRequestHeader.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.header; + +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; + +public class LockBatchMqRequestHeader extends RpcRequestHeader { + @Override + public void checkFields() throws RemotingCommandException { + + } +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnlockBatchMqRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnlockBatchMqRequestHeader.java new file mode 100644 index 0000000000..e7a44f2f8b --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnlockBatchMqRequestHeader.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.header; + +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; + +public class UnlockBatchMqRequestHeader extends RpcRequestHeader { + @Override + public void checkFields() throws RemotingCommandException { + + } +}