This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new d2b818d993 Revert "[ISSUE #7707] Refector Context with link node implementation (#7708)" (#7742) d2b818d993 is described below commit d2b818d99366ad3ef8ba83d77780b7d15c318d13 Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com> AuthorDate: Fri Jan 12 10:24:07 2024 +0800 Revert "[ISSUE #7707] Refector Context with link node implementation (#7708)" (#7742) This reverts commit 3f99b1e96bedb0dc6854c92b2f753cdf9fa68197. --- .../common/{context => }/ContextVariable.java | 2 +- .../apache/rocketmq/proxy/common/ProxyContext.java | 97 +++++++++++----------- .../rocketmq/proxy/common/context/ContextNode.java | 55 ------------ .../proxy/grpc/v2/GrpcMessagingApplication.java | 16 ++-- .../proxy/processor/ReceiptHandleProcessor.java | 2 +- .../activity/AbstractRemotingActivity.java | 36 ++++---- .../rocketmq/proxy/service/relay/ProxyChannel.java | 4 +- .../org/apache/rocketmq/proxy/ContextNodeTest.java | 69 --------------- .../rocketmq/proxy/common/ProxyContextTest.java | 48 ----------- .../rocketmq/proxy/grpc/v2/BaseActivityTest.java | 12 +-- .../grpc/v2/channel/GrpcClientChannelTest.java | 2 +- .../v2/common/GrpcClientSettingsManagerTest.java | 6 +- .../v2/consumer/ReceiveMessageActivityTest.java | 12 +-- .../service/message/LocalMessageServiceTest.java | 6 +- .../mqclient/ProxyClientRemotingProcessorTest.java | 2 +- .../receipt/DefaultReceiptHandleManagerTest.java | 60 ++++++------- .../service/sysmessage/HeartbeatSyncerTest.java | 4 +- 17 files changed, 129 insertions(+), 304 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextVariable.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java similarity index 96% rename from proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextVariable.java rename to proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java index 727f4c9bbc..0760826de7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextVariable.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.proxy.common.context; +package org.apache.rocketmq.proxy.common; public class ContextVariable { public static final String REMOTE_ADDRESS = "remote-address"; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java index 3e602d5ad0..77a6791f04 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java @@ -18,118 +18,117 @@ package org.apache.rocketmq.proxy.common; import io.netty.channel.Channel; -import org.apache.rocketmq.proxy.common.context.ContextNode; -import org.apache.rocketmq.proxy.common.context.ContextVariable; +import java.util.HashMap; +import java.util.Map; public class ProxyContext { public static final String INNER_ACTION_PREFIX = "Inner"; - private final ContextNode contextNode; - - ProxyContext() { - this.contextNode = new ContextNode(); - } - - ProxyContext(ContextNode parent) { - this.contextNode = parent; - } - - ProxyContext(ProxyContext that) { - this.contextNode = that.contextNode; - } + private final Map<String, Object> value = new HashMap<>(); public static ProxyContext create() { return new ProxyContext(); } public static ProxyContext createForInner(String actionName) { - return create().withAction(INNER_ACTION_PREFIX + actionName); + return create().setAction(INNER_ACTION_PREFIX + actionName); } public static ProxyContext createForInner(Class<?> clazz) { return createForInner(clazz.getSimpleName()); } - public ProxyContext withValue(String key, Object val) { - return new ProxyContext(contextNode.withValue(key, val)); + public Map<String, Object> getValue() { + return this.value; } - public <T> T getValue(String key) { - return (T) contextNode.getValue(key); + public ProxyContext withVal(String key, Object val) { + this.value.put(key, val); + return this; } - public <T> T getValue(String key, Class<T> classType) { - return (T) contextNode.getValue(key, classType); + public <T> T getVal(String key) { + return (T) this.value.get(key); } - public ProxyContext withLocalAddress(String localAddress) { - return this.withValue(ContextVariable.LOCAL_ADDRESS, localAddress); + public ProxyContext setLocalAddress(String localAddress) { + this.withVal(ContextVariable.LOCAL_ADDRESS, localAddress); + return this; } public String getLocalAddress() { - return contextNode.getValue(ContextVariable.LOCAL_ADDRESS, String.class); + return this.getVal(ContextVariable.LOCAL_ADDRESS); } - public ProxyContext withRemoteAddress(String remoteAddress) { - return this.withValue(ContextVariable.REMOTE_ADDRESS, remoteAddress); + public ProxyContext setRemoteAddress(String remoteAddress) { + this.withVal(ContextVariable.REMOTE_ADDRESS, remoteAddress); + return this; } public String getRemoteAddress() { - return contextNode.getValue(ContextVariable.REMOTE_ADDRESS, String.class); + return this.getVal(ContextVariable.REMOTE_ADDRESS); } - public ProxyContext withClientID(String clientID) { - return this.withValue(ContextVariable.CLIENT_ID, clientID); + public ProxyContext setClientID(String clientID) { + this.withVal(ContextVariable.CLIENT_ID, clientID); + return this; } public String getClientID() { - return contextNode.getValue(ContextVariable.CLIENT_ID, String.class); + return this.getVal(ContextVariable.CLIENT_ID); } - public ProxyContext withChannel(Channel channel) { - return this.withValue(ContextVariable.CHANNEL, channel); + public ProxyContext setChannel(Channel channel) { + this.withVal(ContextVariable.CHANNEL, channel); + return this; } public Channel getChannel() { - return contextNode.getValue(ContextVariable.CHANNEL, Channel.class); + return this.getVal(ContextVariable.CHANNEL); } - public ProxyContext withLanguage(String language) { - return this.withValue(ContextVariable.LANGUAGE, language); + public ProxyContext setLanguage(String language) { + this.withVal(ContextVariable.LANGUAGE, language); + return this; } public String getLanguage() { - return contextNode.getValue(ContextVariable.LANGUAGE, String.class); + return this.getVal(ContextVariable.LANGUAGE); } - public ProxyContext withClientVersion(String clientVersion) { - return this.withValue(ContextVariable.CLIENT_VERSION, clientVersion); + public ProxyContext setClientVersion(String clientVersion) { + this.withVal(ContextVariable.CLIENT_VERSION, clientVersion); + return this; } public String getClientVersion() { - return contextNode.getValue(ContextVariable.CLIENT_VERSION, String.class); + return this.getVal(ContextVariable.CLIENT_VERSION); } - public ProxyContext withRemainingMs(Long remainingMs) { - return this.withValue(ContextVariable.REMAINING_MS, remainingMs); + public ProxyContext setRemainingMs(Long remainingMs) { + this.withVal(ContextVariable.REMAINING_MS, remainingMs); + return this; } public Long getRemainingMs() { - return contextNode.getValue(ContextVariable.REMAINING_MS, Long.class); + return this.getVal(ContextVariable.REMAINING_MS); } - public ProxyContext withAction(String action) { - return this.withValue(ContextVariable.ACTION, action); + public ProxyContext setAction(String action) { + this.withVal(ContextVariable.ACTION, action); + return this; } public String getAction() { - return contextNode.getValue(ContextVariable.ACTION, String.class); + return this.getVal(ContextVariable.ACTION); } - public ProxyContext withProtocolType(String protocol) { - return this.withValue(ContextVariable.PROTOCOL_TYPE, protocol); + public ProxyContext setProtocolType(String protocol) { + this.withVal(ContextVariable.PROTOCOL_TYPE, protocol); + return this; } public String getProtocolType() { - return contextNode.getValue(ContextVariable.PROTOCOL_TYPE, String.class); + return this.getVal(ContextVariable.PROTOCOL_TYPE); } + } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextNode.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextNode.java deleted file mode 100644 index 7b418516b0..0000000000 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextNode.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.proxy.common.context; - -public class ContextNode { - private final String key; - private final Object value; - private final ContextNode parent; - - public ContextNode() { - this(null, null, null); - } - - public ContextNode(ContextNode parent, String key, Object value) { - this.parent = parent; - this.key = key; - this.value = value; - } - - public ContextNode withValue(String key, Object value) { - return new ContextNode(this, key, value); - } - - public Object getValue(String key) { - for (ContextNode current = this; current != null; current = current.parent) { - if (key.equals(current.key)) { - return current.value; - } - } - return null; - } - - public <T> T getValue(String key, Class<T> classType) { - Object value = getValue(key); - if (classType.isInstance(value)) { - return classType.cast(value); - } - return null; - } -} \ No newline at end of file diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java index 3cd664ec55..2cb395ad60 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java @@ -169,15 +169,15 @@ public class GrpcMessagingApplication extends MessagingServiceGrpc.MessagingServ Context ctx = Context.current(); Metadata headers = InterceptorConstants.METADATA.get(ctx); ProxyContext context = ProxyContext.create() - .withLocalAddress(getDefaultStringMetadataInfo(headers, InterceptorConstants.LOCAL_ADDRESS)) - .withRemoteAddress(getDefaultStringMetadataInfo(headers, InterceptorConstants.REMOTE_ADDRESS)) - .withClientID(getDefaultStringMetadataInfo(headers, InterceptorConstants.CLIENT_ID)) - .withProtocolType(ChannelProtocolType.GRPC_V2.getName()) - .withLanguage(getDefaultStringMetadataInfo(headers, InterceptorConstants.LANGUAGE)) - .withClientVersion(getDefaultStringMetadataInfo(headers, InterceptorConstants.CLIENT_VERSION)) - .withAction(getDefaultStringMetadataInfo(headers, InterceptorConstants.SIMPLE_RPC_NAME)); + .setLocalAddress(getDefaultStringMetadataInfo(headers, InterceptorConstants.LOCAL_ADDRESS)) + .setRemoteAddress(getDefaultStringMetadataInfo(headers, InterceptorConstants.REMOTE_ADDRESS)) + .setClientID(getDefaultStringMetadataInfo(headers, InterceptorConstants.CLIENT_ID)) + .setProtocolType(ChannelProtocolType.GRPC_V2.getName()) + .setLanguage(getDefaultStringMetadataInfo(headers, InterceptorConstants.LANGUAGE)) + .setClientVersion(getDefaultStringMetadataInfo(headers, InterceptorConstants.CLIENT_VERSION)) + .setAction(getDefaultStringMetadataInfo(headers, InterceptorConstants.SIMPLE_RPC_NAME)); if (ctx.getDeadline() != null) { - context = context.withRemainingMs(ctx.getDeadline().timeRemaining(TimeUnit.MILLISECONDS)); + context.setRemainingMs(ctx.getDeadline().timeRemaining(TimeUnit.MILLISECONDS)); } return context; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java index 71ebfe8af1..5e1be93218 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java @@ -37,7 +37,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor { super(messagingProcessor, serviceManager); StateEventListener<RenewEvent> eventListener = event -> { ProxyContext context = createContext(event.getEventType().name()) - .withChannel(event.getKey().getChannel()); + .setChannel(event.getKey().getChannel()); MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(), diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java index 73779eaaf0..ce4a633976 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java @@ -19,8 +19,6 @@ package org.apache.rocketmq.proxy.remoting.activity; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import java.util.HashMap; -import java.util.Map; import org.apache.rocketmq.acl.common.AclException; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -42,11 +40,14 @@ import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.netty.AttributeKeys; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; -import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + public abstract class AbstractRemotingActivity implements NettyRequestProcessor { protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); protected final MessagingProcessor messagingProcessor; @@ -123,23 +124,18 @@ public abstract class AbstractRemotingActivity implements NettyRequestProcessor protected ProxyContext createContext(ChannelHandlerContext ctx, RemotingCommand request) { ProxyContext context = ProxyContext.create(); Channel channel = ctx.channel(); - LanguageCode languageCode = RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY, channel); - String clientId = RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY, channel); - Integer version = RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, channel); - context = context.withAction(RemotingHelper.getRequestCodeDesc(request.getCode())) - .withProtocolType(ChannelProtocolType.REMOTING.getName()) - .withChannel(channel) - .withLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress())) - .withRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); - if (languageCode != null) { - context = context.withLanguage(languageCode.name()); - } - if (clientId != null) { - context = context.withClientID(clientId); - } - if (version != null) { - context = context.withClientVersion(MQVersion.getVersionDesc(version)); - } + context.setAction(RemotingHelper.getRequestCodeDesc(request.getCode())) + .setProtocolType(ChannelProtocolType.REMOTING.getName()) + .setChannel(channel) + .setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress())) + .setRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY, channel)) + .ifPresent(language -> context.setLanguage(language.name())); + Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY, channel)) + .ifPresent(context::setClientID); + Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, channel)) + .ifPresent(version -> context.setClientVersion(MQVersion.getVersionDesc(version))); return context; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java index 277b8f1586..5a1185a81e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java @@ -77,8 +77,8 @@ public abstract class ProxyChannel extends SimpleChannel { try { if (msg instanceof RemotingCommand) { ProxyContext context = ProxyContext.createForInner(this.getClass()) - .withRemoteAddress(remoteAddress) - .withLocalAddress(localAddress); + .setRemoteAddress(remoteAddress) + .setLocalAddress(localAddress); RemotingCommand command = (RemotingCommand) msg; if (command.getExtFields() == null) { command.setExtFields(new HashMap<>()); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/ContextNodeTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/ContextNodeTest.java deleted file mode 100644 index 19cf179c3d..0000000000 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/ContextNodeTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.proxy; - -import org.apache.rocketmq.proxy.common.context.ContextNode; -import org.junit.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -public class ContextNodeTest { - private ContextNode contextNode; - - @Test - public void testWithValue() { - String key = "key"; - String value = "value"; - contextNode = new ContextNode(); - ContextNode newContextNode = contextNode.withValue(key, value); - assertThat(newContextNode.getValue(key, String.class)).isEqualTo(value); - assertThat(newContextNode.getValue(key)).isEqualTo(value); - - assertThat(contextNode.getValue(key, String.class)).isNull(); - } - - @Test - public void testRepeatedKeyForTwoContext() { - String key1 = "key1"; - String value1 = "value1"; - String value2 = "value2"; - contextNode = new ContextNode(); - ContextNode newContextNode1 = contextNode.withValue(key1, value1); - ContextNode newContextNode2 = contextNode.withValue(key1, value2); - assertThat(newContextNode1.getValue(key1, String.class)).isEqualTo(value1); - assertThat(newContextNode1.getValue(key1)).isEqualTo(value1); - assertThat(newContextNode2.getValue(key1, String.class)).isEqualTo(value2); - assertThat(newContextNode2.getValue(key1)).isEqualTo(value2); - - assertThat(contextNode.getValue(key1, String.class)).isNull(); - } - - @Test - public void testRepeatedKeyForContextChain() { - String key1 = "key1"; - String value1 = "value1"; - String value2 = "value2"; - contextNode = new ContextNode(); - ContextNode newContextNode1 = contextNode.withValue(key1, value1); - ContextNode newContextNode2 = newContextNode1.withValue(key1, value2); - assertThat(newContextNode1.getValue(key1, String.class)).isEqualTo(value1); - assertThat(newContextNode2.getValue(key1, String.class)).isEqualTo(value2); - - assertThat(contextNode.getValue(key1, String.class)).isNull(); - } -} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ProxyContextTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ProxyContextTest.java deleted file mode 100644 index 0999440cde..0000000000 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ProxyContextTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.proxy.common; - -import org.junit.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -public class ProxyContextTest { - private ProxyContext proxyContext; - - @Test - public void testWithValue() { - String key = "key"; - String value = "value"; - proxyContext = ProxyContext.create(); - ProxyContext newContext = proxyContext.withValue(key, value); - assertThat(newContext.getValue(key, String.class)).isEqualTo(value); - String actualValue = newContext.getValue(key); - assertThat(actualValue).isEqualTo(value); - - assertThat(proxyContext.getValue(key, String.class)).isNull(); - } - - @Test - public void testSetLocalAddress() { - String address = "address"; - proxyContext = ProxyContext.create(); - ProxyContext newProxyContext = proxyContext.withLocalAddress(address); - assertThat(proxyContext.getLocalAddress()).isNull(); - assertThat(newProxyContext.getLocalAddress()).isEqualTo(address); - } -} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java index f29d59fe4c..524945bd6f 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java @@ -21,7 +21,7 @@ import io.grpc.Metadata; import java.time.Duration; import java.util.Random; import java.util.UUID; -import org.apache.rocketmq.proxy.common.context.ContextVariable; +import org.apache.rocketmq.proxy.common.ContextVariable; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.InitConfigTest; import org.apache.rocketmq.proxy.grpc.interceptor.InterceptorConstants; @@ -76,11 +76,11 @@ public class BaseActivityTest extends InitConfigTest { protected ProxyContext createContext() { return ProxyContext.create() - .withValue(ContextVariable.CLIENT_ID, CLIENT_ID) - .withValue(ContextVariable.LANGUAGE, JAVA) - .withValue(ContextVariable.REMOTE_ADDRESS, REMOTE_ADDR) - .withValue(ContextVariable.LOCAL_ADDRESS, LOCAL_ADDR) - .withValue(ContextVariable.REMAINING_MS, Duration.ofSeconds(10).toMillis()); + .withVal(ContextVariable.CLIENT_ID, CLIENT_ID) + .withVal(ContextVariable.LANGUAGE, JAVA) + .withVal(ContextVariable.REMOTE_ADDRESS, REMOTE_ADDR) + .withVal(ContextVariable.LOCAL_ADDRESS, LOCAL_ADDR) + .withVal(ContextVariable.REMAINING_MS, Duration.ofSeconds(10).toMillis()); } protected static String buildReceiptHandle(String topic, long popTime, long invisibleTime) { diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java index af5e3e10dc..1bdbdd9bef 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java @@ -58,7 +58,7 @@ public class GrpcClientChannelTest extends InitConfigTest { super.before(); this.clientId = RandomStringUtils.randomAlphabetic(10); this.grpcClientChannel = new GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, grpcChannelManager, - ProxyContext.create().withRemoteAddress("10.152.39.53:9768").withLocalAddress("11.193.0.1:1210"), + ProxyContext.create().setRemoteAddress("10.152.39.53:9768").setLocalAddress("11.193.0.1:1210"), this.clientId); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java index 3c3f5bf28f..6742f094c8 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java @@ -25,7 +25,7 @@ import apache.rocketmq.v2.RetryPolicy; import apache.rocketmq.v2.Settings; import apache.rocketmq.v2.Subscription; import com.google.protobuf.util.Durations; -import org.apache.rocketmq.proxy.common.context.ContextVariable; +import org.apache.rocketmq.proxy.common.ContextVariable; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest; import org.apache.rocketmq.remoting.protocol.subscription.CustomizedRetryPolicy; @@ -52,7 +52,7 @@ public class GrpcClientSettingsManagerTest extends BaseActivityTest { @Test public void testGetProducerData() { - ProxyContext context = ProxyContext.create().withValue(ContextVariable.CLIENT_ID, CLIENT_ID); + ProxyContext context = ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID); this.grpcClientSettingsManager.updateClientSettings(context, CLIENT_ID, Settings.newBuilder() .setBackoffPolicy(RetryPolicy.getDefaultInstance()) @@ -65,7 +65,7 @@ public class GrpcClientSettingsManagerTest extends BaseActivityTest { @Test public void testGetSubscriptionData() { - ProxyContext context = ProxyContext.create().withValue(ContextVariable.CLIENT_ID, CLIENT_ID); + ProxyContext context = ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID); SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); when(this.messagingProcessor.getSubscriptionGroupConfig(any(), any())) diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java index 70460a9419..77ae5e4d11 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java @@ -94,8 +94,9 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()))); ProxyContext context = createContext(); + context.setRemainingMs(1L); this.receiveMessageActivity.receiveMessage( - context.withRemainingMs(1L), + context, ReceiveMessageRequest.newBuilder() .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build()) .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build()) @@ -120,9 +121,9 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType()); - final ProxyContext context = createContext() - .withClientVersion("5.0.2") - .withRemainingMs(-1L); + final ProxyContext context = createContext(); + context.setClientVersion("5.0.2"); + context.setRemainingMs(-1L); final ReceiveMessageRequest request = ReceiveMessageRequest.newBuilder() .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build()) .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build()) @@ -143,8 +144,9 @@ public class ReceiveMessageActivityTest extends BaseActivityTest { ArgumentCaptor<ReceiveMessageResponse> responseArgumentCaptor1 = ArgumentCaptor.forClass(ReceiveMessageResponse.class); doNothing().when(receiveStreamObserver).onNext(responseArgumentCaptor1.capture()); + context.setClientVersion("5.0.3"); this.receiveMessageActivity.receiveMessage( - context.withClientVersion("5.0.3"), + context, request, receiveStreamObserver ); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java index e959244dec..3e3d37086b 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java @@ -46,7 +46,7 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.proxy.common.context.ContextVariable; +import org.apache.rocketmq.proxy.common.ContextVariable; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; @@ -123,8 +123,8 @@ public class LocalMessageServiceTest extends InitConfigTest { Mockito.when(brokerControllerMock.getEndTransactionProcessor()).thenReturn(endTransactionProcessorMock); Mockito.when(brokerControllerMock.getBrokerConfig()).thenReturn(new BrokerConfig()); localMessageService = new LocalMessageService(brokerControllerMock, channelManager, null); - proxyContext = ProxyContext.create().withValue(ContextVariable.REMOTE_ADDRESS, "0.0.0.1") - .withValue(ContextVariable.LOCAL_ADDRESS, "0.0.0.2"); + proxyContext = ProxyContext.create().withVal(ContextVariable.REMOTE_ADDRESS, "0.0.0.1") + .withVal(ContextVariable.LOCAL_ADDRESS, "0.0.0.2"); } @Test diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java index 7ebad93722..a6d807937e 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java @@ -79,7 +79,7 @@ public class ProxyClientRemotingProcessorTest { proxyRelayResultFuture)); GrpcClientChannel grpcClientChannel = new GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, null, - ProxyContext.create().withRemoteAddress("127.0.0.1:8888").withLocalAddress("127.0.0.1:10911"), "clientId"); + ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"), "clientId"); when(producerManager.getAvailableChannel(anyString())) .thenReturn(grpcClientChannel); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java index 86a529178f..25ae1509a9 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java @@ -35,7 +35,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.state.StateEventListener; import org.apache.rocketmq.proxy.common.RenewEvent; -import org.apache.rocketmq.proxy.common.context.ContextVariable; +import org.apache.rocketmq.proxy.common.ContextVariable; import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; @@ -71,7 +71,7 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Mock protected ConsumerManager consumerManager; - private static ProxyContext proxyContext = ProxyContext.create(); + private static final ProxyContext PROXY_CONTEXT = ProxyContext.create(); private static final String GROUP = "group"; private static final String TOPIC = "topic"; private static final String BROKER_NAME = "broker"; @@ -92,7 +92,7 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { public void fireEvent(RenewEvent event) { MessageReceiptHandle messageReceiptHandle = event.getMessageReceiptHandle(); ReceiptHandle handle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()); - messagingProcessor.changeInvisibleTime(proxyContext, handle, messageReceiptHandle.getMessageId(), + messagingProcessor.changeInvisibleTime(PROXY_CONTEXT, handle, messageReceiptHandle.getMessageId(), messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), event.getRenewTime()) .whenComplete((v, t) -> { if (t != null) { @@ -115,8 +115,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { .offset(OFFSET) .commitLogOffset(0L) .build().encode(); - proxyContext = proxyContext.withValue(ContextVariable.CLIENT_ID, "channel-id"); - proxyContext = proxyContext.withValue(ContextVariable.CHANNEL, new LocalChannel()); + PROXY_CONTEXT.withVal(ContextVariable.CLIENT_ID, "channel-id"); + PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new LocalChannel()); Mockito.doNothing().when(consumerManager).appendConsumerIdsChangeListener(Mockito.any(ConsumerIdsChangeListener.class)); messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); @@ -125,7 +125,7 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testAddReceiptHandle() { Channel channel = new LocalChannel(); - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); receiptHandleManager.scheduleRenewTask(); @@ -137,7 +137,7 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testAddDuplicationMessage() { ProxyConfig config = ConfigurationManager.getProxyConfig(); - Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); { String receiptHandle = ReceiptHandle.builder() .startOffset(0L) @@ -152,9 +152,9 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { .build().encode(); MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); } - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); receiptHandleManager.scheduleRenewTask(); @@ -169,8 +169,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testRenewReceiptHandle() { ProxyConfig config = ConfigurationManager.getProxyConfig(); - Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); @@ -214,9 +214,9 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testRenewExceedMaxRenewTimes() { - Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new MQClientException(0, "error")); @@ -244,9 +244,9 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testRenewWithInvalidHandle() { - Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); CompletableFuture<AckResult> ackResultFuture = new CompletableFuture<>(); ackResultFuture.completeExceptionally(new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error")); @@ -268,9 +268,9 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testRenewWithErrorThenOK() { ProxyConfig config = ConfigurationManager.getProxyConfig(); - Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); AtomicInteger count = new AtomicInteger(0); List<CompletableFuture<AckResult>> futureList = new ArrayList<>(); @@ -347,8 +347,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { .build().encode(); messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); - Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); @@ -381,8 +381,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { .build().encode(); messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); - Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(null); Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyLong())) @@ -417,8 +417,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { .build().encode(); messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET, RECONSUME_TIMES); - Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); @@ -430,9 +430,9 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testRemoveReceiptHandle() { - Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); - receiptHandleManager.removeReceiptHandle(proxyContext, channel, GROUP, MSG_ID, receiptHandle); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); + receiptHandleManager.removeReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, receiptHandle); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); receiptHandleManager.scheduleRenewTask(); @@ -443,8 +443,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { @Test public void testClearGroup() { - Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); receiptHandleManager.clearGroup(new ReceiptHandleGroupKey(channel, GROUP)); SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(groupConfig); @@ -458,8 +458,8 @@ public class DefaultReceiptHandleManagerTest extends BaseServiceTest { public void testClientOffline() { ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = ArgumentCaptor.forClass(ConsumerIdsChangeListener.class); Mockito.verify(consumerManager, Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture()); - Channel channel = proxyContext.getValue(ContextVariable.CHANNEL); - receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP, MSG_ID, messageReceiptHandle); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, messageReceiptHandle); listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0)); assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty()); } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java index 7e4df145df..9a2c5e3437 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java @@ -146,7 +146,7 @@ public class HeartbeatSyncerTest extends InitConfigTest { GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class); GrpcClientChannel grpcClientChannel = new GrpcClientChannel( proxyRelayService, grpcClientSettingsManager, grpcChannelManager, - ProxyContext.create().withRemoteAddress(remoteAddress).withLocalAddress(localAddress), + ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress), clientId); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( grpcClientChannel, @@ -345,7 +345,7 @@ public class HeartbeatSyncerTest extends InitConfigTest { GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class); GrpcClientChannel grpcClientChannel = new GrpcClientChannel( proxyRelayService, grpcClientSettingsManager, grpcChannelManager, - ProxyContext.create().withRemoteAddress(remoteAddress).withLocalAddress(localAddress), + ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress), clientId); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( grpcClientChannel,