This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e5e38396ba [ISSUE #8591] Preliminary support for key commands of LMQ 
(#8590)
e5e38396ba is described below

commit e5e38396ba32293b3bd40a5a40ff402d42dce928
Author: rongtong <jinrongton...@mails.ucas.ac.cn>
AuthorDate: Fri Aug 30 14:15:03 2024 +0800

    [ISSUE #8591] Preliminary support for key commands of LMQ (#8590)
    
    * Preliminary support for key commands of LMQ
    
    * Preliminary support for key commands of LMQ
    
    * Optimize some code
    
    * Fix some bugs and UTs for lmq support
    
    * Fix UTs can not pass
    
    * Fix UTs can not pass
    
    * Add some check to prevent NPE
---
 .../broker/processor/AdminBrokerProcessor.java     |   2 +-
 .../apache/rocketmq/client/impl/MQAdminImpl.java   |  47 ++++--
 .../rocketmq/client/impl/MQAdminImplTest.java      |   2 +-
 .../example/{simple => lmq}/LMQProducer.java       |   3 +-
 .../example/{simple => lmq}/LMQPullConsumer.java   |   2 +-
 .../example/{simple => lmq}/LMQPushConsumer.java   |   2 +-
 .../{simple => lmq}/LMQPushPopConsumer.java        |   2 +-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  85 ++++++++---
 .../tools/admin/DefaultMQAdminExtImpl.java         | 157 +++++++++++++++------
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  12 ++
 .../consumer/ConsumerProgressSubCommand.java       |   8 +-
 .../command/message/QueryMsgByIdSubCommand.java    |  29 ++--
 .../command/message/QueryMsgByKeySubCommand.java   |  25 +++-
 .../message/QueryMsgByUniqueKeySubCommand.java     |  28 ++--
 .../command/offset/ResetOffsetByTimeCommand.java   |  13 +-
 .../offset/ResetOffsetByTimeOldCommand.java        |  13 +-
 .../command/offset/SkipAccumulationSubCommand.java |   7 +-
 .../tools/command/topic/TopicStatusSubCommand.java |  24 +++-
 .../message/QueryMsgByUniqueKeySubCommandTest.java |  12 +-
 19 files changed, 348 insertions(+), 125 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 3039cf5c97..28bd254914 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -2062,7 +2062,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         Map<Integer, Long> queueOffsetMap = new HashMap<>();
 
         // Reset offset for all queues belonging to the specified topic
-        TopicConfig topicConfig = 
brokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
+        TopicConfig topicConfig = 
brokerController.getTopicConfigManager().selectTopicConfig(topic);
         if (null == topicConfig) {
             response.setCode(ResponseCode.TOPIC_NOT_EXIST);
             response.setRemark("Topic " + topic + " does not exist");
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index bcfe29bd4f..c1e3ee33dc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -43,6 +44,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -199,7 +201,7 @@ public class MQAdminImpl {
         if (brokerAddr != null) {
             try {
                 return 
this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, 
timestamp,
-                        boundaryType, timeoutMillis);
+                    boundaryType, timeoutMillis);
             } catch (Exception e) {
                 throw new MQClientException("Invoke Broker[" + brokerAddr + "] 
exception", e);
             }
@@ -277,13 +279,20 @@ public class MQAdminImpl {
     public QueryResult queryMessage(String topic, String key, int maxNum, long 
begin,
         long end) throws MQClientException,
         InterruptedException {
-        return queryMessage(topic, key, maxNum, begin, end, false);
+        return queryMessage(null, topic, key, maxNum, begin, end, false);
     }
 
     public QueryResult queryMessageByUniqKey(String topic, String uniqKey, int 
maxNum, long begin, long end)
         throws MQClientException, InterruptedException {
 
-        return queryMessage(topic, uniqKey, maxNum, begin, end, true);
+        return queryMessage(null, topic, uniqKey, maxNum, begin, end, true);
+    }
+
+    public QueryResult queryMessageByUniqKey(String clusterName, String topic, 
String uniqKey, int maxNum, long begin,
+        long end)
+        throws MQClientException, InterruptedException {
+
+        return queryMessage(clusterName, topic, uniqKey, maxNum, begin, end, 
true);
     }
 
     public MessageExt queryMessageByUniqKey(String topic,
@@ -311,25 +320,29 @@ public class MQAdminImpl {
         }
     }
 
-    protected QueryResult queryMessage(String topic, String key, int maxNum, 
long begin, long end,
+    public QueryResult queryMessage(String clusterName, String topic, String 
key, int maxNum, long begin, long end,
         boolean isUniqKey) throws MQClientException,
         InterruptedException {
-        return queryMessage(null, topic, key, maxNum, begin, end, isUniqKey);
-    }
+        boolean isLmq = MixAll.isLmq(topic);
+
+        String routeTopic = topic;
+        // if topic is lmq ,then use clusterName as lmq parent topic
+        // Use clusterName or lmq parent topic to get topic route for lmq or 
rmq_sys_wheel_timer
+        if (!StringUtils.isEmpty(topic) && (isLmq || 
topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"))
+            && !StringUtils.isEmpty(clusterName)) {
+            routeTopic = clusterName;
+        }
 
-    protected QueryResult queryMessage(String clusterName, String topic, 
String key, int maxNum, long begin, long end,
-        boolean isUniqKey) throws MQClientException,
-        InterruptedException {
-        TopicRouteData topicRouteData = 
this.mQClientFactory.getAnExistTopicRouteData(topic);
+        TopicRouteData topicRouteData = 
this.mQClientFactory.getAnExistTopicRouteData(routeTopic);
         if (null == topicRouteData) {
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
-            topicRouteData = 
this.mQClientFactory.getAnExistTopicRouteData(topic);
+            
this.mQClientFactory.updateTopicRouteInfoFromNameServer(routeTopic);
+            topicRouteData = 
this.mQClientFactory.getAnExistTopicRouteData(routeTopic);
         }
 
         if (topicRouteData != null) {
             List<String> brokerAddrs = new LinkedList<>();
             for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
-                if (clusterName != null && !clusterName.isEmpty()
+                if (!isLmq && clusterName != null && !clusterName.isEmpty()
                     && !clusterName.equals(brokerData.getCluster())) {
                     continue;
                 }
@@ -347,7 +360,11 @@ public class MQAdminImpl {
                 for (String addr : brokerAddrs) {
                     try {
                         QueryMessageRequestHeader requestHeader = new 
QueryMessageRequestHeader();
-                        requestHeader.setTopic(topic);
+                        if (isLmq) {
+                            requestHeader.setTopic(clusterName);
+                        } else {
+                            requestHeader.setTopic(topic);
+                        }
                         requestHeader.setKey(key);
                         requestHeader.setMaxNum(maxNum);
                         requestHeader.setBeginTimestamp(begin);
@@ -436,7 +453,7 @@ public class MQAdminImpl {
                                 String[] keyArray = 
keys.split(MessageConst.KEY_SEPARATOR);
                                 for (String k : keyArray) {
                                     // both topic and key must be equal at the 
same time
-                                    if (Objects.equals(key, k) && 
Objects.equals(topic, msgTopic)) {
+                                    if (Objects.equals(key, k) && (isLmq || 
Objects.equals(topic, msgTopic))) {
                                         matched = true;
                                         break;
                                     }
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java
index 3663df24d6..f52aba2dc0 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java
@@ -165,7 +165,7 @@ public class MQAdminImplTest {
             callback.operationSucceed(response);
             return null;
         }).when(mQClientAPIImpl).queryMessage(anyString(), any(), anyLong(), 
any(InvokeCallback.class), any());
-        QueryResult actual = mqAdminImpl.queryMessage(defaultTopic, "keys", 
100, 1L, 50L, false);
+        QueryResult actual = mqAdminImpl.queryMessage(defaultTopic, "keys", 
100, 1L, 50L);
         assertNotNull(actual);
         assertEquals(1, actual.getMessageList().size());
         assertEquals(defaultTopic, actual.getMessageList().get(0).getTopic());
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQProducer.java 
b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQProducer.java
similarity index 97%
rename from 
example/src/main/java/org/apache/rocketmq/example/simple/LMQProducer.java
rename to example/src/main/java/org/apache/rocketmq/example/lmq/LMQProducer.java
index 81ef2e1385..5fee948028 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LMQProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQProducer.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.example.simple;
+package org.apache.rocketmq.example.lmq;
 
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -47,6 +47,7 @@ public class LMQProducer {
         for (int i = 0; i < 128; i++) {
             try {
                 Message msg = new Message(TOPIC, TAG, ("Hello RocketMQ " + 
i).getBytes(RemotingHelper.DEFAULT_CHARSET));
+                msg.setKeys("Key" + i);
                 msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH 
/* "INNER_MULTI_DISPATCH" */,
                     String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, 
LMQ_TOPIC_1, LMQ_TOPIC_2) /* "%LMQ%123,%LMQ%456" */);
                 SendResult sendResult = producer.send(msg);
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPullConsumer.java 
b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPullConsumer.java
similarity index 98%
rename from 
example/src/main/java/org/apache/rocketmq/example/simple/LMQPullConsumer.java
rename to 
example/src/main/java/org/apache/rocketmq/example/lmq/LMQPullConsumer.java
index 7b1bdc3921..931dd96b48 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPullConsumer.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.example.simple;
+package org.apache.rocketmq.example.lmq;
 
 import java.util.Arrays;
 import java.util.HashSet;
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPushConsumer.java 
b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushConsumer.java
similarity index 98%
rename from 
example/src/main/java/org/apache/rocketmq/example/simple/LMQPushConsumer.java
rename to 
example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushConsumer.java
index efe37d8681..f8926a05df 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushConsumer.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.example.simple;
+package org.apache.rocketmq.example.lmq;
 
 import com.google.common.collect.Lists;
 
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPushPopConsumer.java
 b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushPopConsumer.java
similarity index 99%
rename from 
example/src/main/java/org/apache/rocketmq/example/simple/LMQPushPopConsumer.java
rename to 
example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushPopConsumer.java
index 2044057b2a..517eb12b7d 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPushPopConsumer.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushPopConsumer.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.example.simple;
+package org.apache.rocketmq.example.lmq;
 
 import com.google.common.collect.Lists;
 import java.util.HashMap;
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 5be6d24ff7..6ebee1d0dd 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -153,6 +153,12 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
         return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, 
end);
     }
 
+
+    public QueryResult queryMessage(String clusterName, String topic, String 
key, int maxNum, long begin, long end)
+        throws MQClientException, InterruptedException, RemotingException {
+        return defaultMQAdminExtImpl.queryMessage(clusterName, topic, key, 
maxNum, begin, end);
+    }
+
     @Override
     public void start() throws MQClientException {
         defaultMQAdminExtImpl.start();
@@ -196,7 +202,8 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
     }
 
     @Override
-    public void createAndUpdateTopicConfigList(String addr, List<TopicConfig> 
topicConfigList) throws InterruptedException, RemotingException, 
MQClientException {
+    public void createAndUpdateTopicConfigList(String addr,
+        List<TopicConfig> topicConfigList) throws InterruptedException, 
RemotingException, MQClientException {
         defaultMQAdminExtImpl.createAndUpdateTopicConfigList(addr, 
topicConfigList);
     }
 
@@ -300,6 +307,12 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
         return examineConsumeStats(consumerGroup, null);
     }
 
+    @Override
+    public ConsumeStats examineConsumeStats(String clusterName, String 
consumerGroup,
+        String topic) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
+        return defaultMQAdminExtImpl.examineConsumeStats(clusterName, 
consumerGroup, topic);
+    }
+
     @Override
     public ConsumeStats examineConsumeStats(String consumerGroup,
         String topic) throws RemotingException, MQClientException,
@@ -459,16 +472,35 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
         return defaultMQAdminExtImpl.resetOffsetByTimestampOld(consumerGroup, 
topic, timestamp, force);
     }
 
+    public List<RollbackStats> resetOffsetByTimestampOld(String clusterName, 
String consumerGroup, String topic, long timestamp,
+        boolean force)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        return defaultMQAdminExtImpl.resetOffsetByTimestampOld(clusterName, 
consumerGroup, topic, timestamp, force);
+    }
+
+    @Override
+    public Map<MessageQueue, Long> resetOffsetByTimestamp(String clusterName, 
String topic, String group,
+        long timestamp, boolean isForce) throws RemotingException, 
MQBrokerException, InterruptedException, MQClientException {
+        return defaultMQAdminExtImpl.resetOffsetByTimestamp(clusterName, 
topic, group, timestamp, isForce);
+    }
+
     @Override
     public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String 
group, long timestamp, boolean isForce)
         throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
         return resetOffsetByTimestamp(topic, group, timestamp, isForce, false);
     }
 
+    public Map<MessageQueue, Long> resetOffsetByTimestamp(String clusterName, 
String topic, String group,
+        long timestamp, boolean isForce, boolean isC)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        return defaultMQAdminExtImpl.resetOffsetByTimestamp(clusterName, 
topic, group, timestamp, isForce, isC);
+    }
+
+
     public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String 
group, long timestamp, boolean isForce,
         boolean isC)
         throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        return defaultMQAdminExtImpl.resetOffsetByTimestamp(topic, group, 
timestamp, isForce, isC);
+        return defaultMQAdminExtImpl.resetOffsetByTimestamp(null, topic, 
group, timestamp, isForce, isC);
     }
 
     @Override
@@ -589,10 +621,19 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
 
     @Override
     public ConsumeMessageDirectlyResult consumeMessageDirectly(final String 
consumerGroup, final String clientId,
-        final String topic, final String msgId) throws RemotingException, 
MQClientException, InterruptedException, MQBrokerException {
+        final String topic,
+        final String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
         return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, 
clientId, topic, msgId);
     }
 
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(final String 
clusterName, final String consumerGroup,
+        final String clientId,
+        final String topic,
+        final String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
+        return defaultMQAdminExtImpl.consumeMessageDirectly(clusterName, 
consumerGroup, clientId, topic, msgId);
+    }
+
     @Override
     public List<MessageTrack> messageTrackDetail(
         MessageExt msg) throws RemotingException, MQClientException, 
InterruptedException,
@@ -796,10 +837,10 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
         this.defaultMQAdminExtImpl.resetMasterFlushOffset(brokerAddr, 
masterFlushOffset);
     }
 
-    public QueryResult queryMessageByUniqKey(String topic, String key, int 
maxNum, long begin, long end)
+    public QueryResult queryMessageByUniqKey(String clusterName, String topic, 
String key, int maxNum, long begin,
+        long end)
         throws MQClientException, InterruptedException {
-
-        return defaultMQAdminExtImpl.queryMessageByUniqKey(topic, key, maxNum, 
begin, end);
+        return defaultMQAdminExtImpl.queryMessageByUniqKey(clusterName, topic, 
key, maxNum, begin, end);
     }
 
     public DefaultMQAdminExtImpl getDefaultMQAdminExtImpl() {
@@ -831,13 +872,14 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
 
     @Override
     public Pair<ElectMasterResponseHeader, BrokerMemberGroup> 
electMaster(String controllerAddr, String clusterName,
-                                                                          
String brokerName, Long brokerId) throws RemotingException, 
InterruptedException, MQBrokerException {
+        String brokerName, Long brokerId) throws RemotingException, 
InterruptedException, MQBrokerException {
         return this.defaultMQAdminExtImpl.electMaster(controllerAddr, 
clusterName, brokerName, brokerId);
     }
 
     @Override
     public void cleanControllerBrokerData(String controllerAddr, String 
clusterName, String brokerName,
-        String brokerControllerIdsToClean, boolean isCleanLivingBroker) throws 
RemotingException, InterruptedException, MQBrokerException {
+        String brokerControllerIdsToClean,
+        boolean isCleanLivingBroker) throws RemotingException, 
InterruptedException, MQBrokerException {
         this.defaultMQAdminExtImpl.cleanControllerBrokerData(controllerAddr, 
clusterName, brokerName, brokerControllerIdsToClean, isCleanLivingBroker);
     }
 
@@ -876,13 +918,15 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
     }
 
     @Override
-    public void createUser(String brokerAddr, String username, String 
password, String userType) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
+    public void createUser(String brokerAddr, String username, String password,
+        String userType) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         defaultMQAdminExtImpl.createUser(brokerAddr, username, password, 
userType);
     }
 
     @Override
     public void updateUser(String brokerAddr, String username,
-        String password, String userType, String userStatus) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+        String password, String userType,
+        String userStatus) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         defaultMQAdminExtImpl.updateUser(brokerAddr, username, password, 
userType, userStatus);
     }
 
@@ -912,38 +956,45 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
 
     @Override
     public void createAcl(String brokerAddr, String subject, List<String> 
resources, List<String> actions,
-        List<String> sourceIps, String decision) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+        List<String> sourceIps,
+        String decision) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         defaultMQAdminExtImpl.createAcl(brokerAddr, subject, resources, 
actions, sourceIps, decision);
     }
 
     @Override
