This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 3f99b1e96b [ISSUE #7707] Refector Context with link node implementation (#7708) 3f99b1e96b is described below commit 3f99b1e96bedb0dc6854c92b2f753cdf9fa68197 Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com> AuthorDate: Tue Jan 9 10:51:26 2024 +0800 [ISSUE #7707] Refector Context with link node implementation (#7708) * Refector Context with link node implement --- .../apache/rocketmq/proxy/common/ProxyContext.java | 97 +++++++++++----------- .../rocketmq/proxy/common/context/ContextNode.java | 55 ++++++++++++ .../common/{ => context}/ContextVariable.java | 2 +- .../proxy/grpc/v2/GrpcMessagingApplication.java | 16 ++-- .../proxy/processor/ReceiptHandleProcessor.java | 2 +- .../activity/AbstractRemotingActivity.java | 36 ++++---- .../rocketmq/proxy/service/relay/ProxyChannel.java | 4 +- .../org/apache/rocketmq/proxy/ContextNodeTest.java | 69 +++++++++++++++ .../rocketmq/proxy/common/ProxyContextTest.java | 48 +++++++++++ .../rocketmq/proxy/grpc/v2/BaseActivityTest.java | 12 +-- .../grpc/v2/channel/GrpcClientChannelTest.java | 2 +- .../v2/common/GrpcClientSettingsManagerTest.java | 6 +- .../v2/consumer/ReceiveMessageActivityTest.java | 12 ++- .../service/message/LocalMessageServiceTest.java | 6 +- .../mqclient/ProxyClientRemotingProcessorTest.java | 2 +- .../receipt/DefaultReceiptHandleManagerTest.java | 60 ++++++------- .../service/sysmessage/HeartbeatSyncerTest.java | 4 +- 17 files changed, 304 insertions(+), 129 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java index 77a6791f04..3e602d5ad0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java @@ -18,117 +18,118 @@ package org.apache.rocketmq.proxy.common; import io.netty.channel.Channel; -import java.util.HashMap; -import java.util.Map; +import org.apache.rocketmq.proxy.common.context.ContextNode; +import org.apache.rocketmq.proxy.common.context.ContextVariable; public class ProxyContext { public static final String INNER_ACTION_PREFIX = "Inner"; - private final Map<String, Object> value = new HashMap<>(); + private final ContextNode contextNode; + + ProxyContext() { + this.contextNode = new ContextNode(); + } + + ProxyContext(ContextNode parent) { + this.contextNode = parent; + } + + ProxyContext(ProxyContext that) { + this.contextNode = that.contextNode; + } public static ProxyContext create() { return new ProxyContext(); } public static ProxyContext createForInner(String actionName) { - return create().setAction(INNER_ACTION_PREFIX + actionName); + return create().withAction(INNER_ACTION_PREFIX + actionName); } public static ProxyContext createForInner(Class<?> clazz) { return createForInner(clazz.getSimpleName()); } - public Map<String, Object> getValue() { - return this.value; + public ProxyContext withValue(String key, Object val) { + return new ProxyContext(contextNode.withValue(key, val)); } - public ProxyContext withVal(String key, Object val) { - this.value.put(key, val); - return this; + public <T> T getValue(String key) { + return (T) contextNode.getValue(key); } - public <T> T getVal(String key) { - return (T) this.value.get(key); + public <T> T getValue(String key, Class<T> classType) { + return (T) contextNode.getValue(key, classType); } - public ProxyContext setLocalAddress(String localAddress) { - this.withVal(ContextVariable.LOCAL_ADDRESS, localAddress); - return this; + public ProxyContext withLocalAddress(String localAddress) { + return this.withValue(ContextVariable.LOCAL_ADDRESS, localAddress); } public String getLocalAddress() { - return this.getVal(ContextVariable.LOCAL_ADDRESS); + return contextNode.getValue(ContextVariable.LOCAL_ADDRESS, String.class); } - public ProxyContext setRemoteAddress(String remoteAddress) { - this.withVal(ContextVariable.REMOTE_ADDRESS, remoteAddress); - return this; + public ProxyContext withRemoteAddress(String remoteAddress) { + return this.withValue(ContextVariable.REMOTE_ADDRESS, remoteAddress); } public String getRemoteAddress() { - return this.getVal(ContextVariable.REMOTE_ADDRESS); + return contextNode.getValue(ContextVariable.REMOTE_ADDRESS, String.class); } - public ProxyContext setClientID(String clientID) { - this.withVal(ContextVariable.CLIENT_ID, clientID); - return this; + public ProxyContext withClientID(String clientID) { + return this.withValue(ContextVariable.CLIENT_ID, clientID); } public String getClientID() { - return this.getVal(ContextVariable.CLIENT_ID); + return contextNode.getValue(ContextVariable.CLIENT_ID, String.class); } - public ProxyContext setChannel(Channel channel) { - this.withVal(ContextVariable.CHANNEL, channel); - return this; + public ProxyContext withChannel(Channel channel) { + return this.withValue(ContextVariable.CHANNEL, channel); } public Channel getChannel() { - return this.getVal(ContextVariable.CHANNEL); + return contextNode.getValue(ContextVariable.CHANNEL, Channel.class); } - public ProxyContext setLanguage(String language) { - this.withVal(ContextVariable.LANGUAGE, language); - return this; + public ProxyContext withLanguage(String language) { + return this.withValue(ContextVariable.LANGUAGE, language); } public String getLanguage() { - return this.getVal(ContextVariable.LANGUAGE); + return contextNode.getValue(ContextVariable.LANGUAGE, String.class); } - public ProxyContext setClientVersion(String clientVersion) { - this.withVal(ContextVariable.CLIENT_VERSION, clientVersion); - return this; + public ProxyContext withClientVersion(String clientVersion) { + return this.withValue(ContextVariable.CLIENT_VERSION, clientVersion); } public String getClientVersion() { - return this.getVal(ContextVariable.CLIENT_VERSION); + return contextNode.getValue(ContextVariable.CLIENT_VERSION, String.class); } - public ProxyContext setRemainingMs(Long remainingMs) { - this.withVal(ContextVariable.REMAINING_MS, remainingMs); - return this; + public ProxyContext withRemainingMs(Long remainingMs) { + return this.withValue(ContextVariable.REMAINING_MS, remainingMs); } public Long getRemainingMs() { - return this.getVal(ContextVariable.REMAINING_MS); + return contextNode.getValue(ContextVariable.REMAINING_MS, Long.class); } - public ProxyContext setAction(String action) { - this.withVal(ContextVariable.ACTION, action); - return this; + public ProxyContext withAction(String action) { + return this.withValue(ContextVariable.ACTION, action); } public String getAction() { - return this.getVal(ContextVariable.ACTION); + return contextNode.getValue(ContextVariable.ACTION, String.class); } - public ProxyContext setProtocolType(String protocol) { - this.withVal(ContextVariable.PROTOCOL_TYPE, protocol); - return this; + public ProxyContext withProtocolType(String protocol) { + return this.withValue(ContextVariable.PROTOCOL_TYPE, protocol); } public String getProtocolType() { - return this.getVal(ContextVariable.PROTOCOL_TYPE); + return contextNode.getValue(ContextVariable.PROTOCOL_TYPE, String.class); } - } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextNode.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextNode.java new file mode 100644 index 0000000000..7b418516b0 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextNode.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.common.context; + +public class ContextNode { + private final String key; + private final Object value; + private final ContextNode parent; + + public ContextNode() { + this(null, null, null); + } + + public ContextNode(ContextNode parent, String key, Object value) { + this.parent = parent; + this.key = key; + this.value = value; + } + + public ContextNode withValue(String key, Object value) { + return new ContextNode(this, key, value); + } + + public Object getValue(String key) { + for (ContextNode current = this; current != null; current = current.parent) { + if (key.equals(current.key)) { + return current.value; + } + } + return null; + } + + public <T> T getValue(String key, Class<T> classType) { + Object value = getValue(key); + if (classType.isInstance(value)) { + return classType.cast(value); + } + return null; + } +} \ No newline at end of file diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextVariable.java similarity index 96% rename from proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java rename to proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextVariable.java index 0760826de7..727f4c9bbc 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextVariable.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.proxy.common; +package org.apache.rocketmq.proxy.common.context; public class ContextVariable { public static final String REMOTE_ADDRESS = "remote-address"; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java index 2cb395ad60..3cd664ec55 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java @@ -169,15 +169,15 @@ public class GrpcMessagingApplication extends MessagingServiceGrpc.MessagingServ Context ctx = Context.current(); Metadata headers = InterceptorConstants.METADATA.get(ctx); ProxyContext context = ProxyContext.create() - .setLocalAddress(getDefaultStringMetadataInfo(headers, InterceptorConstants.LOCAL_ADDRESS)) - .setRemoteAddress(getDefaultStringMetadataInfo(headers, InterceptorConstants.REMOTE_ADDRESS)) - .setClientID(getDefaultStringMetadataInfo(headers, InterceptorConstants.CLIENT_ID)) - .setProtocolType(ChannelProtocolType.GRPC_V2.getName()) - .setLanguage(getDefaultStringMetadataInfo(headers, InterceptorConstants.LANGUAGE)) - .setClientVersion(getDefaultStringMetadataInfo(headers, InterceptorConstants.CLIENT_VERSION)) - .setAction(getDefaultStringMetadataInfo(headers, InterceptorConstants.SIMPLE_RPC_NAME)); + .withLocalAddress(getDefaultStringMetadataInfo(headers, InterceptorConstants.LOCAL_ADDRESS)) + .withRemoteAddress(getDefaultStringMetadataInfo(headers, InterceptorConstants.REMOTE_ADDRESS)) + .withClientID(getDefaultStringMetadataInfo(headers, InterceptorConstants.CLIENT_ID)) + .withProtocolType(ChannelProtocolType.GRPC_V2.getName()) + .withLanguage(getDefaultStringMetadataInfo(headers, InterceptorConstants.LANGUAGE)) + .withClientVersion(getDefaultStringMetadataInfo(headers, InterceptorConstants.CLIENT_VERSION)) + .withAction(getDefaultStringMetadataInfo(headers, InterceptorConstants.SIMPLE_RPC_NAME)); if (ctx.getDeadline() != null) { - context.setRemainingMs(ctx.getDeadline().timeRemaining(TimeUnit.MILLISECONDS)); + context = context.withRemainingMs(ctx.getDeadline().timeRemaining(TimeUnit.MILLISECONDS)); } return context; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java index 5e1be93218..71ebfe8af1 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java @@ -37,7 +37,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor { super(messagingProcessor, serviceManager); StateEventListener<RenewEvent> eventListener = event -> { ProxyContext context = createContext(event.getEventType().name()) - .setChannel(event.getKey().getChannel()); + .withChannel(event.getKey().getChannel()); MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(), diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java index ce4a633976..73779eaaf0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.proxy.remoting.activity; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import java.util.HashMap; +import java.util.Map; import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -40,14 +42,11 @@ import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.netty.AttributeKeys; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - public abstract class AbstractRemotingActivity implements NettyRequestProcessor { protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected final MessagingProcessor messagingProcessor; @@ -124,18 +123,23 @@ public abstract class AbstractRemotingActivity implements NettyRequestProcessor protected ProxyContext createContext(ChannelHandlerContext ctx, RemotingCommand request) { ProxyContext context = ProxyContext.create(); Channel channel = ctx.channel(); - context.setAction(RemotingHelper.getRequestCodeDesc(request.getCode())) - .setProtocolType(ChannelProtocolType.REMOTING.getName()) - .setChannel(channel) - .setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress())) - .setRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - - Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY, channel)) - .ifPresent(language -> context.setLanguage(language.name())); - Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY, channel)) - .ifPresent(context::setClientID); - Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, channel)) - .ifPresent(version -> context.setClientVersion(MQVersion.getVersionDesc(version))); + LanguageCode languageCode = RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY, channel); + String clientId = RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY, channel); + Integer version = RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, channel); + context = context.withAction(RemotingHelper.getRequestCodeDesc(request.getCode())) + .withProtocolType(ChannelProtocolType.REMOTING.getName()) + .withChannel(channel) + .withLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress())) + .withRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + if (languageCode != null) { + context = context.withLanguage(languageCode.name()); + } + if (clientId != null) { + context = context.withClientID(clientId); + } + if (version != null) { + context = context.withClientVersion(MQVersion.getVersionDesc(version)); + } return context; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java index 5a1185a81e..277b8f1586 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java @@ -77,8 +77,8 @@ public abstract class ProxyChannel extends SimpleChannel { try { if (msg instanceof RemotingCommand) { ProxyContext context = ProxyContext.createForInner(this.getClass()) - .setRemoteAddress(remoteAddress) - .setLocalAddress(localAddress); + .withRemoteAddress(remoteAddress) + .withLocalAddress(localAddress); RemotingCommand command = (RemotingCommand) msg; if (command.getExtFields() == null) { command.setExtFields(new HashMap<>()); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/ContextNodeTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/ContextNodeTest.java new file mode 100644 index 0000000000..19cf179c3d --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/ContextNodeTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy; + +import org.apache.rocketmq.proxy.common.context.ContextNode; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ContextNodeTest { + private ContextNode contextNode; + + @Test + public void testWithValue() { + String key = "key"; + String value = "value"; + contextNode = new ContextNode(); + ContextNode newContextNode = contextNode.withValue(key, value); + assertThat(newContextNode.getValue(key, String.class)).isEqualTo(value); + assertThat(newContextNode.getValue(key)).isEqualTo(value); + + assertThat(contextNode.getValue(key, String.class)).isNull(); + } + + @Test + public void testRepeatedKeyForTwoContext() { + String key1 = "key1"; + String value1 = "value1"; + String value2 = "value2"; + contextNode = new ContextNode(); + ContextNode newContextNode1 = contextNode.withValue(key1, value1); + ContextNode newContextNode2 = contextNode.withValue(key1, value2); + assertThat(newContextNode1.getValue(key1, String.class)).isEqualTo(value1); + assertThat(newContextNode1.getValue(key1)).isEqualTo(value1); + assertThat(newContextNode2.getValue(key1, String.class)).isEqualTo(value2); + assertThat(newContextNode2.getValue(key1)).isEqualTo(value2); + + assertThat(contextNode.getValue(key1, String.class)).isNull(); + } + + @Test + public void testRepeatedKeyForContextChain() { + String key1 = "key1"; + String value1 = "value1"; + String value2 = "value2"; + contextNode = new ContextNode(); + ContextNode newContextNode1 = contextNode.withValue(key1, value1); + ContextNode newContextNode2 = newContextNode1.withValue(key1, value2); + assertThat(newContextNode1.getValue(key1, String.class)).isEqualTo(value1); + assertThat(newContextNode2.getValue(key1, String.class)).isEqualTo(value2); + + assertThat(contextNode.getValue(key1, String.class)).isNull(); + } +} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ProxyContextTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ProxyContextTest.java new file mode 100644 index 0000000000..0999440cde --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ProxyContextTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.common; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ProxyContextTest { + private ProxyContext proxyContext; + + @Test + public void testWithValue() { + String key = "key"; + String value = "value"; + proxyContext = ProxyContext.create(); + ProxyContext newContext = proxyContext.withValue(key, value); + assertThat(newContext.getValue(key, String.class)).isEqualTo(value); + String actualValue = newContext.getValue(key); + assertThat(actualValue).isEqualTo(value); + + assertThat(proxyContext.getValue(key, String.class)).isNull(); + } + + @Test + public void testSetLocalAddress() { + String address = "address"; + proxyContext = ProxyContext.create(); + ProxyContext newProxyContext = proxyContext.withLocalAddress(address); + assertThat(proxyContext.getLocalAddress()).isNull(); + assertThat(newProxyContext.getLocalAddress()).isEqualTo(address); + } +} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java index 524945bd6f..f29d59fe4c 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java @@ -21,7 +21,7 @@ import io.grpc.Metadata; import java.time.Duration; import java.util.Random; import java.util.UUID; -import org.apache.rocketmq.proxy.common.ContextVariable; +import org.apache.rocketmq.proxy.common.context.ContextVariable; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.InitConfigTest; import org.apache.rocketmq.proxy.grpc.interceptor.InterceptorConstants; @@ -76,11 +76,11 @@ public class BaseActivityTest extends InitConfigTest { protected ProxyContext createContext() { return ProxyContext.create() - .withVal(ContextVariable.CLIENT_ID, CLIENT_ID) - .withVal(ContextVariable.LANGUAGE, JAVA) - .withVal(ContextVariable.REMOTE_ADDRESS, REMOTE_ADDR) - .withVal(ContextVariable.LOCAL_ADDRESS, LOCAL_ADDR) - .withVal(ContextVariable.REMAINING_MS, Duration.ofSeconds(10).toMillis()); + .withValue(ContextVariable.CLIENT_ID, CLIENT_ID) + .withValue(ContextVariable.LANGUAGE, JAVA) + .withValue(ContextVariable.REMOTE_ADDRESS, REMOTE_ADDR) + .withValue(ContextVariable.LOCAL_ADDRESS, LOCAL_ADDR) + .withValue(ContextVariable.REMAINING_MS, Duration.ofSeconds(10).toMillis()); } protected static String buildReceiptHandle(String topic, long popTime, long invisibleTime) { diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java index 1bdbdd9bef..af5e3e10dc 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java @@ -58,7 +58,7 @@ public class GrpcClientChannelTest extends InitConfigTest { super.before(); this.clientId = RandomStringUtils.randomAlphabetic(10); this.grpcClientChannel = new GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, grpcChannelManager, - ProxyContext.create().setRemoteAddress("10.152.39.53:9768").setLocalAddress("11.193.0.1:1210"), + ProxyContext.create().withRemoteAddress("10.152.39.53:9768").withLocalAddress("11.193.0.1:1210"), this.clientId); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java index 6742f094c8..3c3f5bf28f 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java @@ -25,7 +25,7 @@ import apache.rocketmq.v2.RetryPolicy; import apache.rocketmq.v2.Settings; import apache.rocketmq.v2.Subscription; import com.google.protobuf.util.Durations; -import org.apache.rocketmq.proxy.common.ContextVariable; +import org.apache.rocketmq.proxy.common.context.ContextVariable; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest; import org.apache.rocketmq.remoting.protocol.subscription.CustomizedRetryPolicy; @@ -52,7 +52,7 @@ public class GrpcClientSettingsManagerTest extends BaseActivityTest { @Test public void testGetProducerData() { - ProxyContext context = ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID); + ProxyContext context = ProxyContext.create().withValue(ContextVariable.CLIENT_ID, CLIENT_ID); this.grpcClientSettingsManager.updateClientSettings(context, CLIENT_ID, Settings.newBuilder() .setBackoffPolicy(RetryPolicy.getDefaultInstance()) @@ -65,7 +65,7 @@ public class GrpcClientSettingsManagerTest extends BaseActivityTest { @Test public void testGetSubscriptionData() { - ProxyContext context = ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID); + ProxyContext context = ProxyContext.create().withValue(ContextVariable.CLIENT_ID, CLIENT_ID); SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); when(this.messagingProcessor.getSubscriptionGroupConfig(any(), any())) diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java index 77ae5e4d11..70460a9419 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java @@ -94,9 +94,8 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()))); ProxyContext context = createContext(); - context.setRemainingMs(1L); this.receiveMessageActivity.receiveMessage( - context, + context.withRemainingMs(1L), ReceiveMessageRequest.newBuilder() .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build()) .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build()) @@ -121,9 +120,9 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType()); - final ProxyContext context = createContext(); - context.setClientVersion("5.0.2"); - context.setRemainingMs(-1L); + final ProxyContext context = createContext() + .withClientVersion("5.0.2") + .withRemainingMs(-1L); final ReceiveMessageRequest request = ReceiveMessageRequest.newBuilder() .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build()) .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build()) @@ -144,9 +143,8 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { ArgumentCaptor<ReceiveMessageResponse> responseArgumentCaptor1 = ArgumentCaptor.forClass(ReceiveMessageResponse.class); doNothing().when(receiveStreamObserver).onNext(responseArgumentCaptor1.capture()); - context.setClientVersion("5.0.3"); this.receiveMessageActivity.receiveMessage( - context, + context.withClientVersion("5.0.3"), request, receiveStreamObserver ); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java index 84fc6499c0..51fea167d0 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java @@ -46,7 +46,7 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.proxy.common.ContextVariable; +import org.apache.rocketmq.proxy.common.context.ContextVariable; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; @@ -123,8 +123,8 @@ public class LocalMessageServiceTest extends InitConfigTest { Mockito.when(brokerControllerMock.getEndTransactionProcessor()).thenReturn(endTransactionProcessorMock); Mockito.when(brokerControllerMock.getBrokerConfig()).thenReturn(new BrokerConfig()); localMessageService = new LocalMessageService(brokerControllerMock, channelManager, null); - proxyContext = ProxyContext.create().withVal(ContextVariable.REMOTE_ADDRESS, "0.0.0.1") - .withVal(ContextVariable.LOCAL_ADDRESS, "0.0.0.2"); + proxyContext = ProxyContext.create().withValue(ContextVariable.REMOTE_ADDRESS, "0.0.0.1") + .withValue(ContextVariable.LOCAL_ADDRESS, "0.0.0.2"); } @Test diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java index a6d807937e..7ebad93722 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java @@ -79,7 +79,7 @@ public class ProxyClientRemotingProcessorTest { proxyRelayResultFuture)); GrpcClientChannel grpcClientChannel = new GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, null, - ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"), "clientId"); + ProxyContext.create().withRemoteAddress("127.0.0.1:8888").withLocalAddress("127.0.0.1:10911"), "clientId"); when(producerManager.getAvailableChannel(anyString())) .thenReturn(grpcClientChannel); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java index 25ae1509a9..86a529178f 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java @@ -35,7 +35,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.state.StateEventListener; import org.apache.rocketmq.proxy.common.RenewEvent; -import org.apache.rocketmq.proxy.common.ContextVariable; +import org.apache.rocketmq.proxy.common.context.ContextVariable; import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; @@ -71,7 +71,7 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Mock protected ConsumerManager consumerManager; - private static final ProxyContext PROXY_CONTEXT = ProxyContext.create(); + private static ProxyContext proxyContext = ProxyContext.create(); private static final String GROUP = "group"; private static final String TOPIC = "topic"; private static final String BROKER_NAME = "broker"; @@ -92,7 +92,7 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { public void fireEvent(RenewEvent event) { MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); - messagingProcessor.changeInvisibleTime(PROXY_CONTEXT, handle, messageReceiptHandle.getMessageId(), + messagingProcessor.changeInvisibleTime(proxyContext, handle, messageReceiptHandle.getMessageId(), messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), event.getRenewTime()) .whenComplete((v, t) -> { if (t != null) { @@ -115,8 +115,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { .offset(OFFSET) .commitLogOffset(0L) .build().encode(); - PROXY_CONTEXT.withVal(ContextVariable.CLIENT_ID, "channel-id"); - PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new LocalChannel()); + proxyContext = proxyContext.withValue(ContextVariable.CLIENT_ID, "channel-id"); + proxyContext = proxyContext.withValue(ContextVariable.CHANNEL, new LocalChannel()); Mockito.doNothing().when(consumerManager).appendConsumerIdsChangeListener(Mockito.any(ConsumerIdsChangeListener.class)); messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); @@ -125,7 +125,7 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testAddReceiptHandle() { Channel channel = new LocalChannel(); - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); receiptHandleManager.scheduleRenewTask(); @@ -137,7 +137,7 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testAddDuplicationMessage() { ProxyConfig config = ConfigurationManager.getProxyConfig(); - Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); { String receiptHandle = ReceiptHandle.builder() .startOffset(0L) @@ -152,9 +152,9 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { .build().encode(); MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); } - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); receiptHandleManager.scheduleRenewTask(); @@ -169,8 +169,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testRenewReceiptHandle() { ProxyConfig config = ConfigurationManager.getProxyConfig(); - Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); @@ -214,9 +214,9 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testRenewExceedMaxRenewTimes() { - Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new MQClientException(0, "error")); @@ -244,9 +244,9 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testRenewWithInvalidHandle() { - Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error")); @@ -268,9 +268,9 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testRenewWithErrorThenOK() { ProxyConfig config = ConfigurationManager.getProxyConfig(); - Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); AtomicInteger count = new AtomicInteger(0); List<CompletableFuture<AckResult>> futureList = new ArrayList<>(); @@ -347,8 +347,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { .build().encode(); messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); - Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); @@ -381,8 +381,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { .build().encode(); messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); - Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(null); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong())) @@ -417,8 +417,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { .build().encode(); messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); - Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); @@ -430,9 +430,9 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testRemoveReceiptHandle() { - Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); - receiptHandleManager.removeReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle); + Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.removeReceiptHandle(proxyContext, channel, GROUP, MSG_ID, receiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); receiptHandleManager.scheduleRenewTask(); @@ -443,8 +443,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testClearGroup() { - Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); receiptHandleManager.clearGroup(new ReceiptHandleGroupKey(channel, GROUP)); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); @@ -458,8 +458,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { public void testClientOffline() { ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = ArgumentCaptor.forClass(ConsumerIdsChangeListener.class); Mockito.verify(consumerManager, Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture()); - Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0)); assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty()); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java index 9a2c5e3437..7e4df145df 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java @@ -146,7 +146,7 @@ public class HeartbeatSyncerTest extends InitConfigTest { GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class); GrpcClientChannel grpcClientChannel = new GrpcClientChannel( proxyRelayService, grpcClientSettingsManager, grpcChannelManager, - ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress), + ProxyContext.create().withRemoteAddress(remoteAddress).withLocalAddress(localAddress), clientId); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( grpcClientChannel, @@ -345,7 +345,7 @@ public class HeartbeatSyncerTest extends InitConfigTest { GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class); GrpcClientChannel grpcClientChannel = new GrpcClientChannel( proxyRelayService, grpcClientSettingsManager, grpcChannelManager, - ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress), + ProxyContext.create().withRemoteAddress(remoteAddress).withLocalAddress(localAddress), clientId); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( grpcClientChannel,