This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 6c3781f17e [ISSUE #8365] add non-oneway updateConsumerOffset (#8368) 6c3781f17e is described below commit 6c3781f17e22ec35ddb2113bab5cdc4967cb8260 Author: 吴星灿 <37405937+qianye1...@users.noreply.github.com> AuthorDate: Fri Jul 12 13:47:58 2024 +0800 [ISSUE #8365] add non-oneway updateConsumerOffset (#8368) --- .../client/impl/mqclient/MQClientAPIExt.java | 27 ++++++++++++++++ .../client/impl/mqclient/MQClientAPIExtTest.java | 37 ++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java index b97e00c577..0e2092b8a0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java @@ -400,6 +400,33 @@ public class MQClientAPIExt extends MQClientAPIImpl { return future; } + public CompletableFuture<Void> updateConsumerOffsetAsync( + String brokerAddr, + UpdateConsumerOffsetRequestHeader header, + long timeoutMillis + ) { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, header); + CompletableFuture<Void> future = new CompletableFuture<>(); + invoke(brokerAddr, request, timeoutMillis).whenComplete((response, t) -> { + if (t != null) { + log.error("updateConsumerOffsetAsync failed, brokerAddr={}, requestHeader={}", brokerAddr, header, t); + future.completeExceptionally(t); + return; + } + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + future.complete(null); + } + case ResponseCode.SYSTEM_ERROR: + case ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST: + case ResponseCode.TOPIC_NOT_EXIST: { + future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark())); + } + } + }); + return future; + } + public CompletableFuture<List<String>> getConsumerListByGroupAsync( String brokerAddr, GetConsumerListByGroupRequestHeader requestHeader, diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java index 752bc98eab..6f692dff95 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java @@ -18,14 +18,19 @@ package org.apache.rocketmq.client.impl.mqclient; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.utils.FutureUtils; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -34,9 +39,12 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; @RunWith(MockitoJUnitRunner.class) public class MQClientAPIExtTest { @@ -71,4 +79,33 @@ public class MQClientAPIExtTest { CompletableFuture<SendResult> future = mqClientAPIExt.sendMessageAsync("127.0.0.1:10911", "test", msg, requestHeader, 10); assertThatThrownBy(future::get).getCause().isInstanceOf(RemotingTimeoutException.class); } + + @Test + public void testUpdateConsumerOffsetAsync_Success() throws ExecutionException, InterruptedException { + CompletableFuture<RemotingCommand> remotingFuture = new CompletableFuture<>(); + remotingFuture.complete(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "")); + doReturn(remotingFuture).when(remotingClientMock).invoke(anyString(), any(RemotingCommand.class), anyLong()); + + CompletableFuture<Void> future = mqClientAPIExt.updateConsumerOffsetAsync("brokerAddr", new UpdateConsumerOffsetRequestHeader(), 3000L); + + assertNull("Future should be completed without exception", future.get()); + } + + @Test + public void testUpdateConsumerOffsetAsync_Fail() throws InterruptedException { + + CompletableFuture<RemotingCommand> remotingFuture = new CompletableFuture<>(); + remotingFuture.complete(RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, "QueueId is null, topic is testTopic")); + doReturn(remotingFuture).when(remotingClientMock).invoke(anyString(), any(RemotingCommand.class), anyLong()); + + CompletableFuture<Void> future = mqClientAPIExt.updateConsumerOffsetAsync("brokerAddr", new UpdateConsumerOffsetRequestHeader(), 3000L); + + try { + future.get(); + } catch (ExecutionException e) { + MQBrokerException customEx = (MQBrokerException) e.getCause(); + assertEquals(customEx.getResponseCode(), ResponseCode.SYSTEM_ERROR); + assertEquals(customEx.getErrorMessage(), "QueueId is null, topic is testTopic"); + } + } } \ No newline at end of file