-    public void createAcl(String brokerAddr, AclInfo aclInfo) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+    public void createAcl(String brokerAddr,
+        AclInfo aclInfo) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         defaultMQAdminExtImpl.createAcl(brokerAddr, aclInfo);
     }
 
     @Override
     public void updateAcl(String brokerAddr, String subject, List<String> 
resources, List<String> actions,
-        List<String> sourceIps, String decision) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+        List<String> sourceIps,
+        String decision) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         defaultMQAdminExtImpl.updateAcl(brokerAddr, subject, resources, 
actions, sourceIps, decision);
     }
 
     @Override
-    public void updateAcl(String brokerAddr, AclInfo aclInfo) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+    public void updateAcl(String brokerAddr,
+        AclInfo aclInfo) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         defaultMQAdminExtImpl.updateAcl(brokerAddr, aclInfo);
     }
 
     @Override
-    public void deleteAcl(String brokerAddr, String subject, String resource) 
throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+    public void deleteAcl(String brokerAddr, String subject,
+        String resource) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         defaultMQAdminExtImpl.deleteAcl(brokerAddr, subject, resource);
     }
 
     @Override
-    public AclInfo getAcl(String brokerAddr, String subject) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+    public AclInfo getAcl(String brokerAddr,
+        String subject) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         return defaultMQAdminExtImpl.getAcl(brokerAddr, subject);
     }
 
     @Override
