This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e59918704 add GET_CONSUMER_CONNECTION_LIST
e59918704 is described below
commit e5991870479bb7302c5908c04aa2d858c5e1f2fb
Author: lyx <[email protected]>
AuthorDate: Thu Mar 9 10:57:54 2023 +0800
add GET_CONSUMER_CONNECTION_LIST
---
.../proxy/remoting/RemotingProtocolServer.java | 1 +
.../remoting/activity/ConsumerManagerActivity.java | 48 ++++++++++++++++++++++
2 files changed, 49 insertions(+)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index b5c749d3b..85c960562 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -204,6 +204,7 @@ public class RemotingProtocolServer implements
StartAndShutdown, RemotingProxyOu
remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET,
consumerManagerActivity, this.updateOffsetExecutor);
remotingServer.registerProcessor(RequestCode.ACK_MESSAGE,
consumerManagerActivity, this.updateOffsetExecutor);
remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME,
consumerManagerActivity, this.updateOffsetExecutor);
+
remotingServer.registerProcessor(RequestCode.GET_CONSUMER_CONNECTION_LIST,
consumerManagerActivity, this.updateOffsetExecutor);
remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
consumerManagerActivity, this.defaultExecutor);
remotingServer.registerProcessor(RequestCode.GET_MAX_OFFSET,
consumerManagerActivity, this.defaultExecutor);
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
index 1c1993ff0..e9d42afc2 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
@@ -17,17 +17,25 @@
package org.apache.rocketmq.proxy.remoting.activity;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.Connection;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
+import
org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody;
import
org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader;
@@ -62,6 +70,9 @@ public class ConsumerManagerActivity extends
AbstractRemotingActivity {
case RequestCode.GET_EARLIEST_MSG_STORETIME: {
return request(ctx, request, context,
Duration.ofSeconds(3).toMillis());
}
+ case RequestCode.GET_CONSUMER_CONNECTION_LIST: {
+ return getConsumerConnectionList(ctx, request, context);
+ }
default:
break;
}
@@ -81,6 +92,43 @@ public class ConsumerManagerActivity extends
AbstractRemotingActivity {
return response;
}
+    /**
+     * Handles {@code GET_CONSUMER_CONNECTION_LIST}: reports the connections of one consumer group.
+     *
+     * <p>The group named in the request header is looked up through {@code messagingProcessor}.
+     * When it is found, the response body is an encoded {@link ConsumerConnection} holding the
+     * group's consume-from-where, consume type, message model and subscription table, plus one
+     * {@link Connection} (client id, language, version, remote address) per entry of the group's
+     * channel table.
+     *
+     * @param ctx     channel handler context of the request; not read by this method
+     * @param request command whose custom header is decoded as
+     *                {@link GetConsumerConnectionListRequestHeader}
+     * @param context proxy context of the call; not read by this method
+     * @return a response with code {@code SUCCESS} and the encoded body, or code
+     *         {@code CONSUMER_NOT_ONLINE} with an explanatory remark when no group info is found
+     * @throws Exception propagated from decoding the request header or looking up the group
+     */
+    protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request,
+        ProxyContext context) throws Exception {
+        // The reply consists of code, remark and body only, so no custom header class is attached.
+        // (Previously the *request* header class was passed here, which put an empty request
+        // header on the response; lockBatchMQ in this class already uses null.)
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetConsumerConnectionListRequestHeader header = (GetConsumerConnectionListRequestHeader)
+            request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
+
+        final ConsumerGroupInfo consumerGroupInfo =
+            messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup());
+        if (consumerGroupInfo == null) {
+            response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
+            response.setRemark("the consumer group[" + header.getConsumerGroup() + "] not online");
+            return response;
+        }
+
+        // Group-level attributes are copied as-is from the group info.
+        final ConsumerConnection bodydata = new ConsumerConnection();
+        bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere());
+        bodydata.setConsumeType(consumerGroupInfo.getConsumeType());
+        bodydata.setMessageModel(consumerGroupInfo.getMessageModel());
+        bodydata.getSubscriptionTable().putAll(consumerGroupInfo.getSubscriptionTable());
+
+        // One Connection entry per channel currently recorded for the group.
+        Iterator<Map.Entry<Channel, ClientChannelInfo>> it =
+            consumerGroupInfo.getChannelInfoTable().entrySet().iterator();
+        while (it.hasNext()) {
+            ClientChannelInfo info = it.next().getValue();
+            Connection connection = new Connection();
+            connection.setClientId(info.getClientId());
+            connection.setLanguage(info.getLanguage());
+            connection.setVersion(info.getVersion());
+            connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel()));
+
+            bodydata.getConnectionSet().add(connection);
+        }
+
+        response.setBody(bodydata.encode());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
protected RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
RemotingCommand request,
ProxyContext context) throws Exception {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);