This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3f99b1e96b [ISSUE #7707] Refactor Context with link node 
implementation (#7708)
3f99b1e96b is described below

commit 3f99b1e96bedb0dc6854c92b2f753cdf9fa68197
Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com>
AuthorDate: Tue Jan 9 10:51:26 2024 +0800

    [ISSUE #7707] Refactor Context with link node implementation (#7708)
    
    * Refactor Context with link node implementation
---
 .../apache/rocketmq/proxy/common/ProxyContext.java | 97 +++++++++++-----------
 .../rocketmq/proxy/common/context/ContextNode.java | 55 ++++++++++++
 .../common/{ => context}/ContextVariable.java      |  2 +-
 .../proxy/grpc/v2/GrpcMessagingApplication.java    | 16 ++--
 .../proxy/processor/ReceiptHandleProcessor.java    |  2 +-
 .../activity/AbstractRemotingActivity.java         | 36 ++++----
 .../rocketmq/proxy/service/relay/ProxyChannel.java |  4 +-
 .../org/apache/rocketmq/proxy/ContextNodeTest.java | 69 +++++++++++++++
 .../rocketmq/proxy/common/ProxyContextTest.java    | 48 +++++++++++
 .../rocketmq/proxy/grpc/v2/BaseActivityTest.java   | 12 +--
 .../grpc/v2/channel/GrpcClientChannelTest.java     |  2 +-
 .../v2/common/GrpcClientSettingsManagerTest.java   |  6 +-
 .../v2/consumer/ReceiveMessageActivityTest.java    | 12 ++-
 .../service/message/LocalMessageServiceTest.java   |  6 +-
 .../mqclient/ProxyClientRemotingProcessorTest.java |  2 +-
 .../receipt/DefaultReceiptHandleManagerTest.java   | 60 ++++++-------
 .../service/sysmessage/HeartbeatSyncerTest.java    |  4 +-
 17 files changed, 304 insertions(+), 129 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
index 77a6791f04..3e602d5ad0 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
@@ -18,117 +18,118 @@
 package org.apache.rocketmq.proxy.common;
 
 import io.netty.channel.Channel;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.rocketmq.proxy.common.context.ContextNode;
+import org.apache.rocketmq.proxy.common.context.ContextVariable;
 
 public class ProxyContext {
     public static final String INNER_ACTION_PREFIX = "Inner";
-    private final Map<String, Object> value = new HashMap<>();
+    private final ContextNode contextNode;
+
+    ProxyContext() {
+        this.contextNode = new ContextNode();
+    }
+
+    ProxyContext(ContextNode parent) {
+        this.contextNode = parent;
+    }
+
+    ProxyContext(ProxyContext that) {
+        this.contextNode = that.contextNode;
+    }
 
     public static ProxyContext create() {
         return new ProxyContext();
     }
 
     public static ProxyContext createForInner(String actionName) {
-        return create().setAction(INNER_ACTION_PREFIX + actionName);
+        return create().withAction(INNER_ACTION_PREFIX + actionName);
     }
 
     public static ProxyContext createForInner(Class<?> clazz) {
         return createForInner(clazz.getSimpleName());
     }
 
-    public Map<String, Object> getValue() {
-        return this.value;
+    public ProxyContext withValue(String key, Object val) {
+        return new ProxyContext(contextNode.withValue(key, val));
     }
 
-    public ProxyContext withVal(String key, Object val) {
-        this.value.put(key, val);
-        return this;
+    public <T> T getValue(String key) {
+        return (T) contextNode.getValue(key);
     }
 
-    public <T> T getVal(String key) {
-        return (T) this.value.get(key);
+    public <T> T getValue(String key, Class<T> classType) {
+        return (T) contextNode.getValue(key, classType);
     }
 
-    public ProxyContext setLocalAddress(String localAddress) {
-        this.withVal(ContextVariable.LOCAL_ADDRESS, localAddress);
-        return this;
+    public ProxyContext withLocalAddress(String localAddress) {
+        return this.withValue(ContextVariable.LOCAL_ADDRESS, localAddress);
     }
 
     public String getLocalAddress() {
-        return this.getVal(ContextVariable.LOCAL_ADDRESS);
+        return contextNode.getValue(ContextVariable.LOCAL_ADDRESS, 
String.class);
     }
 
-    public ProxyContext setRemoteAddress(String remoteAddress) {
-        this.withVal(ContextVariable.REMOTE_ADDRESS, remoteAddress);
-        return this;
+    public ProxyContext withRemoteAddress(String remoteAddress) {
+        return this.withValue(ContextVariable.REMOTE_ADDRESS, remoteAddress);
     }
 
     public String getRemoteAddress() {
-        return this.getVal(ContextVariable.REMOTE_ADDRESS);
+        return contextNode.getValue(ContextVariable.REMOTE_ADDRESS, 
String.class);
     }
 
-    public ProxyContext setClientID(String clientID) {
-        this.withVal(ContextVariable.CLIENT_ID, clientID);
-        return this;
+    public ProxyContext withClientID(String clientID) {
+        return this.withValue(ContextVariable.CLIENT_ID, clientID);
     }
 
     public String getClientID() {
-        return this.getVal(ContextVariable.CLIENT_ID);
+        return contextNode.getValue(ContextVariable.CLIENT_ID, String.class);
     }
 
-    public ProxyContext setChannel(Channel channel) {
-        this.withVal(ContextVariable.CHANNEL, channel);
-        return this;
+    public ProxyContext withChannel(Channel channel) {
+        return this.withValue(ContextVariable.CHANNEL, channel);
     }
 
     public Channel getChannel() {
-        return this.getVal(ContextVariable.CHANNEL);
+        return contextNode.getValue(ContextVariable.CHANNEL, Channel.class);
     }
 
-    public ProxyContext setLanguage(String language) {
-        this.withVal(ContextVariable.LANGUAGE, language);
-        return this;
+    public ProxyContext withLanguage(String language) {
+        return this.withValue(ContextVariable.LANGUAGE, language);
     }
 
     public String getLanguage() {
-        return this.getVal(ContextVariable.LANGUAGE);
+        return contextNode.getValue(ContextVariable.LANGUAGE, String.class);
     }
 
-    public ProxyContext setClientVersion(String clientVersion) {
-        this.withVal(ContextVariable.CLIENT_VERSION, clientVersion);
-        return this;
+    public ProxyContext withClientVersion(String clientVersion) {
+        return this.withValue(ContextVariable.CLIENT_VERSION, clientVersion);
     }
 
     public String getClientVersion() {
-        return this.getVal(ContextVariable.CLIENT_VERSION);
+        return contextNode.getValue(ContextVariable.CLIENT_VERSION, 
String.class);
     }
 
-    public ProxyContext setRemainingMs(Long remainingMs) {
-        this.withVal(ContextVariable.REMAINING_MS, remainingMs);
-        return this;
+    public ProxyContext withRemainingMs(Long remainingMs) {
+        return this.withValue(ContextVariable.REMAINING_MS, remainingMs);
     }
 
     public Long getRemainingMs() {
-        return this.getVal(ContextVariable.REMAINING_MS);
+        return contextNode.getValue(ContextVariable.REMAINING_MS, Long.class);
     }
 
-    public ProxyContext setAction(String action) {
-        this.withVal(ContextVariable.ACTION, action);
-        return this;
+    public ProxyContext withAction(String action) {
+        return this.withValue(ContextVariable.ACTION, action);
     }
 
     public String getAction() {
-        return this.getVal(ContextVariable.ACTION);
+        return contextNode.getValue(ContextVariable.ACTION, String.class);
     }
 
-    public ProxyContext setProtocolType(String protocol) {
-        this.withVal(ContextVariable.PROTOCOL_TYPE, protocol);
-        return this;
+    public ProxyContext withProtocolType(String protocol) {
+        return this.withValue(ContextVariable.PROTOCOL_TYPE, protocol);
     }
 
     public String getProtocolType() {
-        return this.getVal(ContextVariable.PROTOCOL_TYPE);
+        return contextNode.getValue(ContextVariable.PROTOCOL_TYPE, 
String.class);
     }
-
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextNode.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextNode.java
new file mode 100644
index 0000000000..7b418516b0
--- /dev/null
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextNode.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common.context;
+
+public class ContextNode {
+    private final String key;
+    private final Object value;
+    private final ContextNode parent;
+
+    public ContextNode() {
+        this(null, null, null);
+    }
+
+    public ContextNode(ContextNode parent, String key, Object value) {
+        this.parent = parent;
+        this.key = key;
+        this.value = value;
+    }
+
+    public ContextNode withValue(String key, Object value) {
+        return new ContextNode(this, key, value);
+    }
+
+    public Object getValue(String key) {
+        for (ContextNode current = this; current != null; current = 
current.parent) {
+            if (key.equals(current.key)) {
+                return current.value;
+            }
+        }
+        return null;
+    }
+
+    public <T> T getValue(String key, Class<T> classType) {
+        Object value = getValue(key);
+        if (classType.isInstance(value)) {
+            return classType.cast(value);
+        }
+        return null;
+    }
+}
\ No newline at end of file
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextVariable.java
similarity index 96%
rename from 
proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java
rename to 
proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextVariable.java
index 0760826de7..727f4c9bbc 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextVariable.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.proxy.common;
+package org.apache.rocketmq.proxy.common.context;
 
 public class ContextVariable {
     public static final String REMOTE_ADDRESS = "remote-address";
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
index 2cb395ad60..3cd664ec55 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
@@ -169,15 +169,15 @@ public class GrpcMessagingApplication extends 
MessagingServiceGrpc.MessagingServ
         Context ctx = Context.current();
         Metadata headers = InterceptorConstants.METADATA.get(ctx);
         ProxyContext context = ProxyContext.create()
-            .setLocalAddress(getDefaultStringMetadataInfo(headers, 
InterceptorConstants.LOCAL_ADDRESS))
-            .setRemoteAddress(getDefaultStringMetadataInfo(headers, 
InterceptorConstants.REMOTE_ADDRESS))
-            .setClientID(getDefaultStringMetadataInfo(headers, 
InterceptorConstants.CLIENT_ID))
-            .setProtocolType(ChannelProtocolType.GRPC_V2.getName())
-            .setLanguage(getDefaultStringMetadataInfo(headers, 
InterceptorConstants.LANGUAGE))
-            .setClientVersion(getDefaultStringMetadataInfo(headers, 
InterceptorConstants.CLIENT_VERSION))
-            .setAction(getDefaultStringMetadataInfo(headers, 
InterceptorConstants.SIMPLE_RPC_NAME));
+            .withLocalAddress(getDefaultStringMetadataInfo(headers, 
InterceptorConstants.LOCAL_ADDRESS))
+            .withRemoteAddress(getDefaultStringMetadataInfo(headers, 
InterceptorConstants.REMOTE_ADDRESS))
+            .withClientID(getDefaultStringMetadataInfo(headers, 
InterceptorConstants.CLIENT_ID))
+            .withProtocolType(ChannelProtocolType.GRPC_V2.getName())
+            .withLanguage(getDefaultStringMetadataInfo(headers, 
InterceptorConstants.LANGUAGE))
+            .withClientVersion(getDefaultStringMetadataInfo(headers, 
InterceptorConstants.CLIENT_VERSION))
+            .withAction(getDefaultStringMetadataInfo(headers, 
InterceptorConstants.SIMPLE_RPC_NAME));
         if (ctx.getDeadline() != null) {
-            
context.setRemainingMs(ctx.getDeadline().timeRemaining(TimeUnit.MILLISECONDS));
+            context = 
context.withRemainingMs(ctx.getDeadline().timeRemaining(TimeUnit.MILLISECONDS));
         }
         return context;
     }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 5e1be93218..71ebfe8af1 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -37,7 +37,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor 
{
         super(messagingProcessor, serviceManager);
         StateEventListener<RenewEvent> eventListener = event -> {
             ProxyContext context = createContext(event.getEventType().name())
-                .setChannel(event.getKey().getChannel());
+                .withChannel(event.getKey().getChannel());
             MessageReceiptHandle messageReceiptHandle = 
event.getMessageReceiptHandle();
             ReceiptHandle handle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
             messagingProcessor.changeInvisibleTime(context, handle, 
messageReceiptHandle.getMessageId(),
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
index ce4a633976..73779eaaf0 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.proxy.remoting.activity;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.rocketmq.acl.common.AclException;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -40,14 +42,11 @@ import 
org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.netty.AttributeKeys;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
 public abstract class AbstractRemotingActivity implements 
NettyRequestProcessor {
     protected final static Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
     protected final MessagingProcessor messagingProcessor;
@@ -124,18 +123,23 @@ public abstract class AbstractRemotingActivity implements 
NettyRequestProcessor
     protected ProxyContext createContext(ChannelHandlerContext ctx, 
RemotingCommand request) {
         ProxyContext context = ProxyContext.create();
         Channel channel = ctx.channel();
-        context.setAction(RemotingHelper.getRequestCodeDesc(request.getCode()))
-            .setProtocolType(ChannelProtocolType.REMOTING.getName())
-            .setChannel(channel)
-            
.setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress()))
-            
.setRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-
-        
Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY,
 channel))
-            .ifPresent(language -> context.setLanguage(language.name()));
-        
Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY,
 channel))
-            .ifPresent(context::setClientID);
-        
Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, 
channel))
-            .ifPresent(version -> 
context.setClientVersion(MQVersion.getVersionDesc(version)));
+        LanguageCode languageCode = 
RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY, channel);
+        String clientId = 
RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY, channel);
+        Integer version = 
RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, channel);
+        context = 
context.withAction(RemotingHelper.getRequestCodeDesc(request.getCode()))
+            .withProtocolType(ChannelProtocolType.REMOTING.getName())
+            .withChannel(channel)
+            
.withLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress()))
+            
.withRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+        if (languageCode != null) {
+            context = context.withLanguage(languageCode.name());
+        }
+        if (clientId != null) {
+            context = context.withClientID(clientId);
+        }
+        if (version != null) {
+            context = 
context.withClientVersion(MQVersion.getVersionDesc(version));
+        }
 
         return context;
     }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java
index 5a1185a81e..277b8f1586 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java
@@ -77,8 +77,8 @@ public abstract class ProxyChannel extends SimpleChannel {
         try {
             if (msg instanceof RemotingCommand) {
                 ProxyContext context = 
ProxyContext.createForInner(this.getClass())
-                    .setRemoteAddress(remoteAddress)
-                    .setLocalAddress(localAddress);
+                    .withRemoteAddress(remoteAddress)
+                    .withLocalAddress(localAddress);
                 RemotingCommand command = (RemotingCommand) msg;
                 if (command.getExtFields() == null) {
                     command.setExtFields(new HashMap<>());
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/ContextNodeTest.java 
b/proxy/src/test/java/org/apache/rocketmq/proxy/ContextNodeTest.java
new file mode 100644
index 0000000000..19cf179c3d
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/ContextNodeTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy;
+
+import org.apache.rocketmq.proxy.common.context.ContextNode;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ContextNodeTest {
+    private ContextNode contextNode;
+
+    @Test
+    public void testWithValue() {
+        String key = "key";
+        String value = "value";
+        contextNode = new ContextNode();
+        ContextNode newContextNode = contextNode.withValue(key, value);
+        assertThat(newContextNode.getValue(key, 
String.class)).isEqualTo(value);
+        assertThat(newContextNode.getValue(key)).isEqualTo(value);
+
+        assertThat(contextNode.getValue(key, String.class)).isNull();
+    }
+
+    @Test
+    public void testRepeatedKeyForTwoContext() {
+        String key1 = "key1";
+        String value1 = "value1";
+        String value2 = "value2";
+        contextNode = new ContextNode();
+        ContextNode newContextNode1 = contextNode.withValue(key1, value1);
+        ContextNode newContextNode2 = contextNode.withValue(key1, value2);
+        assertThat(newContextNode1.getValue(key1, 
String.class)).isEqualTo(value1);
+        assertThat(newContextNode1.getValue(key1)).isEqualTo(value1);
+        assertThat(newContextNode2.getValue(key1, 
String.class)).isEqualTo(value2);
+        assertThat(newContextNode2.getValue(key1)).isEqualTo(value2);
+
+        assertThat(contextNode.getValue(key1, String.class)).isNull();
+    }
+
+    @Test
+    public void testRepeatedKeyForContextChain() {
+        String key1 = "key1";
+        String value1 = "value1";
+        String value2 = "value2";
+        contextNode = new ContextNode();
+        ContextNode newContextNode1 = contextNode.withValue(key1, value1);
+        ContextNode newContextNode2 = newContextNode1.withValue(key1, value2);
+        assertThat(newContextNode1.getValue(key1, 
String.class)).isEqualTo(value1);
+        assertThat(newContextNode2.getValue(key1, 
String.class)).isEqualTo(value2);
+
+        assertThat(contextNode.getValue(key1, String.class)).isNull();
+    }
+}
\ No newline at end of file
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ProxyContextTest.java 
b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ProxyContextTest.java
new file mode 100644
index 0000000000..0999440cde
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ProxyContextTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ProxyContextTest {
+    private ProxyContext proxyContext;
+
+    @Test
+    public void testWithValue() {
+        String key = "key";
+        String value = "value";
+        proxyContext = ProxyContext.create();
+        ProxyContext newContext = proxyContext.withValue(key, value);
+        assertThat(newContext.getValue(key, String.class)).isEqualTo(value);
+        String actualValue = newContext.getValue(key);
+        assertThat(actualValue).isEqualTo(value);
+
+        assertThat(proxyContext.getValue(key, String.class)).isNull();
+    }
+
+    @Test
+    public void testSetLocalAddress() {
+        String address = "address";
+        proxyContext = ProxyContext.create();
+        ProxyContext newProxyContext = proxyContext.withLocalAddress(address);
+        assertThat(proxyContext.getLocalAddress()).isNull();
+        assertThat(newProxyContext.getLocalAddress()).isEqualTo(address);
+    }
+}
\ No newline at end of file
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java
index 524945bd6f..f29d59fe4c 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java
@@ -21,7 +21,7 @@ import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.Random;
 import java.util.UUID;
-import org.apache.rocketmq.proxy.common.ContextVariable;
+import org.apache.rocketmq.proxy.common.context.ContextVariable;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.InitConfigTest;
 import org.apache.rocketmq.proxy.grpc.interceptor.InterceptorConstants;
@@ -76,11 +76,11 @@ public class BaseActivityTest extends InitConfigTest {
 
     protected ProxyContext createContext() {
         return ProxyContext.create()
-            .withVal(ContextVariable.CLIENT_ID, CLIENT_ID)
-            .withVal(ContextVariable.LANGUAGE, JAVA)
-            .withVal(ContextVariable.REMOTE_ADDRESS, REMOTE_ADDR)
-            .withVal(ContextVariable.LOCAL_ADDRESS, LOCAL_ADDR)
-            .withVal(ContextVariable.REMAINING_MS, 
Duration.ofSeconds(10).toMillis());
+            .withValue(ContextVariable.CLIENT_ID, CLIENT_ID)
+            .withValue(ContextVariable.LANGUAGE, JAVA)
+            .withValue(ContextVariable.REMOTE_ADDRESS, REMOTE_ADDR)
+            .withValue(ContextVariable.LOCAL_ADDRESS, LOCAL_ADDR)
+            .withValue(ContextVariable.REMAINING_MS, 
Duration.ofSeconds(10).toMillis());
     }
 
     protected static String buildReceiptHandle(String topic, long popTime, 
long invisibleTime) {
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
index 1bdbdd9bef..af5e3e10dc 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
@@ -58,7 +58,7 @@ public class GrpcClientChannelTest extends InitConfigTest {
         super.before();
         this.clientId = RandomStringUtils.randomAlphabetic(10);
         this.grpcClientChannel = new GrpcClientChannel(proxyRelayService, 
grpcClientSettingsManager, grpcChannelManager,
-            
ProxyContext.create().setRemoteAddress("10.152.39.53:9768").setLocalAddress("11.193.0.1:1210"),
+            
ProxyContext.create().withRemoteAddress("10.152.39.53:9768").withLocalAddress("11.193.0.1:1210"),
             this.clientId);
     }
 
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
index 6742f094c8..3c3f5bf28f 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
@@ -25,7 +25,7 @@ import apache.rocketmq.v2.RetryPolicy;
 import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.Subscription;
 import com.google.protobuf.util.Durations;
-import org.apache.rocketmq.proxy.common.ContextVariable;
+import org.apache.rocketmq.proxy.common.context.ContextVariable;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
 import 
org.apache.rocketmq.remoting.protocol.subscription.CustomizedRetryPolicy;
@@ -52,7 +52,7 @@ public class GrpcClientSettingsManagerTest extends 
BaseActivityTest {
 
     @Test
     public void testGetProducerData() {
-        ProxyContext context = 
ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID);
+        ProxyContext context = 
ProxyContext.create().withValue(ContextVariable.CLIENT_ID, CLIENT_ID);
 
         this.grpcClientSettingsManager.updateClientSettings(context, 
CLIENT_ID, Settings.newBuilder()
             .setBackoffPolicy(RetryPolicy.getDefaultInstance())
@@ -65,7 +65,7 @@ public class GrpcClientSettingsManagerTest extends 
BaseActivityTest {
 
     @Test
     public void testGetSubscriptionData() {
-        ProxyContext context = 
ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID);
+        ProxyContext context = 
ProxyContext.create().withValue(ContextVariable.CLIENT_ID, CLIENT_ID);
 
         SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
         when(this.messagingProcessor.getSubscriptionGroupConfig(any(), any()))
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index 77ae5e4d11..70460a9419 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -94,9 +94,8 @@ public class ReceiveMessageActivityTest extends 
BaseActivityTest {
             .thenReturn(CompletableFuture.completedFuture(new 
PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList())));
 
         ProxyContext context = createContext();
-        context.setRemainingMs(1L);
         this.receiveMessageActivity.receiveMessage(
-            context,
+            context.withRemainingMs(1L),
             ReceiveMessageRequest.newBuilder()
                 
.setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
                 
.setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
@@ -121,9 +120,9 @@ public class ReceiveMessageActivityTest extends 
BaseActivityTest {
 
         
when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType());
 
-        final ProxyContext context = createContext();
-        context.setClientVersion("5.0.2");
-        context.setRemainingMs(-1L);
+        final ProxyContext context = createContext()
+            .withClientVersion("5.0.2")
+            .withRemainingMs(-1L);
         final ReceiveMessageRequest request = 
ReceiveMessageRequest.newBuilder()
             .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
             
.setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
@@ -144,9 +143,8 @@ public class ReceiveMessageActivityTest extends 
BaseActivityTest {
         ArgumentCaptor<ReceiveMessageResponse> responseArgumentCaptor1 =
             ArgumentCaptor.forClass(ReceiveMessageResponse.class);
         
doNothing().when(receiveStreamObserver).onNext(responseArgumentCaptor1.capture());
-        context.setClientVersion("5.0.3");
         this.receiveMessageActivity.receiveMessage(
-            context,
+            context.withClientVersion("5.0.3"),
             request,
             receiveStreamObserver
         );
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
index 84fc6499c0..51fea167d0 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
@@ -46,7 +46,7 @@ import 
org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.proxy.common.ContextVariable;
+import org.apache.rocketmq.proxy.common.context.ContextVariable;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
@@ -123,8 +123,8 @@ public class LocalMessageServiceTest extends InitConfigTest 
{
         
Mockito.when(brokerControllerMock.getEndTransactionProcessor()).thenReturn(endTransactionProcessorMock);
         Mockito.when(brokerControllerMock.getBrokerConfig()).thenReturn(new 
BrokerConfig());
         localMessageService = new LocalMessageService(brokerControllerMock, 
channelManager, null);
-        proxyContext = 
ProxyContext.create().withVal(ContextVariable.REMOTE_ADDRESS, "0.0.0.1")
-            .withVal(ContextVariable.LOCAL_ADDRESS, "0.0.0.2");
+        proxyContext = 
ProxyContext.create().withValue(ContextVariable.REMOTE_ADDRESS, "0.0.0.1")
+            .withValue(ContextVariable.LOCAL_ADDRESS, "0.0.0.2");
     }
 
     @Test
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
index a6d807937e..7ebad93722 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
@@ -79,7 +79,7 @@ public class ProxyClientRemotingProcessorTest {
                 proxyRelayResultFuture));
 
         GrpcClientChannel grpcClientChannel = new 
GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, null,
-            
ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"),
 "clientId");
+            
ProxyContext.create().withRemoteAddress("127.0.0.1:8888").withLocalAddress("127.0.0.1:10911"),
 "clientId");
         when(producerManager.getAvailableChannel(anyString()))
             .thenReturn(grpcClientChannel);
 
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
index 25ae1509a9..86a529178f 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
@@ -35,7 +35,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.state.StateEventListener;
 import org.apache.rocketmq.proxy.common.RenewEvent;
-import org.apache.rocketmq.proxy.common.ContextVariable;
+import org.apache.rocketmq.proxy.common.context.ContextVariable;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.ProxyException;
@@ -71,7 +71,7 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
     @Mock
     protected ConsumerManager consumerManager;
 
-    private static final ProxyContext PROXY_CONTEXT = ProxyContext.create();
+    private static ProxyContext proxyContext = ProxyContext.create();
     private static final String GROUP = "group";
     private static final String TOPIC = "topic";
     private static final String BROKER_NAME = "broker";
@@ -92,7 +92,7 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
             public void fireEvent(RenewEvent event) {
                 MessageReceiptHandle messageReceiptHandle = 
event.getMessageReceiptHandle();
                 ReceiptHandle handle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
-                messagingProcessor.changeInvisibleTime(PROXY_CONTEXT, handle, 
messageReceiptHandle.getMessageId(),
+                messagingProcessor.changeInvisibleTime(proxyContext, handle, 
messageReceiptHandle.getMessageId(),
                         messageReceiptHandle.getGroup(), 
messageReceiptHandle.getTopic(), event.getRenewTime())
                     .whenComplete((v, t) -> {
                         if (t != null) {
@@ -115,8 +115,8 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
             .offset(OFFSET)
             .commitLogOffset(0L)
             .build().encode();
-        PROXY_CONTEXT.withVal(ContextVariable.CLIENT_ID, "channel-id");
-        PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new LocalChannel());
+        proxyContext = proxyContext.withValue(ContextVariable.CLIENT_ID, 
"channel-id");
+        proxyContext = proxyContext.withValue(ContextVariable.CHANNEL, new 
LocalChannel());
         
Mockito.doNothing().when(consumerManager).appendConsumerIdsChangeListener(Mockito.any(ConsumerIdsChangeListener.class));
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
@@ -125,7 +125,7 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
     @Test
     public void testAddReceiptHandle() {
         Channel channel = new LocalChannel();
-        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         receiptHandleManager.scheduleRenewTask();
@@ -137,7 +137,7 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
     @Test
     public void testAddDuplicationMessage() {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
-        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
         {
             String receiptHandle = ReceiptHandle.builder()
                 .startOffset(0L)
@@ -152,9 +152,9 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
                 .build().encode();
             MessageReceiptHandle messageReceiptHandle = new 
MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
                 RECONSUME_TIMES);
-            receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, 
GROUP, MSG_ID, messageReceiptHandle);
+            receiptHandleManager.addReceiptHandle(proxyContext, channel, 
GROUP, MSG_ID, messageReceiptHandle);
         }
-        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         receiptHandleManager.scheduleRenewTask();
@@ -169,8 +169,8 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
     @Test
     public void testRenewReceiptHandle() {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
-        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
+        receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
@@ -214,9 +214,9 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
 
     @Test
     public void testRenewExceedMaxRenewTimes() {
-        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, messageReceiptHandle);
 
         CompletableFuture<AckResult> ackResultFuture = new 
CompletableFuture<>();
         ackResultFuture.completeExceptionally(new MQClientException(0, 
"error"));
@@ -244,9 +244,9 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
 
     @Test
     public void testRenewWithInvalidHandle() {
-        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, messageReceiptHandle);
 
         CompletableFuture<AckResult> ackResultFuture = new 
CompletableFuture<>();
         ackResultFuture.completeExceptionally(new 
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
@@ -268,9 +268,9 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
     @Test
     public void testRenewWithErrorThenOK() {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
-        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, messageReceiptHandle);
 
         AtomicInteger count = new AtomicInteger(0);
         List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
@@ -347,8 +347,8 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
             .build().encode();
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
-        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
+        receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
@@ -381,8 +381,8 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
             .build().encode();
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
-        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
+        receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(null);
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), 
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), 
Mockito.anyLong()))
@@ -417,8 +417,8 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
             .build().encode();
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
-        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
+        receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
@@ -430,9 +430,9 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
 
     @Test
     public void testRemoveReceiptHandle() {
-        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
-        receiptHandleManager.removeReceiptHandle(PROXY_CONTEXT, channel, 
GROUP, MSG_ID, receiptHandle);
+        Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
+        receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        receiptHandleManager.removeReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, receiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         receiptHandleManager.scheduleRenewTask();
@@ -443,8 +443,8 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
 
     @Test
     public void testClearGroup() {
-        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
+        receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         receiptHandleManager.clearGroup(new ReceiptHandleGroupKey(channel, 
GROUP));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
@@ -458,8 +458,8 @@ public class DefaultReceiptHandleManagerTest extends 
BaseServiceTest {
     public void testClientOffline() {
         ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = 
ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
         Mockito.verify(consumerManager, 
Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture());
-        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, messageReceiptHandle);
+        Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
+        receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, 
MSG_ID, messageReceiptHandle);
         
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, 
GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
         assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty());
     }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 9a2c5e3437..7e4df145df 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -146,7 +146,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
         GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class);
         GrpcClientChannel grpcClientChannel = new GrpcClientChannel(
             proxyRelayService, grpcClientSettingsManager, grpcChannelManager,
-            
ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress),
+            
ProxyContext.create().withRemoteAddress(remoteAddress).withLocalAddress(localAddress),
             clientId);
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
             grpcClientChannel,
@@ -345,7 +345,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
         GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class);
         GrpcClientChannel grpcClientChannel = new GrpcClientChannel(
             proxyRelayService, grpcClientSettingsManager, grpcChannelManager,
-            
ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress),
+            
ProxyContext.create().withRemoteAddress(remoteAddress).withLocalAddress(localAddress),
             clientId);
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
             grpcClientChannel,

Reply via email to