This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6c3781f17e [ISSUE #8365] add non-oneway updateConsumerOffset (#8368)
6c3781f17e is described below

commit 6c3781f17e22ec35ddb2113bab5cdc4967cb8260
Author: 吴星灿 <37405937+qianye1...@users.noreply.github.com>
AuthorDate: Fri Jul 12 13:47:58 2024 +0800

    [ISSUE #8365] add non-oneway updateConsumerOffset (#8368)
---
 .../client/impl/mqclient/MQClientAPIExt.java       | 27 ++++++++++++++++
 .../client/impl/mqclient/MQClientAPIExtTest.java   | 37 ++++++++++++++++++++++
 2 files changed, 64 insertions(+)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index b97e00c577..0e2092b8a0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -400,6 +400,33 @@ public class MQClientAPIExt extends MQClientAPIImpl {
         return future;
     }
 
+    /**
+     * Sends an UPDATE_CONSUMER_OFFSET request through invoke() and reports the broker's reply: the future
+     * completes with null on SUCCESS, with invoke()'s error if it failed, else with an MQBrokerException.
+     */
+    public CompletableFuture<Void> updateConsumerOffsetAsync(
+        String brokerAddr,
+        UpdateConsumerOffsetRequestHeader header,
+        long timeoutMillis
+    ) {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, header);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        invoke(brokerAddr, request, timeoutMillis).whenComplete((response, t) -> {
+            if (t != null) {
+                log.error("updateConsumerOffsetAsync failed, brokerAddr={}, requestHeader={}", brokerAddr, header, t);
+                future.completeExceptionally(t);
+                return;
+            }
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                future.complete(null);
+                return;
+            }
+            // Fail on every non-SUCCESS code, listed or not, so the returned future can never stay pending.
+            future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark()));
+        });
+        return future;
+    }
+
     public CompletableFuture<List<String>> getConsumerListByGroupAsync(
         String brokerAddr,
         GetConsumerListByGroupRequestHeader requestHeader,
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java
index 752bc98eab..6f692dff95 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java
@@ -18,14 +18,19 @@
 package org.apache.rocketmq.client.impl.mqclient;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.utils.FutureUtils;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -34,9 +39,12 @@ import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MQClientAPIExtTest {
@@ -71,4 +79,33 @@ public class MQClientAPIExtTest {
         CompletableFuture<SendResult> future = 
mqClientAPIExt.sendMessageAsync("127.0.0.1:10911", "test", msg, requestHeader, 
10);
         
assertThatThrownBy(future::get).getCause().isInstanceOf(RemotingTimeoutException.class);
     }
+
+    @Test
+    public void testUpdateConsumerOffsetAsync_Success() throws ExecutionException, InterruptedException {
+        // Stub the remoting client so invoke() hands back an already-completed SUCCESS response.
+        RemotingCommand successResponse = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "");
+        CompletableFuture<RemotingCommand> stubbedReply = CompletableFuture.completedFuture(successResponse);
+        doReturn(stubbedReply).when(remotingClientMock).invoke(anyString(), any(RemotingCommand.class), anyLong());
+        CompletableFuture<Void> result = mqClientAPIExt.updateConsumerOffsetAsync("brokerAddr", new UpdateConsumerOffsetRequestHeader(), 3000L);
+        // get() would throw ExecutionException had the future failed; a normal completion yields null.
+        assertNull("Future should be completed without exception", result.get());
+    }
+
+    @Test
+    public void testUpdateConsumerOffsetAsync_Fail() throws InterruptedException {
+        RemotingCommand errorResponse = RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, "QueueId is null, topic is testTopic");
+        CompletableFuture<RemotingCommand> remotingFuture = CompletableFuture.completedFuture(errorResponse);
+        doReturn(remotingFuture).when(remotingClientMock).invoke(anyString(), any(RemotingCommand.class), anyLong());
+        CompletableFuture<Void> future = mqClientAPIExt.updateConsumerOffsetAsync("brokerAddr", new UpdateConsumerOffsetRequestHeader(), 3000L);
+
+        try {
+            future.get();
+            // Without this, the test would pass silently if the future completed normally.
+            throw new AssertionError("Expected the future to fail with an MQBrokerException");
+        } catch (ExecutionException e) {
+            MQBrokerException brokerException = (MQBrokerException) e.getCause();
+            assertEquals(ResponseCode.SYSTEM_ERROR, brokerException.getResponseCode());
+            assertEquals("QueueId is null, topic is testTopic", brokerException.getErrorMessage());
+        }
+    }
 }
\ No newline at end of file

Reply via email to