-    public List<AclInfo> listAcl(String brokerAddr, String subjectFilter, 
String resourceFilter) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
+    public List<AclInfo> listAcl(String brokerAddr, String subjectFilter,
+        String resourceFilter) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         return defaultMQAdminExtImpl.listAcl(brokerAddr, subjectFilter, 
resourceFilter);
     }
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 9546235d3e..dc4d35e704 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -422,14 +422,21 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
         return 
this.mqClientInstance.getMQClientAPIImpl().getBrokerRuntimeInfo(brokerAddr, 
timeoutMillis);
     }
 
+    @Override
+    public ConsumeStats examineConsumeStats(
+        String consumerGroup,
+        String topic) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
+        return examineConsumeStats(null, consumerGroup, topic);
+    }
+
     @Override
     public ConsumeStats examineConsumeStats(
         String consumerGroup) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
-        return examineConsumeStats(consumerGroup, null);
+        return examineConsumeStats(null, consumerGroup, null);
     }
 
     @Override
-    public ConsumeStats examineConsumeStats(String consumerGroup,
+    public ConsumeStats examineConsumeStats(String clusterName, String 
consumerGroup,
         String topic) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
         TopicRouteData topicRouteData = null;
         List<String> routeTopics = new ArrayList<>();
@@ -438,6 +445,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
             routeTopics.add(topic);
             routeTopics.add(KeyBuilder.buildPopRetryTopic(topic, 
consumerGroup));
         }
