This is an automated email from the ASF dual-hosted git repository. dongyuanpan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 2e1ca053d2 [ISSUE #8901] Add more test coverage for RpcClientImpl (#8902) 2e1ca053d2 is described below commit 2e1ca053d2719f2b6ba233496b1b80a48047d403 Author: yx9o <yangx_s...@163.com> AuthorDate: Mon Nov 18 10:19:48 2024 +0800 [ISSUE #8901] Add more test coverage for RpcClientImpl (#8902) --- .../rocketmq/remoting/rpc/RpcClientImplTest.java | 239 +++++++++++++++++++++ 1 file changed, 239 insertions(+) diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/RpcClientImplTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/RpcClientImplTest.java new file mode 100644 index 0000000000..c33511a976 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/rpc/RpcClientImplTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.rpc; + +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.InvokeCallback; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetResponseHeader; +import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class RpcClientImplTest { + + @Mock + private RemotingClient remotingClient; + + @Mock + private ClientMetadata clientMetadata; + + private RpcClientImpl rpcClient; + + private MessageQueue mq; + + @Mock + private RpcRequest request; + + private final long defaultTimeout = 3000L; + + @Before + public void init() throws IllegalAccessException { + rpcClient = new RpcClientImpl(clientMetadata, remotingClient); + + String defaultBroker = "brokerName"; + mq = new MessageQueue("defaultTopic", defaultBroker, 0); + RpcRequestHeader header = mock(RpcRequestHeader.class); + when(request.getHeader()).thenReturn(header); + when(clientMetadata.getBrokerNameFromMessageQueue(mq)).thenReturn(defaultBroker); + when(clientMetadata.findMasterBrokerAddr(any())).thenReturn("127.0.0.1:10911"); + } + + @Test + public void testInvoke_PULL_MESSAGE() throws Exception { + when(request.getCode()).thenReturn(RequestCode.PULL_MESSAGE); + + doAnswer(invocation -> { + InvokeCallback callback = invocation.getArgument(3); + RemotingCommand response = mock(RemotingCommand.class); + when(response.getBody()).thenReturn("success".getBytes()); + PullMessageResponseHeader responseHeader = mock(PullMessageResponseHeader.class); + when(response.decodeCommandCustomHeader(PullMessageResponseHeader.class)).thenReturn(responseHeader); + callback.operationSucceed(response); + return null; + }).when(remotingClient).invokeAsync( + any(), + any(RemotingCommand.class), + anyLong(), + any(InvokeCallback.class)); + + Future<RpcResponse> future = rpcClient.invoke(mq, request, defaultTimeout); + RpcResponse actual = future.get(); + + assertEquals(ResponseCode.SUCCESS, actual.getCode()); + assertEquals("success", new String((byte[]) actual.getBody())); + } + + @Test + public void testInvoke_GET_MIN_OFFSET() throws Exception { + when(request.getCode()).thenReturn(RequestCode.GET_MIN_OFFSET); + + RemotingCommand responseCommand = mock(RemotingCommand.class); + when(responseCommand.getBody()).thenReturn("1".getBytes()); + GetMinOffsetResponseHeader responseHeader = mock(GetMinOffsetResponseHeader.class); + when(responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class)).thenReturn(responseHeader); + when(remotingClient.invokeSync(any(), any(RemotingCommand.class), anyLong())).thenReturn(responseCommand); + + Future<RpcResponse> future = rpcClient.invoke(mq, request, defaultTimeout); + RpcResponse actual = future.get(); + + assertEquals(ResponseCode.SUCCESS, actual.getCode()); + assertEquals("1", new String((byte[]) actual.getBody())); + } + + @Test + public void testInvoke_GET_MAX_OFFSET() throws Exception { + when(request.getCode()).thenReturn(RequestCode.GET_MAX_OFFSET); + + RemotingCommand responseCommand = mock(RemotingCommand.class); + when(responseCommand.getBody()).thenReturn("1000".getBytes()); + GetMaxOffsetResponseHeader responseHeader = mock(GetMaxOffsetResponseHeader.class); + when(responseCommand.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class)).thenReturn(responseHeader); + when(remotingClient.invokeSync(any(), any(RemotingCommand.class), anyLong())).thenReturn(responseCommand); + + Future<RpcResponse> future = rpcClient.invoke(mq, request, defaultTimeout); + RpcResponse actual = future.get(); + + assertEquals(ResponseCode.SUCCESS, actual.getCode()); + assertEquals("1000", new String((byte[]) actual.getBody())); + } + + @Test + public void testInvoke_SEARCH_OFFSET_BY_TIMESTAMP() throws Exception { + when(request.getCode()).thenReturn(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP); + + RemotingCommand responseCommand = mock(RemotingCommand.class); + when(responseCommand.getBody()).thenReturn("1000".getBytes()); + SearchOffsetResponseHeader responseHeader = mock(SearchOffsetResponseHeader.class); + when(responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class)).thenReturn(responseHeader); + when(remotingClient.invokeSync(any(), any(RemotingCommand.class), anyLong())).thenReturn(responseCommand); + + Future<RpcResponse> future = rpcClient.invoke(mq, request, defaultTimeout); + RpcResponse actual = future.get(); + + assertEquals(ResponseCode.SUCCESS, actual.getCode()); + assertEquals("1000", new String((byte[]) actual.getBody())); + } + + @Test + public void testInvoke_GET_EARLIEST_MSG_STORETIME() throws Exception { + when(request.getCode()).thenReturn(RequestCode.GET_EARLIEST_MSG_STORETIME); + + RemotingCommand responseCommand = mock(RemotingCommand.class); + when(responseCommand.getBody()).thenReturn("10000".getBytes()); + GetEarliestMsgStoretimeResponseHeader responseHeader = mock(GetEarliestMsgStoretimeResponseHeader.class); + when(responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class)).thenReturn(responseHeader); + when(remotingClient.invokeSync(any(), any(RemotingCommand.class), anyLong())).thenReturn(responseCommand); + + Future<RpcResponse> future = rpcClient.invoke(mq, request, defaultTimeout); + RpcResponse actual = future.get(); + + assertEquals(ResponseCode.SUCCESS, actual.getCode()); + assertEquals("10000", new String((byte[]) actual.getBody())); + } + + @Test + public void testInvoke_QUERY_CONSUMER_OFFSET() throws Exception { + when(request.getCode()).thenReturn(RequestCode.QUERY_CONSUMER_OFFSET); + + RemotingCommand responseCommand = mock(RemotingCommand.class); + when(responseCommand.getBody()).thenReturn("1000".getBytes()); + QueryConsumerOffsetResponseHeader responseHeader = mock(QueryConsumerOffsetResponseHeader.class); + when(responseCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class)).thenReturn(responseHeader); + when(remotingClient.invokeSync(any(), any(RemotingCommand.class), anyLong())).thenReturn(responseCommand); + + Future<RpcResponse> future = rpcClient.invoke(mq, request, defaultTimeout); + RpcResponse actual = future.get(); + + assertEquals(ResponseCode.SUCCESS, actual.getCode()); + assertEquals("1000", new String((byte[]) actual.getBody())); + } + + @Test + public void testInvoke_UPDATE_CONSUMER_OFFSET() throws Exception { + when(request.getCode()).thenReturn(RequestCode.UPDATE_CONSUMER_OFFSET); + + RemotingCommand responseCommand = mock(RemotingCommand.class); + when(responseCommand.getBody()).thenReturn("success".getBytes()); + UpdateConsumerOffsetResponseHeader responseHeader = mock(UpdateConsumerOffsetResponseHeader.class); + when(responseCommand.decodeCommandCustomHeader(UpdateConsumerOffsetResponseHeader.class)).thenReturn(responseHeader); + when(remotingClient.invokeSync(any(), any(RemotingCommand.class), anyLong())).thenReturn(responseCommand); + + Future<RpcResponse> future = rpcClient.invoke(mq, request, defaultTimeout); + RpcResponse actual = future.get(); + + assertEquals(ResponseCode.SUCCESS, actual.getCode()); + assertEquals("success", new String((byte[]) actual.getBody())); + } + + @Test + public void testInvoke_GET_TOPIC_STATS_INFO() throws Exception { + when(request.getCode()).thenReturn(RequestCode.GET_TOPIC_STATS_INFO); + + RemotingCommand responseCommand = mock(RemotingCommand.class); + TopicStatsTable topicStatsTable = new TopicStatsTable(); + when(responseCommand.getBody()).thenReturn(topicStatsTable.encode()); + when(remotingClient.invokeSync(any(), any(RemotingCommand.class), anyLong())).thenReturn(responseCommand); + + Future<RpcResponse> future = rpcClient.invoke(mq, request, defaultTimeout); + RpcResponse actual = future.get(); + + assertEquals(ResponseCode.SUCCESS, actual.getCode()); + assertTrue(actual.getBody() instanceof TopicStatsTable); + } + + @Test + public void testInvoke_GET_TOPIC_CONFIG() throws Exception { + when(request.getCode()).thenReturn(RequestCode.GET_TOPIC_CONFIG); + + RemotingCommand responseCommand = mock(RemotingCommand.class); + TopicConfigAndQueueMapping topicConfigAndQueueMapping = new TopicConfigAndQueueMapping(); + when(responseCommand.getBody()).thenReturn(RemotingSerializable.encode(topicConfigAndQueueMapping)); + when(remotingClient.invokeSync(any(), any(RemotingCommand.class), anyLong())).thenReturn(responseCommand); + + Future<RpcResponse> future = rpcClient.invoke(mq, request, defaultTimeout); + RpcResponse actual = future.get(); + + assertEquals(ResponseCode.SUCCESS, actual.getCode()); + assertTrue(actual.getBody() instanceof TopicConfigAndQueueMapping); + } +}