This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new bbbe737e4e [ISSUE #6964] use the correct context in telemetry; polish the code structure (#6965)
bbbe737e4e is described below

commit bbbe737e4e57ebc32581220fa8766cf32f7833eb
Author: lk <[email protected]>
AuthorDate: Thu Jun 29 15:27:30 2023 +0800

    [ISSUE #6964] use the correct context in telemetry; polish the code structure (#6965)
---
 .../proxy/grpc/v2/ContextStreamObserver.java       | 29 +++++++++++
 .../proxy/grpc/v2/DefaultGrpcMessingActivity.java  |  5 +-
 .../proxy/grpc/v2/GrpcMessagingApplication.java    |  6 +--
 .../proxy/grpc/v2/GrpcMessingActivity.java         |  2 +-
 .../proxy/grpc/v2/client/ClientActivity.java       | 18 ++++---
 .../grpc/v2/common/GrpcClientSettingsManager.java  | 22 ++++----
 .../rocketmq/proxy/processor/ClientProcessor.java  |  2 +-
 .../proxy/processor/DefaultMessagingProcessor.java |  4 +-
 .../proxy/processor/MessagingProcessor.java        |  2 +-
 .../remoting/activity/ClientManagerActivity.java   | 12 ++---
 .../remoting/activity/ConsumerManagerActivity.java |  4 +-
 .../remoting/activity/PullMessageActivity.java     |  2 +-
 .../remoting/channel/RemotingChannelManager.java   |  9 ++--
 .../proxy/service/route/TopicRouteService.java     | 60 ++++------------------
 .../proxy/grpc/v2/client/ClientActivityTest.java   | 16 +++---
 .../v2/common/GrpcClientSettingsManagerTest.java   |  8 +--
 .../remoting/activity/PullMessageActivityTest.java |  4 +-
 .../channel/RemotingChannelManagerTest.java        | 30 ++++++-----
 .../protocol/body/LockBatchRequestBody.java        | 11 ++++
 .../protocol/body/UnlockBatchRequestBody.java      | 11 ++++
 .../protocol/header/NotificationRequestHeader.java | 14 +++++
 .../header/QueryConsumerOffsetRequestHeader.java   | 11 ++++
 22 files changed, 160 insertions(+), 122 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/ContextStreamObserver.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/ContextStreamObserver.java
new file mode 100644
index 0000000000..c186bfb61c
--- /dev/null
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/ContextStreamObserver.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.grpc.v2;
+
+import org.apache.rocketmq.proxy.common.ProxyContext;
+
+public interface ContextStreamObserver<V> {
+
+    void onNext(ProxyContext ctx, V value);
+
+    void onError(Throwable t);
+
+    void onCompleted();
+}
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
index 9d49e0e2ca..73b764bc4f 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/DefaultGrpcMessingActivity.java
@@ -150,8 +150,7 @@ public class DefaultGrpcMessingActivity extends 
AbstractStartAndShutdown impleme
     }
 
     @Override
-    public StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx,
-        StreamObserver<TelemetryCommand> responseObserver) {
-        return this.clientActivity.telemetry(ctx, responseObserver);
+    public ContextStreamObserver<TelemetryCommand> 
telemetry(StreamObserver<TelemetryCommand> responseObserver) {
+        return this.clientActivity.telemetry(responseObserver);
     }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
index 32395322a3..2cb395ad60 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
@@ -378,17 +378,17 @@ public class GrpcMessagingApplication extends 
MessagingServiceGrpc.MessagingServ
     @Override
     public StreamObserver<TelemetryCommand> 
telemetry(StreamObserver<TelemetryCommand> responseObserver) {
         Function<Status, TelemetryCommand> statusResponseCreator = status -> 
TelemetryCommand.newBuilder().setStatus(status).build();
-        ProxyContext context = createContext();
-        StreamObserver<TelemetryCommand> responseTelemetryCommand = 
grpcMessingActivity.telemetry(context, responseObserver);
+        ContextStreamObserver<TelemetryCommand> responseTelemetryCommand = 
grpcMessingActivity.telemetry(responseObserver);
         return new StreamObserver<TelemetryCommand>() {
             @Override
             public void onNext(TelemetryCommand value) {
+                ProxyContext context = createContext();
                 try {
                     validateContext(context);
                     addExecutor(clientManagerThreadPoolExecutor,
                         context,
                         value,
-                        () -> responseTelemetryCommand.onNext(value),
+                        () -> responseTelemetryCommand.onNext(context, value),
                         responseObserver,
                         statusResponseCreator);
                 } catch (Throwable t) {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java
index 8f1db82307..77bd3a88f9 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessingActivity.java
@@ -69,5 +69,5 @@ public interface GrpcMessingActivity extends StartAndShutdown 
{
     CompletableFuture<ChangeInvisibleDurationResponse> 
changeInvisibleDuration(ProxyContext ctx,
         ChangeInvisibleDurationRequest request);
 
-    StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx, 
StreamObserver<TelemetryCommand> responseObserver);
+    ContextStreamObserver<TelemetryCommand> 
telemetry(StreamObserver<TelemetryCommand> responseObserver);
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index a60228eb9f..8553289498 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -52,6 +52,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
 import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
+import org.apache.rocketmq.proxy.grpc.v2.ContextStreamObserver;
 import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
 import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
 import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
@@ -174,11 +175,10 @@ public class ClientActivity extends 
AbstractMessingActivity {
         return future;
     }
 
-    public StreamObserver<TelemetryCommand> telemetry(ProxyContext ctx,
-        StreamObserver<TelemetryCommand> responseObserver) {
-        return new StreamObserver<TelemetryCommand>() {
+    public ContextStreamObserver<TelemetryCommand> 
telemetry(StreamObserver<TelemetryCommand> responseObserver) {
+        return new ContextStreamObserver<TelemetryCommand>() {
             @Override
-            public void onNext(TelemetryCommand request) {
+            public void onNext(ProxyContext ctx, TelemetryCommand request) {
                 try {
                     switch (request.getCommandCase()) {
                         case SETTINGS: {
@@ -271,7 +271,7 @@ public class ClientActivity extends AbstractMessingActivity 
{
 
     protected TelemetryCommand processClientSettings(ProxyContext ctx, 
TelemetryCommand request) {
         String clientId = ctx.getClientID();
-        grpcClientSettingsManager.updateClientSettings(clientId, 
request.getSettings());
+        grpcClientSettingsManager.updateClientSettings(ctx, clientId, 
request.getSettings());
         Settings settings = grpcClientSettingsManager.getClientSettings(ctx);
         return TelemetryCommand.newBuilder()
             .setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, 
Code.OK.name()))
@@ -458,7 +458,11 @@ public class ClientActivity extends 
AbstractMessingActivity {
                     if (settings == null) {
                         return;
                     }
-                    
grpcClientSettingsManager.updateClientSettings(clientChannelInfo.getClientId(), 
settings);
+                    grpcClientSettingsManager.updateClientSettings(
+                        ProxyContext.createForInner(this.getClass()),
+                        clientChannelInfo.getClientId(),
+                        settings
+                    );
                 }
             }
         }
@@ -475,7 +479,7 @@ public class ClientActivity extends AbstractMessingActivity 
{
         public void handle(ProducerGroupEvent event, String group, 
ClientChannelInfo clientChannelInfo) {
             if (event == ProducerGroupEvent.CLIENT_UNREGISTER) {
                 
grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
-                
grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId());
+                
grpcClientSettingsManager.removeAndGetRawClientSettings(clientChannelInfo.getClientId());
             }
         }
     }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
index af8b4546e1..1eff659392 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManager.java
@@ -33,15 +33,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.common.utils.StartAndShutdown;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.MetricCollectorMode;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
@@ -68,7 +67,7 @@ public class GrpcClientSettingsManager extends ServiceThread 
implements StartAnd
 
     public Settings getClientSettings(ProxyContext ctx) {
         String clientId = ctx.getClientID();
-        Settings settings = CLIENT_SETTINGS_MAP.get(clientId);
+        Settings settings = getRawClientSettings(clientId);
         if (settings == null) {
             return null;
         }
@@ -182,7 +181,7 @@ public class GrpcClientSettingsManager extends 
ServiceThread implements StartAnd
             .build();
     }
 
-    public void updateClientSettings(String clientId, Settings settings) {
+    public void updateClientSettings(ProxyContext ctx, String clientId, 
Settings settings) {
         if (settings.hasSubscription()) {
             settings = 
createDefaultConsumerSettingsBuilder().mergeFrom(settings).build();
         }
@@ -194,17 +193,13 @@ public class GrpcClientSettingsManager extends 
ServiceThread implements StartAnd
             .toBuilder();
     }
 
-    public void removeClientSettings(String clientId) {
-        CLIENT_SETTINGS_MAP.remove(clientId);
-    }
-
-    public void computeIfPresent(String clientId, Function<Settings, Settings> 
function) {
-        CLIENT_SETTINGS_MAP.computeIfPresent(clientId, (clientIdKey, value) -> 
function.apply(value));
+    public Settings removeAndGetRawClientSettings(String clientId) {
+        return CLIENT_SETTINGS_MAP.remove(clientId);
     }
 
     public Settings removeAndGetClientSettings(ProxyContext ctx) {
         String clientId = ctx.getClientID();
-        Settings settings = CLIENT_SETTINGS_MAP.remove(clientId);
+        Settings settings = this.removeAndGetRawClientSettings(clientId);
         if (settings == null) {
             return null;
         }
@@ -237,7 +232,10 @@ public class GrpcClientSettingsManager extends 
ServiceThread implements StartAnd
                         return settings;
                     }
                     String consumerGroup = 
GrpcConverter.getInstance().wrapResourceWithNamespace(settings.getSubscription().getGroup());
-                    ConsumerGroupInfo consumerGroupInfo = 
this.messagingProcessor.getConsumerGroupInfo(consumerGroup);
+                    ConsumerGroupInfo consumerGroupInfo = 
this.messagingProcessor.getConsumerGroupInfo(
+                        ProxyContext.createForInner(this.getClass()),
+                        consumerGroup
+                    );
                     if (consumerGroupInfo == null || 
consumerGroupInfo.findChannel(clientId) == null) {
                         log.info("remove unused grpc client settings. 
group:{}, settings:{}", consumerGroupInfo, settings);
                         return null;
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
index 8fb6eaf7df..eeb9bf87e6 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
@@ -110,7 +110,7 @@ public class ClientProcessor extends AbstractProcessor {
         
this.serviceManager.getConsumerManager().appendConsumerIdsChangeListener(listener);
     }
 
-    public ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup) {
+    public ConsumerGroupInfo getConsumerGroupInfo(ProxyContext ctx, String 
consumerGroup) {
         return 
this.serviceManager.getConsumerManager().getConsumerGroupInfo(consumerGroup);
     }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 72ff9b939d..e663ae1ba2 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -290,8 +290,8 @@ public class DefaultMessagingProcessor extends 
AbstractStartAndShutdown implemen
     }
 
     @Override
-    public ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup) {
-        return this.clientProcessor.getConsumerGroupInfo(consumerGroup);
+    public ConsumerGroupInfo getConsumerGroupInfo(ProxyContext ctx, String 
consumerGroup) {
+        return this.clientProcessor.getConsumerGroupInfo(ctx, consumerGroup);
     }
 
     @Override
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index 40ffb96a7a..263068965a 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -288,7 +288,7 @@ public interface MessagingProcessor extends 
StartAndShutdown {
 
     void doChannelCloseEvent(String remoteAddr, Channel channel);
 
-    ConsumerGroupInfo getConsumerGroupInfo(String consumerGroup);
+    ConsumerGroupInfo getConsumerGroupInfo(ProxyContext ctx, String 
consumerGroup);
 
     void addTransactionSubscription(
         ProxyContext ctx,
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
index 69280fb864..1eb81ce927 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
@@ -80,7 +80,7 @@ public class ClientManagerActivity extends 
AbstractRemotingActivity {
 
         for (ProducerData data : heartbeatData.getProducerDataSet()) {
             ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
-                
this.remotingChannelManager.createProducerChannel(ctx.channel(), 
data.getGroupName(), clientId),
+                this.remotingChannelManager.createProducerChannel(context, 
ctx.channel(), data.getGroupName(), clientId),
                 clientId, request.getLanguage(),
                 request.getVersion());
             setClientPropertiesToChannelAttr(clientChannelInfo);
@@ -89,7 +89,7 @@ public class ClientManagerActivity extends 
AbstractRemotingActivity {
 
         for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
             ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
-                
this.remotingChannelManager.createConsumerChannel(ctx.channel(), 
data.getGroupName(), clientId, data.getSubscriptionDataSet()),
+                this.remotingChannelManager.createConsumerChannel(context, 
ctx.channel(), data.getGroupName(), clientId, data.getSubscriptionDataSet()),
                 clientId, request.getLanguage(),
                 request.getVersion());
             setClientPropertiesToChannelAttr(clientChannelInfo);
@@ -122,7 +122,7 @@ public class ClientManagerActivity extends 
AbstractRemotingActivity {
             (UnregisterClientRequestHeader) 
request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
         final String producerGroup = requestHeader.getProducerGroup();
         if (producerGroup != null) {
-            RemotingChannel channel = 
this.remotingChannelManager.removeProducerChannel(producerGroup, ctx.channel());
+            RemotingChannel channel = 
this.remotingChannelManager.removeProducerChannel(context, producerGroup, 
ctx.channel());
             ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
                 channel,
                 requestHeader.getClientID(),
@@ -132,7 +132,7 @@ public class ClientManagerActivity extends 
AbstractRemotingActivity {
         }
         final String consumerGroup = requestHeader.getConsumerGroup();
         if (consumerGroup != null) {
-            RemotingChannel channel = 
this.remotingChannelManager.removeConsumerChannel(consumerGroup, ctx.channel());
+            RemotingChannel channel = 
this.remotingChannelManager.removeConsumerChannel(context, consumerGroup, 
ctx.channel());
             ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
                 channel,
                 requestHeader.getClientID(),
@@ -170,7 +170,7 @@ public class ClientManagerActivity extends 
AbstractRemotingActivity {
                 }
                 if (args[0] instanceof ClientChannelInfo) {
                     ClientChannelInfo clientChannelInfo = (ClientChannelInfo) 
args[0];
-                    remotingChannelManager.removeConsumerChannel(group, 
clientChannelInfo.getChannel());
+                    
remotingChannelManager.removeConsumerChannel(ProxyContext.createForInner(this.getClass()),
 group, clientChannelInfo.getChannel());
                     log.info("remove remoting channel when client unregister. 
clientChannelInfo:{}", clientChannelInfo);
                 }
             }
@@ -187,7 +187,7 @@ public class ClientManagerActivity extends 
AbstractRemotingActivity {
         @Override
         public void handle(ProducerGroupEvent event, String group, 
ClientChannelInfo clientChannelInfo) {
             if (event == ProducerGroupEvent.CLIENT_UNREGISTER) {
-                remotingChannelManager.removeProducerChannel(group, 
clientChannelInfo.getChannel());
+                
remotingChannelManager.removeProducerChannel(ProxyContext.createForInner(this.getClass()),
 group, clientChannelInfo.getChannel());
             }
         }
     }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
index e9d42afc2c..b21b4afa42 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
@@ -83,7 +83,7 @@ public class ConsumerManagerActivity extends 
AbstractRemotingActivity {
         ProxyContext context) throws Exception {
         RemotingCommand response = 
RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
         GetConsumerListByGroupRequestHeader header = 
(GetConsumerListByGroupRequestHeader) 
request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
-        ConsumerGroupInfo consumerGroupInfo = 
messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup());
+        ConsumerGroupInfo consumerGroupInfo = 
messagingProcessor.getConsumerGroupInfo(context, header.getConsumerGroup());
         List<String> clientIds = consumerGroupInfo.getAllClientId();
         GetConsumerListByGroupResponseBody body = new 
GetConsumerListByGroupResponseBody();
         body.setConsumerIdList(clientIds);
@@ -96,7 +96,7 @@ public class ConsumerManagerActivity extends 
AbstractRemotingActivity {
         ProxyContext context) throws Exception {
         RemotingCommand response = 
RemotingCommand.createResponseCommand(GetConsumerConnectionListRequestHeader.class);
         GetConsumerConnectionListRequestHeader header = 
(GetConsumerConnectionListRequestHeader) 
request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
-        ConsumerGroupInfo consumerGroupInfo = 
messagingProcessor.getConsumerGroupInfo(header.getConsumerGroup());
+        ConsumerGroupInfo consumerGroupInfo = 
messagingProcessor.getConsumerGroupInfo(context, header.getConsumerGroup());
         if (consumerGroupInfo != null) {
             ConsumerConnection bodydata = new ConsumerConnection();
             
bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere());
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
index d548ddc0df..3324c231ab 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
@@ -41,7 +41,7 @@ public class PullMessageActivity extends 
AbstractRemotingActivity {
         PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) 
request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
         int sysFlag = requestHeader.getSysFlag();
         if (!PullSysFlag.hasSubscriptionFlag(sysFlag)) {
-            ConsumerGroupInfo consumerInfo = 
messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup());
+            ConsumerGroupInfo consumerInfo = 
messagingProcessor.getConsumerGroupInfo(context, 
requestHeader.getConsumerGroup());
             if (consumerInfo == null) {
                 return 
RemotingCommand.buildErrorResponse(ResponseCode.SUBSCRIPTION_NOT_LATEST,
                     "the consumer's subscription not latest");
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
index 133865f48b..211c3c9275 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -57,11 +58,11 @@ public class RemotingChannelManager implements 
StartAndShutdown {
         return prefix + group;
     }
 
-    public RemotingChannel createProducerChannel(Channel channel, String 
group, String clientId) {
+    public RemotingChannel createProducerChannel(ProxyContext ctx, Channel 
channel, String group, String clientId) {
         return createChannel(channel, buildProducerKey(group), clientId, 
Collections.emptySet());
     }
 
-    public RemotingChannel createConsumerChannel(Channel channel, String 
group, String clientId, Set<SubscriptionData> subscriptionData) {
+    public RemotingChannel createConsumerChannel(ProxyContext ctx, Channel 
channel, String group, String clientId, Set<SubscriptionData> subscriptionData) 
{
         return createChannel(channel, buildConsumerKey(group), clientId, 
subscriptionData);
     }
 
@@ -96,11 +97,11 @@ public class RemotingChannelManager implements 
StartAndShutdown {
         return removedChannelSet;
     }
 
-    public RemotingChannel removeProducerChannel(String group, Channel 
channel) {
+    public RemotingChannel removeProducerChannel(ProxyContext ctx, String 
group, Channel channel) {
         return removeChannel(buildProducerKey(group), channel);
     }
 
-    public RemotingChannel removeConsumerChannel(String group, Channel 
channel) {
+    public RemotingChannel removeConsumerChannel(ProxyContext ctx, String 
group, Channel channel) {
         return removeChannel(buildConsumerKey(group), channel);
     }
 
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index 3fa6414c39..b6b14faa49 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -26,19 +26,18 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.proxy.common.AbstractCacheLoader;
-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
 import org.apache.rocketmq.proxy.common.Address;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 import org.checkerframework.checker.nullness.qual.NonNull;
@@ -52,8 +51,6 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
     protected final LoadingCache<String /* topicName */, MessageQueueView> 
topicCache;
     protected final ScheduledExecutorService scheduledExecutorService;
     protected final ThreadPoolExecutor cacheRefreshExecutor;
-    private final TopicRouteCacheLoader topicRouteCacheLoader = new 
TopicRouteCacheLoader();
-
 
     public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
@@ -76,13 +73,8 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
             executor(cacheRefreshExecutor).build(new CacheLoader<String, 
MessageQueueView>() {
                 @Override public @Nullable MessageQueueView load(String topic) 
throws Exception {
                     try {
-                        TopicRouteData topicRouteData = 
topicRouteCacheLoader.loadTopicRouteData(topic);
-                        if (isTopicRouteValid(topicRouteData)) {
-                            MessageQueueView tmp = new MessageQueueView(topic, 
topicRouteData);
-                            log.info("load topic route from namesrv. topic: 
{}, queue: {}", topic, tmp);
-                            return tmp;
-                        }
-                        return MessageQueueView.WRAPPED_EMPTY_QUEUE;
+                        TopicRouteData topicRouteData = 
mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic, 
Duration.ofSeconds(3).toMillis());
+                        return buildMessageQueueView(topic, topicRouteData);
                     } catch (Exception e) {
                         if (TopicRouteHelper.isTopicNotExistError(e)) {
                             return MessageQueueView.WRAPPED_EMPTY_QUEUE;
@@ -138,44 +130,12 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
             && routeData.getBrokerDatas() != null && 
!routeData.getBrokerDatas().isEmpty();
     }
 
-    protected abstract class AbstractTopicRouteCacheLoader extends 
AbstractCacheLoader<String, MessageQueueView> {
-
-        public AbstractTopicRouteCacheLoader() {
-            super(cacheRefreshExecutor);
-        }
-
-        protected abstract TopicRouteData loadTopicRouteData(String topic) 
throws Exception;
-
-        @Override
-        public MessageQueueView getDirectly(String topic) throws Exception {
-            try {
-                TopicRouteData topicRouteData = loadTopicRouteData(topic);
-
-                if (isTopicRouteValid(topicRouteData)) {
-                    MessageQueueView tmp = new MessageQueueView(topic, 
topicRouteData);
-                    log.info("load topic route from namesrv. topic: {}, queue: 
{}", topic, tmp);
-                    return tmp;
-                }
-                return MessageQueueView.WRAPPED_EMPTY_QUEUE;
-            } catch (Exception e) {
-                if (TopicRouteHelper.isTopicNotExistError(e)) {
-                    return MessageQueueView.WRAPPED_EMPTY_QUEUE;
-                }
-                throw e;
-            }
-        }
-
-        @Override
-        protected void onErr(String key, Exception e) {
-            log.error("load topic route from namesrv failed. topic:{}", key, 
e);
-        }
-    }
-
-    protected class TopicRouteCacheLoader extends 
AbstractTopicRouteCacheLoader {
-
-        @Override
-        protected TopicRouteData loadTopicRouteData(String topic) throws 
Exception {
-            return 
mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic, 
Duration.ofSeconds(3).toMillis());
+    protected MessageQueueView buildMessageQueueView(String topic, 
TopicRouteData topicRouteData) {
+        if (isTopicRouteValid(topicRouteData)) {
+            MessageQueueView tmp = new MessageQueueView(topic, topicRouteData);
+            log.info("load topic route from namesrv. topic: {}, queue: {}", 
topic, tmp);
+            return tmp;
         }
+        return MessageQueueView.WRAPPED_EMPTY_QUEUE;
     }
 }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
index a5d4e3c919..0c1ebcdfae 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
@@ -43,6 +43,7 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
+import org.apache.rocketmq.proxy.grpc.v2.ContextStreamObserver;
 import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
 import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
 import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
@@ -341,7 +342,7 @@ public class ClientActivityTest extends BaseActivityTest {
         String nonce = "123";
         
when(grpcChannelManagerMock.getAndRemoveResponseFuture(anyString())).thenReturn((CompletableFuture)
 runningInfoFutureMock);
         ProxyContext context = createContext();
-        StreamObserver<TelemetryCommand> streamObserver = 
clientActivity.telemetry(context, new StreamObserver<TelemetryCommand>() {
+        ContextStreamObserver<TelemetryCommand> streamObserver = 
clientActivity.telemetry(new StreamObserver<TelemetryCommand>() {
             @Override
             public void onNext(TelemetryCommand value) {
             }
@@ -354,7 +355,7 @@ public class ClientActivityTest extends BaseActivityTest {
             public void onCompleted() {
             }
         });
-        streamObserver.onNext(TelemetryCommand.newBuilder()
+        streamObserver.onNext(context, TelemetryCommand.newBuilder()
             .setThreadStackTrace(ThreadStackTrace.newBuilder()
                 .setThreadStackTrace(jstack)
                 .setNonce(nonce)
@@ -373,7 +374,7 @@ public class ClientActivityTest extends BaseActivityTest {
         String nonce = "123";
         
when(grpcChannelManagerMock.getAndRemoveResponseFuture(anyString())).thenReturn((CompletableFuture)
 resultFutureMock);
         ProxyContext context = createContext();
-        StreamObserver<TelemetryCommand> streamObserver = 
clientActivity.telemetry(context, new StreamObserver<TelemetryCommand>() {
+        ContextStreamObserver<TelemetryCommand> streamObserver = 
clientActivity.telemetry(new StreamObserver<TelemetryCommand>() {
             @Override
             public void onNext(TelemetryCommand value) {
             }
@@ -386,7 +387,7 @@ public class ClientActivityTest extends BaseActivityTest {
             public void onCompleted() {
             }
         });
-        streamObserver.onNext(TelemetryCommand.newBuilder()
+        streamObserver.onNext(context, TelemetryCommand.newBuilder()
             .setVerifyMessageResult(VerifyMessageResult.newBuilder()
                 .setNonce(nonce)
                 .build())
@@ -418,11 +419,8 @@ public class ClientActivityTest extends BaseActivityTest {
 
             }
         };
-        StreamObserver<TelemetryCommand> requestObserver = 
this.clientActivity.telemetry(
-            ctx,
-            responseObserver
-        );
-        requestObserver.onNext(TelemetryCommand.newBuilder()
+        ContextStreamObserver<TelemetryCommand> requestObserver = 
this.clientActivity.telemetry(responseObserver);
+        requestObserver.onNext(ctx, TelemetryCommand.newBuilder()
             .setSettings(settings)
             .build());
         return future;
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
index 9044873a6d..6742f094c8 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
@@ -54,7 +54,7 @@ public class GrpcClientSettingsManagerTest extends 
BaseActivityTest {
     public void testGetProducerData() {
         ProxyContext context = 
ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID);
 
-        this.grpcClientSettingsManager.updateClientSettings(CLIENT_ID, 
Settings.newBuilder()
+        this.grpcClientSettingsManager.updateClientSettings(context, 
CLIENT_ID, Settings.newBuilder()
             .setBackoffPolicy(RetryPolicy.getDefaultInstance())
             .setPublishing(Publishing.getDefaultInstance())
             .build());
@@ -65,18 +65,18 @@ public class GrpcClientSettingsManagerTest extends 
BaseActivityTest {
 
     @Test
     public void testGetSubscriptionData() {
+        ProxyContext context = 
ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID);
+
         SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
         when(this.messagingProcessor.getSubscriptionGroupConfig(any(), any()))
             .thenReturn(subscriptionGroupConfig);
 
-        this.grpcClientSettingsManager.updateClientSettings(CLIENT_ID, 
Settings.newBuilder()
+        this.grpcClientSettingsManager.updateClientSettings(context, 
CLIENT_ID, Settings.newBuilder()
             .setSubscription(Subscription.newBuilder()
                 .setGroup(Resource.newBuilder().setName("group").build())
                 .build())
             .build());
 
-        ProxyContext context = 
ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID);
-
         Settings settings = 
this.grpcClientSettingsManager.getClientSettings(context);
         assertEquals(settings.getBackoffPolicy(), 
this.grpcClientSettingsManager.createDefaultConsumerSettingsBuilder().build().getBackoffPolicy());
 
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
index d8ad451875..a2f1f4cc89 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
@@ -77,7 +77,7 @@ public class PullMessageActivityTest extends InitConfigTest {
 
     @Test
     public void testPullMessageWithoutSub() throws Exception {
-        when(messagingProcessorMock.getConsumerGroupInfo(eq(group)))
+        when(messagingProcessorMock.getConsumerGroupInfo(any(), eq(group)))
             .thenReturn(consumerGroupInfoMock);
         SubscriptionData subscriptionData = new SubscriptionData();
         subscriptionData.setSubString(subString);
@@ -128,7 +128,7 @@ public class PullMessageActivityTest extends InitConfigTest 
{
 
     @Test
     public void testPullMessageWithSub() throws Exception {
-        when(messagingProcessorMock.getConsumerGroupInfo(eq(group)))
+        when(messagingProcessorMock.getConsumerGroupInfo(any(), eq(group)))
             .thenReturn(consumerGroupInfoMock);
         SubscriptionData subscriptionData = new SubscriptionData();
         subscriptionData.setSubString(subString);
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java
index 5a5b441e95..1122405937 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManagerTest.java
@@ -21,6 +21,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelId;
 import java.util.HashSet;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
 import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
@@ -46,6 +47,7 @@ public class RemotingChannelManagerTest {
     private final String remoteAddress = "10.152.39.53:9768";
     private final String localAddress = "11.193.0.1:1210";
     private RemotingChannelManager remotingChannelManager;
+    private final ProxyContext ctx = 
ProxyContext.createForInner(this.getClass());
 
     @Before
     public void before() {
@@ -58,13 +60,13 @@ public class RemotingChannelManagerTest {
         String clientId = RandomStringUtils.randomAlphabetic(10);
 
         Channel producerChannel = createMockChannel();
-        RemotingChannel producerRemotingChannel = 
this.remotingChannelManager.createProducerChannel(producerChannel, group, 
clientId);
+        RemotingChannel producerRemotingChannel = 
this.remotingChannelManager.createProducerChannel(ctx, producerChannel, group, 
clientId);
         assertNotNull(producerRemotingChannel);
-        assertSame(producerRemotingChannel, 
this.remotingChannelManager.createProducerChannel(producerChannel, group, 
clientId));
+        assertSame(producerRemotingChannel, 
this.remotingChannelManager.createProducerChannel(ctx, producerChannel, group, 
clientId));
 
         Channel consumerChannel = createMockChannel();
-        RemotingChannel consumerRemotingChannel = 
this.remotingChannelManager.createConsumerChannel(consumerChannel, group, 
clientId, new HashSet<>());
-        assertSame(consumerRemotingChannel, 
this.remotingChannelManager.createConsumerChannel(consumerChannel, group, 
clientId, new HashSet<>()));
+        RemotingChannel consumerRemotingChannel = 
this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, group, 
clientId, new HashSet<>());
+        assertSame(consumerRemotingChannel, 
this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, group, 
clientId, new HashSet<>()));
         assertNotNull(consumerRemotingChannel);
 
         assertNotSame(producerRemotingChannel, consumerRemotingChannel);
@@ -77,14 +79,14 @@ public class RemotingChannelManagerTest {
 
         {
             Channel producerChannel = createMockChannel();
-            RemotingChannel producerRemotingChannel = 
this.remotingChannelManager.createProducerChannel(producerChannel, group, 
clientId);
-            assertSame(producerRemotingChannel, 
this.remotingChannelManager.removeProducerChannel(group, 
producerRemotingChannel));
+            RemotingChannel producerRemotingChannel = 
this.remotingChannelManager.createProducerChannel(ctx, producerChannel, group, 
clientId);
+            assertSame(producerRemotingChannel, 
this.remotingChannelManager.removeProducerChannel(ctx, group, 
producerRemotingChannel));
             assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
         }
         {
             Channel producerChannel = createMockChannel();
-            RemotingChannel producerRemotingChannel = 
this.remotingChannelManager.createProducerChannel(producerChannel, group, 
clientId);
-            assertSame(producerRemotingChannel, 
this.remotingChannelManager.removeProducerChannel(group, producerChannel));
+            RemotingChannel producerRemotingChannel = 
this.remotingChannelManager.createProducerChannel(ctx, producerChannel, group, 
clientId);
+            assertSame(producerRemotingChannel, 
this.remotingChannelManager.removeProducerChannel(ctx, group, producerChannel));
             assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
         }
     }
@@ -96,14 +98,14 @@ public class RemotingChannelManagerTest {
 
         {
             Channel consumerChannel = createMockChannel();
-            RemotingChannel consumerRemotingChannel = 
this.remotingChannelManager.createConsumerChannel(consumerChannel, group, 
clientId, new HashSet<>());
-            assertSame(consumerRemotingChannel, 
this.remotingChannelManager.removeConsumerChannel(group, 
consumerRemotingChannel));
+            RemotingChannel consumerRemotingChannel = 
this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, group, 
clientId, new HashSet<>());
+            assertSame(consumerRemotingChannel, 
this.remotingChannelManager.removeConsumerChannel(ctx, group, 
consumerRemotingChannel));
             assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
         }
         {
             Channel consumerChannel = createMockChannel();
-            RemotingChannel consumerRemotingChannel = 
this.remotingChannelManager.createConsumerChannel(consumerChannel, group, 
clientId, new HashSet<>());
-            assertSame(consumerRemotingChannel, 
this.remotingChannelManager.removeConsumerChannel(group, consumerChannel));
+            RemotingChannel consumerRemotingChannel = 
this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, group, 
clientId, new HashSet<>());
+            assertSame(consumerRemotingChannel, 
this.remotingChannelManager.removeConsumerChannel(ctx, group, consumerChannel));
             assertTrue(this.remotingChannelManager.groupChannelMap.isEmpty());
         }
     }
@@ -115,9 +117,9 @@ public class RemotingChannelManagerTest {
         String clientId = RandomStringUtils.randomAlphabetic(10);
 
         Channel consumerChannel = createMockChannel();
-        RemotingChannel consumerRemotingChannel = 
this.remotingChannelManager.createConsumerChannel(consumerChannel, 
consumerGroup, clientId, new HashSet<>());
+        RemotingChannel consumerRemotingChannel = 
this.remotingChannelManager.createConsumerChannel(ctx, consumerChannel, 
consumerGroup, clientId, new HashSet<>());
         Channel producerChannel = createMockChannel();
-        RemotingChannel producerRemotingChannel = 
this.remotingChannelManager.createProducerChannel(producerChannel, 
producerGroup, clientId);
+        RemotingChannel producerRemotingChannel = 
this.remotingChannelManager.createProducerChannel(ctx, producerChannel, 
producerGroup, clientId);
 
         assertSame(consumerRemotingChannel, 
this.remotingChannelManager.removeChannel(consumerChannel).stream().findFirst().get());
         assertSame(producerRemotingChannel, 
this.remotingChannelManager.removeChannel(producerChannel).stream().findFirst().get());
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/LockBatchRequestBody.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/LockBatchRequestBody.java
index 02912446cf..6766564bc7 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/LockBatchRequestBody.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/LockBatchRequestBody.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.remoting.protocol.body;
 
+import com.google.common.base.MoreObjects;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -59,4 +60,14 @@ public class LockBatchRequestBody extends 
RemotingSerializable {
     public void setMqSet(Set<MessageQueue> mqSet) {
         this.mqSet = mqSet;
     }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("consumerGroup", consumerGroup)
+            .add("clientId", clientId)
+            .add("onlyThisBroker", onlyThisBroker)
+            .add("mqSet", mqSet)
+            .toString();
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/UnlockBatchRequestBody.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/UnlockBatchRequestBody.java
index fcac7ed9ae..2ad906739c 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/UnlockBatchRequestBody.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/UnlockBatchRequestBody.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.remoting.protocol.body;
 
+import com.google.common.base.MoreObjects;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -59,4 +60,14 @@ public class UnlockBatchRequestBody extends 
RemotingSerializable {
     public void setMqSet(Set<MessageQueue> mqSet) {
         this.mqSet = mqSet;
     }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("consumerGroup", consumerGroup)
+            .add("clientId", clientId)
+            .add("onlyThisBroker", onlyThisBroker)
+            .add("mqSet", mqSet)
+            .toString();
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
index 5965e9dcbb..2ccf564df5 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.remoting.protocol.header;
 
+import com.google.common.base.MoreObjects;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
@@ -99,4 +100,17 @@ public class NotificationRequestHeader extends 
TopicQueueRequestHeader {
     public void setAttemptId(String attemptId) {
         this.attemptId = attemptId;
     }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("consumerGroup", consumerGroup)
+            .add("topic", topic)
+            .add("queueId", queueId)
+            .add("pollTime", pollTime)
+            .add("bornTime", bornTime)
+            .add("order", order)
+            .add("attemptId", attemptId)
+            .toString();
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumerOffsetRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumerOffsetRequestHeader.java
index 39aaa01176..e16d38a7a3 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumerOffsetRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/QueryConsumerOffsetRequestHeader.java
@@ -20,6 +20,7 @@
  */
 package org.apache.rocketmq.remoting.protocol.header;
 
+import com.google.common.base.MoreObjects;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
@@ -73,4 +74,14 @@ public class QueryConsumerOffsetRequestHeader extends 
TopicQueueRequestHeader {
     public void setSetZeroIfNotFound(Boolean setZeroIfNotFound) {
         this.setZeroIfNotFound = setZeroIfNotFound;
     }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("consumerGroup", consumerGroup)
+            .add("topic", topic)
+            .add("queueId", queueId)
+            .add("setZeroIfNotFound", setZeroIfNotFound)
+            .toString();
+    }
 }

Reply via email to