This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git
The following commit(s) were added to refs/heads/master by this push: new d58e13d Proxy Support And ConsumerGroup Enhancement (#207) d58e13d is described below commit d58e13da95b992cb3d6b05de1b1fe0ee80cb1e87 Author: Akai <91858554+1294566...@users.noreply.github.com> AuthorDate: Wed Jun 12 09:12:19 2024 +0800 Proxy Support And ConsumerGroup Enhancement (#207) * Support dashboard v4-v5 switch And query for v5 topic * Modify tag name * Support proxy-module And Fix the problem of showing wrong consumerGroup-info --------- Co-authored-by: yuanziwei <yuanzi...@xiaomi.com> --- .../rocketmq/dashboard/config/RMQConfigure.java | 23 +++++ .../dashboard/controller/ConsumerController.java | 16 ++-- .../dashboard/controller/ProxyController.java | 54 ++++++++++++ .../rocketmq/dashboard/model/GroupConsumeInfo.java | 27 ++++-- .../dashboard/service/ConsumerService.java | 8 +- .../rocketmq/dashboard/service/ProxyService.java | 28 +++++++ .../dashboard/service/client/MQAdminExtImpl.java | 5 +- .../dashboard/service/client/ProxyAdmin.java | 28 +++++++ .../dashboard/service/client/ProxyAdminImpl.java | 60 +++++++++++++ .../service/impl/ConsumerServiceImpl.java | 67 +++++++++++---- .../dashboard/service/impl/ProxyServiceImpl.java | 59 +++++++++++++ .../rocketmq/dashboard/task/MonitorTask.java | 2 +- src/main/resources/application.yml | 3 + src/main/resources/static/index.html | 1 + src/main/resources/static/src/app.js | 3 + src/main/resources/static/src/consumer.js | 9 +- src/main/resources/static/src/i18n/en.js | 1 + src/main/resources/static/src/i18n/zh.js | 1 + src/main/resources/static/src/proxy.js | 97 ++++++++++++++++++++++ src/main/resources/static/view/layout/_header.html | 1 + src/main/resources/static/view/pages/consumer.html | 4 +- src/main/resources/static/view/pages/proxy.html | 67 +++++++++++++++ 22 files changed, 516 insertions(+), 48 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java index 991a2d8..5ce21ff 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java +++ b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java @@ -43,6 +43,8 @@ public class RMQConfigure { //use rocketmq.namesrv.addr first,if it is empty,than use system proerty or system env private volatile String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); + private volatile String proxyAddr; + private volatile String isVIPChannel = System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"); @@ -62,6 +64,8 @@ public class RMQConfigure { private List<String> namesrvAddrs = new ArrayList<>(); + private List<String> proxyAddrs = new ArrayList<>(); + public String getAccessKey() { return accessKey; } @@ -86,6 +90,25 @@ public class RMQConfigure { return namesrvAddrs; } + public List<String> getProxyAddrs() { + return this.proxyAddrs; + } + + public void setProxyAddrs(List<String> proxyAddrs) { + this.proxyAddrs = proxyAddrs; + if (CollectionUtils.isNotEmpty(proxyAddrs)) { + this.setProxyAddr(proxyAddrs.get(0)); + } + } + + public String getProxyAddr() { + return proxyAddr; + } + + public void setProxyAddr(String proxyAddr) { + this.proxyAddr = proxyAddr; + } + public void setNamesrvAddrs(List<String> namesrvAddrs) { this.namesrvAddrs = namesrvAddrs; if (CollectionUtils.isNotEmpty(namesrvAddrs)) { diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java index d9f22e4..96fc056 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java @@ -47,14 +47,14 @@ public class ConsumerController { @RequestMapping(value = "/groupList.query") @ResponseBody - public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup) { - return consumerService.queryGroupList(skipSysGroup); + public Object list(@RequestParam(value = "skipSysGroup", required = false) boolean skipSysGroup, String address) { + return consumerService.queryGroupList(skipSysGroup, address); } @RequestMapping(value = "/group.query") @ResponseBody - public Object groupQuery(@RequestParam String consumerGroup) { - return consumerService.queryGroup(consumerGroup); + public Object groupQuery(@RequestParam String consumerGroup, String address) { + return consumerService.queryGroup(consumerGroup, address); } @RequestMapping(value = "/resetOffset.do", method = {RequestMethod.POST}) @@ -99,14 +99,14 @@ public class ConsumerController { @RequestMapping(value = "/queryTopicByConsumer.query") @ResponseBody - public Object queryConsumerByTopic(@RequestParam String consumerGroup) { - return consumerService.queryConsumeStatsListByGroupName(consumerGroup); + public Object queryConsumerByTopic(@RequestParam String consumerGroup, String address) { + return consumerService.queryConsumeStatsListByGroupName(consumerGroup, address); } @RequestMapping(value = "/consumerConnection.query") @ResponseBody - public Object consumerConnection(@RequestParam(required = false) String consumerGroup) { - ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup); + public Object consumerConnection(@RequestParam(required = false) String consumerGroup, String address) { + ConsumerConnection consumerConnection = consumerService.getConsumerConnection(consumerGroup, address); consumerConnection.setConnectionSet(ConnectionInfo.buildConnectionInfoHashSet(consumerConnection.getConnectionSet())); return consumerConnection; } diff --git a/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java b/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java new file mode 100644 index 0000000..27aa59d --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.controller; + +import org.apache.rocketmq.dashboard.permisssion.Permission; +import org.apache.rocketmq.dashboard.service.ProxyService; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; + +import javax.annotation.Resource; + +@Controller +@RequestMapping("/proxy") +@Permission +public class ProxyController { + @Resource + private ProxyService proxyService; + @RequestMapping(value = "/homePage.query", method = RequestMethod.GET) + @ResponseBody + public Object homePage() { + return proxyService.getProxyHomePage(); + } + + @RequestMapping(value = "/addProxyAddr.do", method = RequestMethod.POST) + @ResponseBody + public Object addProxyAddr(@RequestParam String newProxyAddr) { + proxyService.addProxyAddrList(newProxyAddr); + return true; + } + + @RequestMapping(value = "/updateProxyAddr.do", method = RequestMethod.POST) + @ResponseBody + public Object updateProxyAddr(@RequestParam String proxyAddr) { + proxyService.updateProxyAddrList(proxyAddr); + return true; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java index 0d19af9..db11c41 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java +++ b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java @@ -19,12 +19,15 @@ package org.apache.rocketmq.dashboard.model; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import java.util.List; + public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> { private String group; private String version; private int count; private ConsumeType consumeType; private MessageModel messageModel; + private List<String> address; private int consumeTps; private long diffTotal = -1; private String subGroupType = "NORMAL"; @@ -70,6 +73,22 @@ public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> { this.diffTotal = diffTotal; } + public List<String> getAddress() { + return address; + } + + public void setAddress(List<String> address) { + this.address = address; + } + + public String getSubGroupType() { + return subGroupType; + } + + public void setSubGroupType(String subGroupType) { + this.subGroupType = subGroupType; + } + @Override public int compareTo(GroupConsumeInfo o) { if (this.count != o.count) { @@ -93,12 +112,4 @@ public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> { public void setVersion(String version) { this.version = version; } - - public String getSubGroupType() { - return subGroupType; - } - - public void setSubGroupType(String subGroupType) { - this.subGroupType = subGroupType; - } } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java index c475931..e284c44 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java @@ -31,12 +31,12 @@ import java.util.Map; import java.util.Set; public interface ConsumerService { - List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup); + List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup,String address); - GroupConsumeInfo queryGroup(String consumerGroup); + GroupConsumeInfo queryGroup(String consumerGroup, String address); - List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName); + List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address); List<TopicConsumerInfo> queryConsumeStatsList(String topic, String groupName); @@ -52,7 +52,7 @@ public interface ConsumerService { Set<String> fetchBrokerNameSetBySubscriptionGroup(String group); - ConsumerConnection getConsumerConnection(String consumerGroup); + ConsumerConnection getConsumerConnection(String consumerGroup, String address); ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java b/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java new file mode 100644 index 0000000..2a64680 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service; + +import java.util.Map; + +public interface ProxyService { + + void addProxyAddrList(String proxyAddr); + + void updateProxyAddrList(String proxyAddr); + + Map<String, Object> getProxyHomePage(); +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java index 360c0e3..0281c5c 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java @@ -627,7 +627,7 @@ public class MQAdminExtImpl implements MQAdminExt { long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'examineConsumeStats'"); + return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(brokerAddr, consumerGroup, topicName, timeoutMillis); } @Override @@ -639,8 +639,7 @@ public class MQAdminExtImpl implements MQAdminExt { @Override public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup, String brokerAddr) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'examineConsumerConnectionInfo'"); + return MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup, brokerAddr); } @Override diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java new file mode 100644 index 0000000..4344c7c --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service.client; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; + +public interface ProxyAdmin { + + ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException; +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java new file mode 100644 index 0000000..eadae12 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service.client; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader; +import org.apache.rocketmq.tools.admin.MQAdminExt; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import static org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONSUMER_CONNECTION_LIST; + +@Slf4j +@Service +public class ProxyAdminImpl implements ProxyAdmin { + @Autowired + private GenericObjectPool<MQAdminExt> mqAdminExtPool; + + @Override + public ConsumerConnection examineConsumerConnectionInfo(String addr, String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + try { + MQAdminInstance.createMQAdmin(mqAdminExtPool); + RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient(); + GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + RemotingCommand request = RemotingCommand.createRequestCommand(GET_CONSUMER_CONNECTION_LIST, requestHeader); + RemotingCommand response = remotingClient.invokeSync(addr, request, 3000); + switch (response.getCode()) { + case 0: + return ConsumerConnection.decode(response.getBody(), ConsumerConnection.class); + default: + throw new MQBrokerException(response.getCode(), response.getRemark(), addr); + } + } finally { + MQAdminInstance.returnMQAdmin(mqAdminExtPool); + } + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java index a1cf9ff..9bc37ab 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java @@ -23,8 +23,10 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -44,6 +46,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.dashboard.service.client.ProxyAdmin; import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; import org.apache.rocketmq.remoting.protocol.admin.RollbackStats; import org.apache.rocketmq.common.message.MessageQueue; @@ -77,6 +80,8 @@ import org.springframework.stereotype.Service; public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService, InitializingBean, DisposableBean { private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class); + @Resource + protected ProxyAdmin proxyAdmin; @Resource private RMQConfigure configure; @@ -119,25 +124,33 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup) { - Set<String> consumerGroupSet = Sets.newHashSet(); + public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup, String address) { + HashMap<String, List<String>> consumerGroupMap = Maps.newHashMap(); try { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); - consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()); + for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) { + if (!consumerGroupMap.containsKey(groupName)) { + consumerGroupMap.putIfAbsent(groupName, new ArrayList<>()); + } + List<String> addresses = consumerGroupMap.get(groupName); + addresses.add(brokerData.selectBrokerAddr()); + consumerGroupMap.put(groupName, addresses); + } } - } - catch (Exception err) { + } catch (Exception err) { Throwables.throwIfUnchecked(err); throw new RuntimeException(err); } List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList()); - CountDownLatch countDownLatch = new CountDownLatch(consumerGroupSet.size()); - for (String consumerGroup : consumerGroupSet) { + CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size()); + for (Map.Entry<String, List<String>> entry : consumerGroupMap.entrySet()) { + String consumerGroup = entry.getKey(); executorService.submit(() -> { try { - GroupConsumeInfo consumeInfo = queryGroup(consumerGroup); + GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, address); + consumeInfo.setAddress(entry.getValue()); groupConsumeInfoList.add(consumeInfo); } catch (Exception e) { logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e); @@ -165,7 +178,7 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - public GroupConsumeInfo queryGroup(String consumerGroup) { + public GroupConsumeInfo queryGroup(String consumerGroup, String address) { GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo(); try { ConsumeStats consumeStats = null; @@ -182,9 +195,12 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum .allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly); try { - consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); - } - catch (Exception e) { + if (StringUtils.isNotEmpty(address)) { + consumerConnection = proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup); + } else { + consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); + } + } catch (Exception e) { logger.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage()); } @@ -217,8 +233,18 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName) { - return queryConsumeStatsList(null, groupName); + public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName, String address) { + ConsumeStats consumeStats; + String topic = null; + try { + String[] addresses = address.split(","); + String addr = addresses[0]; + consumeStats = mqAdminExt.examineConsumeStats(addr, groupName, null, 3000); + } catch (Exception e) { + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + return toTopicConsumerInfoList(topic, consumeStats, groupName); } @Override @@ -231,6 +257,10 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } + return toTopicConsumerInfoList(topic, consumeStats, groupName); + } + + private List<TopicConsumerInfo> toTopicConsumerInfoList(String topic, ConsumeStats consumeStats, String groupName) { List<MessageQueue> mqList = Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() { @Override public boolean apply(MessageQueue o) { @@ -431,11 +461,12 @@ public class ConsumerServiceImpl extends AbstractCommonService implements Consum } @Override - public ConsumerConnection getConsumerConnection(String consumerGroup) { + public ConsumerConnection getConsumerConnection(String consumerGroup, String address) { try { - return mqAdminExt.examineConsumerConnectionInfo(consumerGroup); - } - catch (Exception e) { + String[] addresses = address.split(","); + String addr = addresses[0]; + return mqAdminExt.examineConsumerConnectionInfo(consumerGroup, addr); + } catch (Exception e) { Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java new file mode 100644 index 0000000..07e63b3 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.dashboard.service.impl; + +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.dashboard.config.RMQConfigure; +import org.apache.rocketmq.dashboard.service.ProxyService; +import org.apache.rocketmq.dashboard.service.client.ProxyAdmin; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; + +@Slf4j +@Service +public class ProxyServiceImpl implements ProxyService { + @Resource + protected ProxyAdmin proxyAdmin; + @Resource + private RMQConfigure configure; + + @Override + public void addProxyAddrList(String proxyAddr) { + List<String> proxyAddrs = configure.getProxyAddrs(); + if (proxyAddrs != null && !proxyAddrs.contains(proxyAddr)) { + proxyAddrs.add(proxyAddr); + } + configure.setProxyAddrs(proxyAddrs); + } + + @Override + public void updateProxyAddrList(String proxyAddr) { + configure.setProxyAddr(proxyAddr); + } + + @Override + public Map<String, Object> getProxyHomePage() { + Map<String, Object> homePageInfoMap = Maps.newHashMap(); + homePageInfoMap.put("currentProxyAddr", configure.getProxyAddr()); + homePageInfoMap.put("proxyAddrList", configure.getProxyAddrs()); + return homePageInfoMap; + } +} diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java b/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java index 710929b..3c8a77e 100644 --- a/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java +++ b/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java @@ -40,7 +40,7 @@ public class MonitorTask { // @Scheduled(cron = "* * * * * ?") public void scanProblemConsumeGroup() { for (Map.Entry<String, ConsumerMonitorConfig> configEntry : monitorService.queryConsumerMonitorConfig().entrySet()) { - GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey()); + GroupConsumeInfo consumeInfo = consumerService.queryGroup(configEntry.getKey(), null); if (consumeInfo.getCount() < configEntry.getValue().getMinCount() || consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) { logger.info("op=look consumeInfo {}", JsonUtil.obj2String(consumeInfo)); // notify the alert system } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 090e421..fe4d283 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -59,6 +59,9 @@ rocketmq: # must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required loginRequired: false useTLS: false + proxyAddr: 127.0.0.1:8080 + proxyAddrs: + - 127.0.0.1:8080 # set the accessKey and secretKey if you used acl # accessKey: rocketmq2 # secretKey: 12345678 diff --git a/src/main/resources/static/index.html b/src/main/resources/static/index.html index c2bf349..ee3c3fe 100644 --- a/src/main/resources/static/index.html +++ b/src/main/resources/static/index.html @@ -104,6 +104,7 @@ <script type="text/javascript" src="src/tools/tools.js?v=201703171710"></script> <script type="text/javascript" src="src/cluster.js?timestamp=4"></script> <script type="text/javascript" src="src/topic.js"></script> +<script type="text/javascript" src="src/proxy.js"></script> <script type="text/javascript" src="src/consumer.js?timestamp=6"></script> <script type="text/javascript" src="src/producer.js"></script> <script type="text/javascript" src="src/message.js"></script> diff --git a/src/main/resources/static/src/app.js b/src/main/resources/static/src/app.js index a7ca1be..1bbb650 100644 --- a/src/main/resources/static/src/app.js +++ b/src/main/resources/static/src/app.js @@ -213,6 +213,9 @@ app.config(['$routeProvider', '$httpProvider','$cookiesProvider','getDictNamePro }).when('/ops', { templateUrl: 'view/pages/ops.html', controller:'opsController' + }).when('/proxy', { + templateUrl: 'view/pages/proxy.html', + controller:'proxyController' }).when('/acl', { templateUrl: 'view/pages/acl.html', controller: 'aclController' diff --git a/src/main/resources/static/src/consumer.js b/src/main/resources/static/src/consumer.js index 8c0833e..d192334 100644 --- a/src/main/resources/static/src/consumer.js +++ b/src/main/resources/static/src/consumer.js @@ -79,6 +79,7 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific url: "consumer/groupList.query", params: { skipSysGroup: false, + address: localStorage.getItem('isV5') ? localStorage.getItem('proxyAddr') : null } }).success(function (resp) { if (resp.status == 0) { @@ -243,11 +244,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific } }); }; - $scope.detail = function (consumerGroupName) { + $scope.detail = function (consumerGroupName, address) { $http({ method: "GET", url: "consumer/queryTopicByConsumer.query", - params: {consumerGroup: consumerGroupName} + params: {consumerGroup: consumerGroupName, address: address} }).success(function (resp) { if (resp.status == 0) { console.log(resp); @@ -262,11 +263,11 @@ module.controller('consumerController', ['$scope', 'ngDialog', '$http', 'Notific }); }; - $scope.client = function (consumerGroupName) { + $scope.client = function (consumerGroupName, address) { $http({ method: "GET", url: "consumer/consumerConnection.query", - params: {consumerGroup: consumerGroupName} + params: {consumerGroup: consumerGroupName, address: address} }).success(function (resp) { if (resp.status == 0) { console.log(resp); diff --git a/src/main/resources/static/src/i18n/en.js b/src/main/resources/static/src/i18n/en.js index 6bc16cd..83083d7 100644 --- a/src/main/resources/static/src/i18n/en.js +++ b/src/main/resources/static/src/i18n/en.js @@ -100,6 +100,7 @@ var en = { "RESET_OFFSET":"resetOffset", "CLUSTER_NAME":"clusterName", "OPS":"OPS", + "PROXY":"Proxy", "AUTO_REFRESH":"AUTO_REFRESH", "REFRESH":"REFRESH", "LOGOUT":"Logout", diff --git a/src/main/resources/static/src/i18n/zh.js b/src/main/resources/static/src/i18n/zh.js index f71ae34..f8c3c1d 100644 --- a/src/main/resources/static/src/i18n/zh.js +++ b/src/main/resources/static/src/i18n/zh.js @@ -101,6 +101,7 @@ var zh = { "RESET_OFFSET":"重置位点", "CLUSTER_NAME":"集群名", "OPS":"运维", + "PROXY":"代理", "AUTO_REFRESH":"自动刷新", "REFRESH":"刷新", "LOGOUT":"退出", diff --git a/src/main/resources/static/src/proxy.js b/src/main/resources/static/src/proxy.js new file mode 100644 index 0000000..4461b09 --- /dev/null +++ b/src/main/resources/static/src/proxy.js @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +var module = app; +module.controller('proxyController', ['$scope', '$location', '$http', 'Notification', 'remoteApi', 'tools', '$window', + function ($scope, $location, $http, Notification, remoteApi, tools, $window) { + $scope.proxyAddrList = []; + $scope.userRole = $window.sessionStorage.getItem("userrole"); + $scope.writeOperationEnabled = $scope.userRole == null ? true : ($scope.userRole == 1 ? true : false); + $scope.inputReadonly = !$scope.writeOperationEnabled; + $scope.newProxyAddr = ""; + $scope.allProxyConfig = {}; + + $http({ + method: "GET", + url: "proxy/homePage.query" + }).success(function (resp) { + if (resp.status == 0) { + $scope.proxyAddrList = resp.data.proxyAddrList; + $scope.selectedProxy = resp.data.currentProxyAddr; + $scope.showProxyDetailConfig($scope.selectedProxy); + localStorage.setItem('proxyAddr',$scope.selectedProxy); + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + + $scope.eleChange = function (data) { + $scope.proxyAddrList = data; + } + $scope.showDetailConf = function () { + $(".proxyModal").modal(); + } + + + $scope.showProxyDetailConfig = function (proxyAddr) { + $http({ + method: "GET", + url: "proxy/proxyDetailConfig.query", + params: {proxyAddress: proxyAddr} + }).success(function (resp) { + if (resp.status == 0) { + $scope.allProxyConfig = resp.data; + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + }; + + $scope.updateProxyAddr = function () { + $http({ + method: "POST", + url: "proxy/updateProxyAddr.do", + params: {proxyAddr: $scope.selectedProxy} + }).success(function (resp) { + if (resp.status == 0) { + localStorage.setItem('proxyAddr', $scope.selectedProxy); + Notification.info({message: "SUCCESS", delay: 2000}); + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + $scope.showProxyDetailConfig($scope.selectedProxy); + }; + + $scope.addProxyAddr = function () { + $http({ + method: "POST", + url: "proxy/addProxyAddr.do", + params: {newProxyAddr: $scope.newProxyAddr} + }).success(function (resp) { + if (resp.status == 0) { + if ($scope.proxyAddrList.indexOf($scope.newProxyAddr) == -1) { + $scope.proxyAddrList.push($scope.newProxyAddr); + } + $("#proxyAddr").val(""); + $scope.newProxyAddr = ""; + Notification.info({message: "SUCCESS", delay: 2000}); + } else { + Notification.error({message: resp.errMsg, delay: 2000}); + } + }); + }; + }]) diff --git a/src/main/resources/static/view/layout/_header.html b/src/main/resources/static/view/layout/_header.html index a78b9f2..8159138 100644 --- a/src/main/resources/static/view/layout/_header.html +++ b/src/main/resources/static/view/layout/_header.html @@ -28,6 +28,7 @@ <div class="navbar-collapse collapse navbar-warning-collapse"> <ul class="nav navbar-nav"> <li ng-class="path =='ops' ? 'active':''"><a ng-href="#/ops">{{'OPS' | translate}}</a></li> + <li ng-show="rmqVersion" ng-class="path =='proxy' ? 'active':''"><a ng-href="#/proxy">{{'PROXY' | translate}}</a></li> <li ng-class="path =='dashboard' || path ==''? 'active':''"><a ng-href="#/">{{'DASHBOARD' | translate}}</a></li> <li ng-class="path =='cluster' ? 'active':''"><a ng-href="#/cluster">{{'CLUSTER' | translate}}</a></li> <li ng-class="path =='topic' ? 'active':''"><a ng-href="#/topic">{{'TOPIC' | translate}}</a></li> diff --git a/src/main/resources/static/view/pages/consumer.html b/src/main/resources/static/view/pages/consumer.html index 47fddad..d883afc 100644 --- a/src/main/resources/static/view/pages/consumer.html +++ b/src/main/resources/static/view/pages/consumer.html @@ -66,11 +66,11 @@ <td class="text-center">{{consumerGroup.consumeTps}}</td> <td class="text-center">{{consumerGroup.diffTotal}}</td> <td class="text-left"> - <button name="client" ng-click="client(consumerGroup.group)" + <button name="client" ng-click="client(consumerGroup.group, consumerGroup.address)" class="btn btn-raised btn-sm btn-primary" type="button">{{'CLIENT' | translate}} </button> - <button name="client" ng-click="detail(consumerGroup.group)" + <button name="client" ng-click="detail(consumerGroup.group, consumerGroup.address)" class="btn btn-raised btn-sm btn-primary" type="button">{{'CONSUME_DETAIL' | translate}} </button> diff --git a/src/main/resources/static/view/pages/proxy.html b/src/main/resources/static/view/pages/proxy.html new file mode 100644 index 0000000..43f34ce --- /dev/null +++ b/src/main/resources/static/view/pages/proxy.html @@ -0,0 +1,67 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<div class="container-fluid" id="deployHistoryList"> + <div class="page-content"> + <h2 class="md-title">ProxyServerAddressList</h2> + <div class="pull-left" style="min-width: 400px; max-width: 500px; padding: 10px 10px 10px 0"> + <select ng-model="selectedProxy" chosen + ng-options="x for x in proxyAddrList" + ng-change="updateProxyAddr()" + required></select> + </div> + <div class="pull-left"> + <button class="btn btn-raised btn-sm btn-primary" type="button" ng-show="{{writeOperationEnabled}}" + ng-click="updateProxyAddr()">{{'UPDATE' | translate}} + </button> + </div> + <form class="form-inline pull-left" style="margin-left: 20px" ng-show="{{writeOperationEnabled}}"> + <div class="form-group" style="margin: 0"> + <label for="proxyAddr">ProxyAddr:</label> + <input id="proxyAddr" class="form-control" style="width: 300px; margin: 0 10px 0 10px" type="text" ng-model="newProxyAddr" required/> + <button class="btn btn-raised btn-sm btn-primary" type="button" + ng-click="addProxyAddr()"> {{ 'ADD' | translate}} + </button> + </div> + </form> + </div> +</div> + +<div class="modal proxyModal fade" role="dialog" tabindex="-1" aria-hidden="true" aria-labelledby="config-modal-label"> + <div class="modal-dialog modal-lg"> + <div class="modal-content" > + <div class="modal-header"> + <button class="close" type="button" data-dismiss="modal">×</button> + <h4 id="config-modal-label" class="modal-title"> + [{{selectedProxy}}] + </h4> + </div> + <div class="modal-body limit_height"> + <table class="table table-bordered"> + <tr ng-repeat="(key, value) in allProxyConfig"> + <td>{{key}}</td> + <td>{{value}}</td> + </tr> + </table> + </div> + <div class="modal-footer"> + <div class="col-md-12 text-center"> + <button type="button" class="btn btn-raised" data-dismiss="modal">{{ 'CLOSE' | translate }}</button> + </div> + </div> + </div> + </div> +</div>