+
+        // Use clusterName topic to get topic route for lmq or 
rmq_sys_wheel_timer
+        if (!StringUtils.isEmpty(topic) && (MixAll.isLmq(topic) || 
topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer")) && 
!StringUtils.isEmpty(clusterName)) {
+            routeTopics.add(clusterName);
+        }
+
         for (int i = 0; i < routeTopics.size(); i++) {
             try {
                 topicRouteData = 
this.examineTopicRouteInfo(routeTopics.get(i));
@@ -467,25 +480,33 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
             topics.add(messageQueue.getTopic());
         }
 
-        ConsumeStats staticResult = new ConsumeStats();
-        staticResult.setConsumeTps(result.getConsumeTps());
-        // for topic, we put the physical stats, how about group?
-        // staticResult.getOffsetTable().putAll(result.getOffsetTable());
-
-        for (String currentTopic : topics) {
-            TopicRouteData currentRoute = 
this.examineTopicRouteInfo(currentTopic);
-            if (currentRoute.getTopicQueueMappingByBroker() == null
-                || currentRoute.getTopicQueueMappingByBroker().isEmpty()) {
-                //normal topic
-                for (Map.Entry<MessageQueue, OffsetWrapper> entry : 
result.getOffsetTable().entrySet()) {
-                    if (entry.getKey().getTopic().equals(currentTopic)) {
-                        staticResult.getOffsetTable().put(entry.getKey(), 
entry.getValue());
+        ConsumeStats staticResult = null;
+
+        if (StringUtils.isEmpty(clusterName)) {
+
+            staticResult = new ConsumeStats();
+            staticResult.setConsumeTps(result.getConsumeTps());
+            // for topic, we put the physical stats, how about group?
+            // staticResult.getOffsetTable().putAll(result.getOffsetTable());
+
+            for (String currentTopic : topics) {
+                TopicRouteData currentRoute = 
this.examineTopicRouteInfo(currentTopic);
+                if (currentRoute.getTopicQueueMappingByBroker() == null
+                    || currentRoute.getTopicQueueMappingByBroker().isEmpty()) {
+                    //normal topic
+                    for (Map.Entry<MessageQueue, OffsetWrapper> entry : 
result.getOffsetTable().entrySet()) {
+                        if (entry.getKey().getTopic().equals(currentTopic)) {
+                            staticResult.getOffsetTable().put(entry.getKey(), 
entry.getValue());
+                        }
                     }
                 }
+                Map<String, TopicConfigAndQueueMapping> brokerConfigMap = 
MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute, 
defaultMQAdminExt);
+                ConsumeStats consumeStats = 
MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result);
+                
staticResult.getOffsetTable().putAll(consumeStats.getOffsetTable());
             }
-            Map<String, TopicConfigAndQueueMapping> brokerConfigMap = 
MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute, 
defaultMQAdminExt);
-            ConsumeStats consumeStats = 
MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result);
-            
staticResult.getOffsetTable().putAll(consumeStats.getOffsetTable());
+
+        } else {
+            staticResult = result;
         }
 
         if (staticResult.getOffsetTable().isEmpty()) {
@@ -811,10 +832,16 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
         
this.mqClientInstance.getMQClientAPIImpl().deleteKVConfigValue(namespace, key, 
timeoutMillis);
     }
 
-    @Override
-    public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, 
String topic, long timestamp,
+    public List<RollbackStats> resetOffsetByTimestampOld(String clusterName, 
String consumerGroup, String topic,
+        long timestamp,
         boolean force) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
-        TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+        String routeTopic = topic;
+        // Use clusterName topic to get topic route for lmq or 
rmq_sys_wheel_timer
+        if (!StringUtils.isEmpty(topic) && (MixAll.isLmq(topic) || 
topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"))
+            && !StringUtils.isEmpty(clusterName)) {
+            routeTopic = clusterName;
+        }
+        TopicRouteData topicRouteData = this.examineTopicRouteInfo(routeTopic);
         List<RollbackStats> rollbackStatsList = new ArrayList<>();
         Map<String, QueueData> topicRouteMap = new HashMap<>();
         for (QueueData queueData : topicRouteData.getQueueDatas()) {
@@ -829,6 +856,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
         return rollbackStatsList;
     }
 
+    @Override
+    public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, 
String topic, long timestamp,
+        boolean force) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
+        return resetOffsetByTimestampOld(null, consumerGroup, topic, 
timestamp, force);
+    }
+
     private List<RollbackStats> resetOffsetByTimestampOld(String brokerAddr, 
QueueData queueData, String consumerGroup,
         String topic, long timestamp,
         boolean force) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
@@ -864,7 +897,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
     @Override
     public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String 
group, long timestamp,
         boolean isForce) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
-        return resetOffsetByTimestamp(topic, group, timestamp, isForce, false);
+        return resetOffsetByTimestamp(null, topic, group, timestamp, isForce, 
false);
     }
 
     @Override
@@ -951,9 +984,16 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
         });
     }
 
-    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String 
group, long timestamp, boolean isForce,
+    public Map<MessageQueue, Long> resetOffsetByTimestamp(String clusterName, 
String topic, String group,
+        long timestamp, boolean isForce,
         boolean isC) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException {
-        TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+        String routeTopic = topic;
+        // Use clusterName topic to get topic route for lmq or 
rmq_sys_wheel_timer
+        if (!StringUtils.isEmpty(topic) && (MixAll.isLmq(topic) || 
topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"))
+            && !StringUtils.isEmpty(clusterName)) {
+            routeTopic = clusterName;
+        }
+        TopicRouteData topicRouteData = this.examineTopicRouteInfo(routeTopic);
         List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
         Map<MessageQueue, Long> allOffsetTable = new HashMap<>();
         if (brokerDatas != null) {
@@ -1325,7 +1365,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
 
     @Override
     public ConsumeMessageDirectlyResult consumeMessageDirectly(final String 
consumerGroup, final String clientId,
-        final String topic, final String msgId) throws RemotingException, 
MQClientException, InterruptedException, MQBrokerException {
+        final String topic,
+        final String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
         MessageExt msg = this.viewMessage(topic, msgId);
         if 
(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
             return 
this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(NetworkUtil.socketAddress2String(msg.getStoreHost()),
 consumerGroup, clientId, topic, msgId, timeoutMillis);
@@ -1335,6 +1376,20 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
         }
     }
 
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(final String 
clusterName, final String consumerGroup,
+        final String clientId,
+        final String topic,
+        final String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
+        MessageExt msg = this.queryMessage(clusterName, topic, msgId);
+        if 
(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
+            return 
this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(NetworkUtil.socketAddress2String(msg.getStoreHost()),
 consumerGroup, clientId, topic, msgId, timeoutMillis);
+        } else {
+            MessageClientExt msgClient = (MessageClientExt) msg;
+            return 
this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(NetworkUtil.socketAddress2String(msg.getStoreHost()),
 consumerGroup, clientId, topic, msgClient.getOffsetMsgId(), timeoutMillis);
+        }
+    }
+
     @Override
     public List<MessageTrack> messageTrackDetail(
         MessageExt msg) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException {
@@ -1664,10 +1719,10 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
         while (iterator.hasNext()) {
             TopicConfig topicConfig = iterator.next().getValue();
             if (topicList.getTopicList().contains(topicConfig.getTopicName())
-                    || 
TopicValidator.isSystemTopic(topicConfig.getTopicName())) {
+                || TopicValidator.isSystemTopic(topicConfig.getTopicName())) {
                 iterator.remove();
             } else if (!specialTopic && 
StringUtils.startsWithAny(topicConfig.getTopicName(),
-                    MixAll.RETRY_GROUP_TOPIC_PREFIX, 
MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+                MixAll.RETRY_GROUP_TOPIC_PREFIX, 
MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
                 iterator.remove();
             } else if (!PermName.isValid(topicConfig.getPerm())) {
                 iterator.remove();
@@ -1726,6 +1781,11 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
         return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, 
maxNum, begin, end);
     }
 
+    public QueryResult queryMessage(String clusterName, String topic, String 
key, int maxNum, long begin,
+        long end) throws MQClientException, InterruptedException, 
RemotingException {
+        return 
this.mqClientInstance.getMQAdminImpl().queryMessage(clusterName, topic, key, 
maxNum, begin, end, false);
+    }
+
     @Override
     public void updateConsumeOffset(String brokerAddr, String consumeGroup, 
MessageQueue mq,
         long offset) throws RemotingException, InterruptedException, 
MQBrokerException {
@@ -1783,10 +1843,9 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
         return 
this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, topicName, 
queueId, timestamp, timeoutMillis);
     }
 
-    public QueryResult queryMessageByUniqKey(String topic, String key, int 
maxNum, long begin,
+    public QueryResult queryMessageByUniqKey(String clusterName, String topic, 
String key, int maxNum, long begin,
         long end) throws MQClientException, InterruptedException {
-
-        return 
this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, key, 
maxNum, begin, end);
+        return 
this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(clusterName, 
topic, key, maxNum, begin, end);
     }
 
     @Override
@@ -1812,6 +1871,12 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
         }
     }
 
+    @Override
+    public Map<MessageQueue, Long> resetOffsetByTimestamp(String clusterName, 
String topic, String group,
+        long timestamp, boolean isForce) throws RemotingException, 
MQBrokerException, InterruptedException, MQClientException {
+        return resetOffsetByTimestamp(clusterName, topic, group, timestamp, 
isForce, false);
+    }
+
     @Override
     public HARuntimeInfo getBrokerHAStatus(
         String brokerAddr) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, InterruptedException, 
MQBrokerException {
@@ -1844,7 +1909,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
 
     @Override
     public Pair<ElectMasterResponseHeader, BrokerMemberGroup> 
electMaster(String controllerAddr, String clusterName,
-                                                                          
String brokerName, Long brokerId) throws RemotingException, 
InterruptedException, MQBrokerException {
+        String brokerName, Long brokerId) throws RemotingException, 
InterruptedException, MQBrokerException {
         return 
this.mqClientInstance.getMQClientAPIImpl().electMaster(controllerAddr, 
clusterName, brokerName, brokerId);
     }
 
@@ -1930,20 +1995,23 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public void createUser(String brokerAddr, String username, String 
password, String userType) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
+    public void createUser(String brokerAddr, String username, String password,
+        String userType) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         UserInfo userInfo = UserInfo.of(username, password, userType);
         this.createUser(brokerAddr, userInfo);
     }
 
     @Override
     public void updateUser(String brokerAddr, String username,
-        String password, String userType, String userStatus) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+        String password, String userType,
+        String userStatus) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         UserInfo userInfo = UserInfo.of(username, password, userType, 
userStatus);
         this.mqClientInstance.getMQClientAPIImpl().updateUser(brokerAddr, 
userInfo, timeoutMillis);
     }
 
     @Override
-    public void updateUser(String brokerAddr, UserInfo userInfo) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+    public void updateUser(String brokerAddr,
+        UserInfo userInfo) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         this.mqClientInstance.getMQClientAPIImpl().updateUser(brokerAddr, 
userInfo, timeoutMillis);
     }
 
@@ -1967,40 +2035,47 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
 
     @Override
     public void createAcl(String brokerAddr, String subject, List<String> 
resources, List<String> actions,
-        List<String> sourceIps, String decision) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+        List<String> sourceIps,
+        String decision) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         AclInfo aclInfo = AclInfo.of(subject, resources, actions, sourceIps, 
decision);
         this.createAcl(brokerAddr, aclInfo);
     }
 
     @Override
-    public void createAcl(String brokerAddr, AclInfo aclInfo) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+    public void createAcl(String brokerAddr,
+        AclInfo aclInfo) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         this.mqClientInstance.getMQClientAPIImpl().createAcl(brokerAddr, 
aclInfo, timeoutMillis);
     }
 
     @Override
     public void updateAcl(String brokerAddr, String subject, List<String> 
resources, List<String> actions,
-        List<String> sourceIps, String decision) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+        List<String> sourceIps,
+        String decision) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         AclInfo aclInfo = AclInfo.of(subject, resources, actions, sourceIps, 
decision);
         this.updateAcl(brokerAddr, aclInfo);
     }
 
     @Override
-    public void updateAcl(String brokerAddr, AclInfo aclInfo) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+    public void updateAcl(String brokerAddr,
+        AclInfo aclInfo) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         this.mqClientInstance.getMQClientAPIImpl().updateAcl(brokerAddr, 
aclInfo, timeoutMillis);
     }
 
     @Override
-    public void deleteAcl(String brokerAddr, String subject, String resource) 
throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+    public void deleteAcl(String brokerAddr, String subject,
+        String resource) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         this.mqClientInstance.getMQClientAPIImpl().deleteAcl(brokerAddr, 
subject, resource, timeoutMillis);
     }
 
     @Override
-    public AclInfo getAcl(String brokerAddr, String subject) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException, InterruptedException {
+    public AclInfo getAcl(String brokerAddr,
+        String subject) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         return this.mqClientInstance.getMQClientAPIImpl().getAcl(brokerAddr, 
subject, timeoutMillis);
     }
 
     @Override
-    public List<AclInfo> listAcl(String brokerAddr, String subjectFilter, 
String resourceFilter) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
+    public List<AclInfo> listAcl(String brokerAddr, String subjectFilter,
+        String resourceFilter) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException {
         return this.mqClientInstance.getMQClientAPIImpl().listAcl(brokerAddr, 
subjectFilter, resourceFilter, timeoutMillis);
     }
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 9dff3cbab9..ff78f22c70 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -152,6 +152,10 @@ public interface MQAdminExt extends MQAdmin {
         final String topic) throws RemotingException, MQClientException,
         InterruptedException, MQBrokerException;
 
+    ConsumeStats examineConsumeStats(final String clusterName, final String 
consumerGroup,
+        final String topic) throws RemotingException, MQClientException,
+        InterruptedException, MQBrokerException;
+
     ConsumeStats examineConsumeStats(final String brokerAddr, final String 
consumerGroup, final String topicName,
         final long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
         RemotingConnectException, MQBrokerException;
@@ -232,6 +236,9 @@ public interface MQAdminExt extends MQAdmin {
     Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, 
long timestamp, boolean isForce)
         throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException;
 
+    Map<MessageQueue, Long> resetOffsetByTimestamp(String clusterName, String 
topic, String group, long timestamp, boolean isForce)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException;
+
     void resetOffsetNew(String consumerGroup, String topic, long timestamp) 
throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
 
@@ -293,6 +300,11 @@ public interface MQAdminExt extends MQAdmin {
         String topic,
         String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException;
 
+    ConsumeMessageDirectlyResult consumeMessageDirectly(String clusterName, 
String consumerGroup,
+        String clientId,
+        String topic,
+        String msgId) throws RemotingException, MQClientException, 
InterruptedException, MQBrokerException;
+
     List<MessageTrack> messageTrackDetail(
         MessageExt msg) throws RemotingException, MQClientException, 
InterruptedException,
         MQBrokerException;
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index c489cad684..b638dcf61f 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -72,6 +72,10 @@ public class ConsumerProgressSubCommand implements 
SubCommand {
         optionShowClientIP.setRequired(false);
         options.addOption(optionShowClientIP);
 
+        opt = new Option("c", "cluster", true, "Cluster name or lmq parent 
topic, lmq is used to find the route.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -109,6 +113,8 @@ public class ConsumerProgressSubCommand implements 
SubCommand {
             boolean showClientIP = commandLine.hasOption('s')
                 && "true".equalsIgnoreCase(commandLine.getOptionValue('s'));
 
+            String clusterName = commandLine.hasOption('c') ? 
commandLine.getOptionValue('c').trim() : null;
+
             if (commandLine.hasOption('g')) {
                 String consumerGroup = commandLine.getOptionValue('g').trim();
                 String topicName = commandLine.hasOption('t') ? 
commandLine.getOptionValue('t').trim() : null;
@@ -116,7 +122,7 @@ public class ConsumerProgressSubCommand implements 
SubCommand {
                 if (topicName == null) {
                     consumeStats = 
defaultMQAdminExt.examineConsumeStats(consumerGroup);
                 } else {
-                    consumeStats = 
defaultMQAdminExt.examineConsumeStats(consumerGroup, topicName);
+                    consumeStats = 
defaultMQAdminExt.examineConsumeStats(clusterName, consumerGroup, topicName);
                 }
                 List<MessageQueue> mqList = new 
LinkedList<>(consumeStats.getOffsetTable().keySet());
                 Collections.sort(mqList);
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
index 5245ca089f..e83029eed3 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
@@ -44,9 +44,10 @@ import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 
 public class QueryMsgByIdSubCommand implements SubCommand {
-    public static void queryById(final DefaultMQAdminExt admin, final String 
topic, final String msgId, final Charset msgBodyCharset) throws 
MQClientException,
+    public static void queryById(final DefaultMQAdminExt admin, final String 
clusterName, final String topic,
+        final String msgId, final Charset msgBodyCharset) throws 
MQClientException,
         RemotingException, MQBrokerException, InterruptedException, 
IOException {
-        MessageExt msg = admin.viewMessage(topic, msgId);
+        MessageExt msg = admin.queryMessage(clusterName, topic, msgId);
 
         printMsg(admin, msg, msgBodyCharset);
     }
@@ -55,7 +56,8 @@ public class QueryMsgByIdSubCommand implements SubCommand {
         printMsg(admin, msg, null);
     }
 
-    public static void printMsg(final DefaultMQAdminExt admin, final 
MessageExt msg, final Charset msgBodyCharset) throws IOException {
+    public static void printMsg(final DefaultMQAdminExt admin, final 
MessageExt msg,
+        final Charset msgBodyCharset) throws IOException {
         if (msg == null) {
             System.out.printf("%nMessage not found!");
             return;
@@ -219,6 +221,10 @@ public class QueryMsgByIdSubCommand implements SubCommand {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("c", "cluster", true, "Cluster name or lmq parent 
topic, lmq is used to find the route.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -244,13 +250,14 @@ public class QueryMsgByIdSubCommand implements SubCommand 
{
 
             final String msgIds = commandLine.getOptionValue('i').trim();
             final String[] msgIdArr = StringUtils.split(msgIds, ",");
+            String clusterName = commandLine.hasOption('c') ? 
commandLine.getOptionValue('c').trim() : null;
 
             if (commandLine.hasOption('g') && commandLine.hasOption('d')) {
                 final String consumerGroup = 
commandLine.getOptionValue('g').trim();
                 final String clientId = commandLine.getOptionValue('d').trim();
                 for (String msgId : msgIdArr) {
                     if (StringUtils.isNotBlank(msgId)) {
-                        pushMsg(defaultMQAdminExt, consumerGroup, clientId, 
topic, msgId.trim());
+                        pushMsg(defaultMQAdminExt, clusterName, consumerGroup, 
clientId, topic, msgId.trim());
                     }
                 }
             } else if (commandLine.hasOption('s')) {
@@ -258,7 +265,7 @@ public class QueryMsgByIdSubCommand implements SubCommand {
                 if (resend) {
                     for (String msgId : msgIdArr) {
                         if (StringUtils.isNotBlank(msgId)) {
-                            sendMsg(defaultMQAdminExt, defaultMQProducer, 
topic, msgId.trim());
+                            sendMsg(defaultMQAdminExt, clusterName, 
defaultMQProducer, topic, msgId.trim());
                         }
                     }
                 }
@@ -269,7 +276,7 @@ public class QueryMsgByIdSubCommand implements SubCommand {
                 }
                 for (String msgId : msgIdArr) {
                     if (StringUtils.isNotBlank(msgId)) {
-                        queryById(defaultMQAdminExt, topic, msgId.trim(), 
msgBodyCharset);
+                        queryById(defaultMQAdminExt, clusterName, topic, 
msgId.trim(), msgBodyCharset);
                     }
                 }
 
@@ -282,13 +289,14 @@ public class QueryMsgByIdSubCommand implements SubCommand 
{
         }
     }
 
-    private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final 
String consumerGroup, final String clientId,
+    private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final 
String clusterName,
+        final String consumerGroup, final String clientId,
         final String topic, final String msgId) {
         try {
             ConsumerRunningInfo consumerRunningInfo = 
defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false, false);
             if (consumerRunningInfo != null && 
ConsumerRunningInfo.isPushType(consumerRunningInfo)) {
                 ConsumeMessageDirectlyResult result =
-                        
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+                    defaultMQAdminExt.consumeMessageDirectly(clusterName, 
consumerGroup, clientId, topic, msgId);
                 System.out.printf("%s", result);
             } else {
                 System.out.printf("this %s client is not push consumer ,not 
support direct push \n", clientId);
@@ -298,10 +306,11 @@ public class QueryMsgByIdSubCommand implements SubCommand 
{
         }
     }
 
-    private void sendMsg(final DefaultMQAdminExt defaultMQAdminExt, final 
DefaultMQProducer defaultMQProducer,
+    private void sendMsg(final DefaultMQAdminExt defaultMQAdminExt, final 
String clusterName,
+        final DefaultMQProducer defaultMQProducer,
         final String topic, final String msgId) {
         try {
-            MessageExt msg = defaultMQAdminExt.viewMessage(topic, msgId);
+            MessageExt msg = defaultMQAdminExt.queryMessage(clusterName, 
topic, msgId);
             if (msg != null) {
                 // resend msg by id
                 System.out.printf("prepare resend msg. originalMsgId=%s", 
msgId);
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
index 64627fd19f..02961c3bb5 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
@@ -23,6 +23,8 @@ import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
@@ -41,7 +43,7 @@ public class QueryMsgByKeySubCommand implements SubCommand {
 
     @Override
     public Options buildCommandlineOptions(Options options) {
-        Option opt = new Option("t", "topic", true, "topic name");
+        Option opt = new Option("t", "topic", true, "Topic name");
         opt.setRequired(true);
         options.addOption(opt);
 
@@ -57,7 +59,11 @@ public class QueryMsgByKeySubCommand implements SubCommand {
         opt.setRequired(false);
         options.addOption(opt);
 
-        opt = new Option("c", "maxNum", true, "The maximum number of messages 
returned by the query, default:64");
+        opt = new Option("m", "maxNum", true, "The maximum number of messages 
returned by the query, default:64");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "cluster", true, "Cluster name or lmq parent 
topic, lmq is used to find the route.");
         opt.setRequired(false);
         options.addOption(opt);
 
@@ -77,16 +83,20 @@ public class QueryMsgByKeySubCommand implements SubCommand {
             long beginTimestamp = 0;
             long endTimestamp = Long.MAX_VALUE;
             int maxNum = 64;
+            String clusterName = null;
             if (commandLine.hasOption("b")) {
                 beginTimestamp = 
Long.parseLong(commandLine.getOptionValue("b").trim());
             }
             if (commandLine.hasOption("e")) {
                 endTimestamp = 
Long.parseLong(commandLine.getOptionValue("e").trim());
             }
+            if (commandLine.hasOption("m")) {
+                maxNum = 
Integer.parseInt(commandLine.getOptionValue("m").trim());
+            }
             if (commandLine.hasOption("c")) {
-                maxNum = 
Integer.parseInt(commandLine.getOptionValue("c").trim());
+                clusterName = commandLine.getOptionValue("c").trim();
             }
-            this.queryByKey(defaultMQAdminExt, topic, key, maxNum, 
beginTimestamp, endTimestamp);
+            this.queryByKey(defaultMQAdminExt, clusterName, topic, key, 
maxNum, beginTimestamp, endTimestamp);
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
         } finally {
@@ -94,12 +104,13 @@ public class QueryMsgByKeySubCommand implements SubCommand 
{
         }
     }
 
-    private void queryByKey(final DefaultMQAdminExt admin, final String topic, 
final String key, int maxNum, long begin,
+    private void queryByKey(final DefaultMQAdminExt admin, final String 
cluster, final String topic, final String key, int maxNum, long begin,
         long end)
-        throws MQClientException, InterruptedException {
+        throws MQClientException, InterruptedException, RemotingException {
         admin.start();
 
-        QueryResult queryResult = admin.queryMessage(topic, key, maxNum, 
begin, end);
+        QueryResult queryResult = admin.queryMessage(cluster, topic, key, 
maxNum, begin, end);
+
         System.out.printf("%-50s %4s %40s%n",
             "#Message ID",
             "#QID",
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
index b71cee9016..5295d91cc3 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
@@ -25,13 +25,11 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.rocketmq.client.QueryResult;
-import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -51,19 +49,18 @@ public class QueryMsgByUniqueKeySubCommand implements 
SubCommand {
             
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
             try {
                 defaultMQAdminExt.start();
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 throw new SubCommandException(this.getClass().getSimpleName() 
+ " command failed", e);
             }
             return defaultMQAdminExt;
         }
     }
 
-    public static void queryById(final DefaultMQAdminExt admin, final String 
topic, final String msgId,
-                                 final boolean showAll) throws 
MQClientException,
-            RemotingException, MQBrokerException, InterruptedException, 
IOException {
+    public static void queryById(final DefaultMQAdminExt admin, final String 
clusterName, final String topic,
+        final String msgId,
+        final boolean showAll) throws MQClientException, InterruptedException, 
IOException {
 
-        QueryResult queryResult = admin.queryMessageByUniqKey(topic, msgId, 
32, 0, Long.MAX_VALUE);
+        QueryResult queryResult = admin.queryMessageByUniqKey(clusterName, 
topic, msgId, 32, 0, Long.MAX_VALUE);
         assert queryResult != null;
         List<MessageExt> list = queryResult.getMessageList();
         if (list == null || list.size() == 0) {
@@ -94,7 +91,7 @@ public class QueryMsgByUniqueKeySubCommand implements 
SubCommand {
         System.out.printf(strFormat, "Store Host:", 
RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()));
         System.out.printf(intFormat, "System Flag:", msg.getSysFlag());
         System.out.printf(strFormat, "Properties:",
-                msg.getProperties() != null ? msg.getProperties().toString() : 
"");
+            msg.getProperties() != null ? msg.getProperties().toString() : "");
         System.out.printf(strFormat, "Message Body Path:", bodyTmpFilePath);
 
         try {
@@ -166,6 +163,10 @@ public class QueryMsgByUniqueKeySubCommand implements 
SubCommand {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("c", "cluster", true, "Cluster name or lmq parent 
topic, lmq is used to find the route.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -173,10 +174,11 @@ public class QueryMsgByUniqueKeySubCommand implements 
SubCommand {
     public void execute(CommandLine commandLine, Options options, RPCHook 
rpcHook) throws SubCommandException {
 
         try {
-            defaultMQAdminExt =  createMQAdminExt(rpcHook);
+            defaultMQAdminExt = createMQAdminExt(rpcHook);
 
             final String msgId = commandLine.getOptionValue('i').trim();
             final String topic = commandLine.getOptionValue('t').trim();
+            String clusterName = commandLine.hasOption('c') ? 
commandLine.getOptionValue('c').trim() : null;
             final boolean showAll = commandLine.hasOption('a');
             if (commandLine.hasOption('g') && commandLine.hasOption('d')) {
                 final String consumerGroup = 
commandLine.getOptionValue('g').trim();
@@ -189,14 +191,14 @@ public class QueryMsgByUniqueKeySubCommand implements 
SubCommand {
                 }
                 if (consumerRunningInfo != null && 
ConsumerRunningInfo.isPushType(consumerRunningInfo)) {
                     ConsumeMessageDirectlyResult result =
-                            
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+                        
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
                     System.out.printf("%s", result);
                 } else {
-                    System.out.printf("get consumer info failed or this %s 
client is not push consumer ,not support direct push \n", clientId);
+                    System.out.printf("get consumer info failed or this %s 
client is not push consumer, not support direct push \n", clientId);
                 }
 
             } else {
-                queryById(defaultMQAdminExt, topic, msgId, showAll);
+                queryById(defaultMQAdminExt, clusterName, topic, msgId, 
showAll);
             }
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
index 993fa50187..84a301bd60 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
@@ -77,6 +77,10 @@ public class ResetOffsetByTimeCommand implements SubCommand {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option("c", "cluster", true, "Cluster name or lmq parent 
topic, lmq is used to find the route.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -88,6 +92,7 @@ public class ResetOffsetByTimeCommand implements SubCommand {
             String group = commandLine.getOptionValue("g").trim();
             String topic = commandLine.getOptionValue("t").trim();
             String timeStampStr = commandLine.getOptionValue("s").trim();
+            String clusterName = commandLine.hasOption('c') ? 
commandLine.getOptionValue('c').trim() : null;
             long timestamp = "now".equals(timeStampStr) ? 
System.currentTimeMillis() : 0;
 
             try {
@@ -129,7 +134,7 @@ public class ResetOffsetByTimeCommand implements SubCommand 
{
             if (brokerAddr != null && queueId >= 0) {
                 System.out.printf("start reset consumer offset by specified, " 
+
                         "group[%s], topic[%s], queueId[%s], broker[%s], 
timestamp(string)[%s], timestamp(long)[%s]%n",
-                        group, topic, queueId, brokerAddr, timeStampStr, 
timestamp);
+                    group, topic, queueId, brokerAddr, timeStampStr, 
timestamp);
 
                 long resetOffset = null != offset ? offset :
                     defaultMQAdminExt.searchOffset(brokerAddr, topic, queueId, 
timestamp, 3000);
@@ -143,11 +148,11 @@ public class ResetOffsetByTimeCommand implements 
SubCommand {
 
             Map<MessageQueue, Long> offsetTable;
             try {
-                offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, 
group, timestamp, force, isC);
+                offsetTable = 
defaultMQAdminExt.resetOffsetByTimestamp(clusterName, topic, group, timestamp, 
force, isC);
             } catch (MQClientException e) {
-                // if consumer not online, use old command to reset reset
+                // if consumer not online, use old command to reset
                 if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
-                    ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt, 
group, topic, timestamp, force, timeStampStr);
+                    ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt, 
clusterName, group, topic, timestamp, force, timeStampStr);
                     return;
                 }
                 throw e;
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
index 7984bb8c39..c179c5c805 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
@@ -34,12 +34,13 @@ import 
org.apache.rocketmq.tools.command.SubCommandException;
 
 public class ResetOffsetByTimeOldCommand implements SubCommand {
 
-    public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String 
consumerGroup, String topic,
+    public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String 
clusterName, String consumerGroup,
+        String topic,
         long timestamp, boolean force, String timeStampStr)
         throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
 
         List<RollbackStats> rollbackStatsList =
-            defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, 
timestamp, force);
+            defaultMQAdminExt.resetOffsetByTimestampOld(clusterName, 
consumerGroup, topic, timestamp, force);
 
         System.out.printf("reset consumer offset by specified " +
                 "consumerGroup[%s], topic[%s], force[%s], 
timestamp(string)[%s], timestamp(long)[%s]%n",
@@ -93,6 +94,11 @@ public class ResetOffsetByTimeOldCommand implements 
SubCommand {
         opt = new Option("f", "force", true, "set the force rollback by 
timestamp switch[true|false]");
         opt.setRequired(false);
         options.addOption(opt);
+
+        opt = new Option("c", "cluster", true, "Cluster name or lmq parent 
topic, lmq is used to find the route.");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -104,6 +110,7 @@ public class ResetOffsetByTimeOldCommand implements 
SubCommand {
             String consumerGroup = commandLine.getOptionValue("g").trim();
             String topic = commandLine.getOptionValue("t").trim();
             String timeStampStr = commandLine.getOptionValue("s").trim();
+            String clusterName = commandLine.hasOption('c') ? 
commandLine.getOptionValue('c').trim() : null;
             long timestamp = 0;
             try {
                 timestamp = Long.parseLong(timeStampStr);
@@ -123,7 +130,7 @@ public class ResetOffsetByTimeOldCommand implements 
SubCommand {
                 force = 
Boolean.parseBoolean(commandLine.getOptionValue("f").trim());
             }
             defaultMQAdminExt.start();
-            resetOffset(defaultMQAdminExt, consumerGroup, topic, timestamp, 
force, timeStampStr);
+            resetOffset(defaultMQAdminExt, clusterName, consumerGroup, topic, 
timestamp, force, timeStampStr);
 
         } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java
index b22491a591..8f2ac2e1e1 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java
@@ -57,6 +57,10 @@ public class SkipAccumulationSubCommand implements 
SubCommand {
         opt = new Option("f", "force", true, "set the force rollback by 
timestamp switch[true|false]");
         opt.setRequired(false);
         options.addOption(opt);
+
+        opt = new Option("c", "cluster", true, "Cluster name or lmq parent 
topic, lmq is used to find the route.");
+        opt.setRequired(false);
+        options.addOption(opt);
         return options;
     }
 
@@ -68,6 +72,7 @@ public class SkipAccumulationSubCommand implements SubCommand 
{
         try {
             String group = commandLine.getOptionValue("g").trim();
             String topic = commandLine.getOptionValue("t").trim();
+            String clusterName = commandLine.hasOption('c') ? 
commandLine.getOptionValue('c').trim() : null;
             boolean force = true;
             if (commandLine.hasOption('f')) {
                 force = 
Boolean.valueOf(commandLine.getOptionValue("f").trim());
@@ -76,7 +81,7 @@ public class SkipAccumulationSubCommand implements SubCommand 
{
             defaultMQAdminExt.start();
             Map<MessageQueue, Long> offsetTable;
             try {
-                offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, 
group, timestamp, force);
+                offsetTable = 
defaultMQAdminExt.resetOffsetByTimestamp(clusterName, topic, group, timestamp, 
force);
             } catch (MQClientException e) {
                 if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
                     List<RollbackStats> rollbackStatsList = 
defaultMQAdminExt.resetOffsetByTimestampOld(group, topic, timestamp, force);
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
index a1619ecedf..47ca761d1f 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
@@ -27,6 +27,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
 import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
@@ -48,6 +50,10 @@ public class TopicStatusSubCommand implements SubCommand {
         Option opt = new Option("t", "topic", true, "topic name");
         opt.setRequired(true);
         options.addOption(opt);
+
+        opt = new Option("c", "cluster", true, "cluster name or lmq parent 
topic, lmq is used to find the route.");
+        opt.setRequired(false);
+        options.addOption(opt);
         return options;
     }
 
@@ -58,10 +64,26 @@ public class TopicStatusSubCommand implements SubCommand {
 
         
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
 
+
         try {
+            TopicStatsTable topicStatsTable = new TopicStatsTable();
             defaultMQAdminExt.start();
             String topic = commandLine.getOptionValue('t').trim();
-            TopicStatsTable topicStatsTable = 
defaultMQAdminExt.examineTopicStats(topic);
+
+            if (commandLine.hasOption('c')) {
+                String cluster = commandLine.getOptionValue('c').trim();
+                TopicRouteData topicRouteData = 
defaultMQAdminExt.examineTopicRouteInfo(cluster);
+
+                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+                    String addr = bd.selectBrokerAddr();
+                    if (addr != null) {
+                        TopicStatsTable tst = 
defaultMQAdminExt.examineTopicStats(addr, topic);
+                        
topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
+                    }
+                }
+            } else {
+                topicStatsTable = defaultMQAdminExt.examineTopicStats(topic);
+            }
 
             List<MessageQueue> mqList = new LinkedList<>();
             mqList.addAll(topicStatsTable.getOffsetTable().keySet());
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
index fc5405e747..b24bd22db8 100644
--- 
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
@@ -127,7 +127,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
         when(mQAdminImpl.queryMessageByUniqKey(anyString(), 
anyString())).thenReturn(retMsgExt);
 
         QueryResult queryResult = new QueryResult(0, 
Lists.newArrayList(retMsgExt));
-        when(defaultMQAdminExtImpl.queryMessageByUniqKey(anyString(), 
anyString(), anyInt(), anyLong(), anyLong())).thenReturn(queryResult);
+        when(mQAdminImpl.queryMessageByUniqKey(anyString(), anyString(), 
anyString(), anyInt(), anyLong(), anyLong())).thenReturn(queryResult);
 
         TopicRouteData topicRouteData = new TopicRouteData();
         List<BrokerData> brokerDataList = new ArrayList<>();
@@ -194,7 +194,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
 
         Options options = ServerUtil.buildCommandlineOptions(new Options());
 
-        String[] args = new String[] {"-t myTopicTest", "-i msgId"};
+        String[] args = new String[] {"-t myTopicTest", "-i msgId", "-c 
DefaultCluster"};
         CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args,
             cmd.buildCommandlineOptions(options), new DefaultParser());
         cmd.execute(commandLine, options, null);
@@ -218,7 +218,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
 
         Options options = ServerUtil.buildCommandlineOptions(new Options());
 
-        String[] args = new String[] {"-t myTopicTest", "-i 
7F000001000004D20000000000000066"};
+        String[] args = new String[] {"-t myTopicTest", "-i 
7F000001000004D20000000000000066", "-c DefaultCluster"};
         CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args,
             cmd.buildCommandlineOptions(options), new DefaultParser());
         cmd.execute(commandLine, options, null);
@@ -230,7 +230,7 @@ public class QueryMsgByUniqueKeySubCommandTest {
 
         Options options = ServerUtil.buildCommandlineOptions(new Options());
 
-        String[] args = new String[] {"-t myTopicTest", "-i 
0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"};
+        String[] args = new String[] {"-t myTopicTest", "-i 
0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId", "-c 
DefaultCluster"};
         CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args,
             cmd.buildCommandlineOptions(options), new DefaultParser());
         cmd.execute(commandLine, options, null);
@@ -241,13 +241,13 @@ public class QueryMsgByUniqueKeySubCommandTest {
 
         System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
 
-        String[] args = new String[]{"-t myTopicTest", "-i 
0A3A54F7BF7D18B4AAC28A3FA2CF0000"};
+        String[] args = new String[]{"-t myTopicTest", "-i 
0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-c DefaultCluster"};
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args,
             cmd.buildCommandlineOptions(options), new DefaultParser());
         cmd.execute(commandLine, options, null);
 
-        args = new String[] {"-t myTopicTest", "-i 
0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"};
+        args = new String[] {"-t myTopicTest", "-i 
0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId", "-c 
DefaultCluster"};
         commandLine = ServerUtil.parseCmdLine("mqadmin ", args, 
cmd.buildCommandlineOptions(options),
             new DefaultParser());
         cmd.execute(commandLine, options, null);

Reply via email to