This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1588d65762 [ISSUE #8365] add remoting client non-oneway 
updateConsumerOffset function (#8391)
1588d65762 is described below

commit 1588d6576295963787a99ab8d8754adf458bf329
Author: 吴星灿 <37405937+qianye1...@users.noreply.github.com>
AuthorDate: Wed Jul 17 15:41:45 2024 +0800

    [ISSUE #8365] add remoting client non-oneway updateConsumerOffset function 
(#8391)
    
    * add non-oneway updateConsumerOffset
---
 .../proxy/processor/ConsumerProcessor.java         | 29 ++++++++++++++++++----
 .../proxy/processor/DefaultMessagingProcessor.java | 11 ++++++--
 .../proxy/processor/MessagingProcessor.java        | 16 +++++++++---
 .../service/message/ClusterMessageService.java     | 15 +++++++++--
 .../proxy/service/message/LocalMessageService.java |  6 +++++
 .../proxy/service/message/MessageService.java      |  7 ++++++
 6 files changed, 72 insertions(+), 12 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index 24fc0a2a28..ace8af1b99 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -40,12 +40,12 @@ import 
org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.FutureUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-import org.apache.rocketmq.common.utils.FutureUtils;
 import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
 import org.apache.rocketmq.proxy.service.ServiceManager;
 import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
@@ -96,7 +96,7 @@ public class ConsumerProcessor extends AbstractProcessor {
             }
             return popMessage(ctx, messageQueue, consumerGroup, topic, 
maxMsgNums, invisibleTime, pollTime, initMode,
                 subscriptionData, fifo, popMessageResultFilter, attemptId, 
timeoutMillis);
-        }  catch (Throwable t) {
+        } catch (Throwable t) {
             future.completeExceptionally(t);
         }
         return future;
@@ -287,7 +287,8 @@ public class ConsumerProcessor extends AbstractProcessor {
         return FutureUtils.addExecutor(future, this.executor);
     }
 
-    protected CompletableFuture<List<BatchAckResult>> 
processBrokerHandle(ProxyContext ctx, String consumerGroup, String topic, 
List<ReceiptHandleMessage> handleMessageList, long timeoutMillis) {
+    protected CompletableFuture<List<BatchAckResult>> 
processBrokerHandle(ProxyContext ctx, String consumerGroup,
+        String topic, List<ReceiptHandleMessage> handleMessageList, long 
timeoutMillis) {
         return this.serviceManager.getMessageService().batchAckMessage(ctx, 
handleMessageList, consumerGroup, topic, timeoutMillis)
             .thenApply(result -> {
                 List<BatchAckResult> results = new ArrayList<>();
@@ -393,6 +394,24 @@ public class ConsumerProcessor extends AbstractProcessor {
         return FutureUtils.addExecutor(future, this.executor);
     }
 
+    public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx, 
MessageQueue messageQueue,
+        String consumerGroup, long commitOffset, long timeoutMillis) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
+            AddressableMessageQueue addressableMessageQueue = 
serviceManager.getTopicRouteService()
+                .buildAddressableMessageQueue(ctx, messageQueue);
+            UpdateConsumerOffsetRequestHeader requestHeader = new 
UpdateConsumerOffsetRequestHeader();
+            requestHeader.setConsumerGroup(consumerGroup);
+            requestHeader.setTopic(addressableMessageQueue.getTopic());
+            requestHeader.setQueueId(addressableMessageQueue.getQueueId());
+            requestHeader.setCommitOffset(commitOffset);
+            future = 
serviceManager.getMessageService().updateConsumerOffsetAsync(ctx, 
addressableMessageQueue, requestHeader, timeoutMillis);
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+        return FutureUtils.addExecutor(future, this.executor);
+    }
+
     public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx, 
MessageQueue messageQueue,
         String consumerGroup, long timeoutMillis) {
         CompletableFuture<Long> future = new CompletableFuture<>();
@@ -501,9 +520,9 @@ public class ConsumerProcessor extends AbstractProcessor {
 
     protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext 
ctx, Set<MessageQueue> mqSet) {
         Set<AddressableMessageQueue> addressableMessageQueueSet = new 
HashSet<>(mqSet.size());
-        for (MessageQueue mq:mqSet) {
+        for (MessageQueue mq : mqSet) {
             try {
-                
addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx,
 mq)) ;
+                
addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx,
 mq));
             } catch (Exception e) {
                 log.error("build addressable message queue fail, messageQueue 
= {}", mq, e);
             }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 48a732c284..9c494d7a45 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -75,7 +75,7 @@ public class DefaultMessagingProcessor extends 
AbstractStartAndShutdown implemen
     protected ThreadPoolExecutor producerProcessorExecutor;
     protected ThreadPoolExecutor consumerProcessorExecutor;
     protected static final String ROCKETMQ_HOME = 
System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
-            System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+        System.getenv(MixAll.ROCKETMQ_HOME_ENV));
 
     protected DefaultMessagingProcessor(ServiceManager serviceManager) {
         ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
@@ -167,7 +167,8 @@ public class DefaultMessagingProcessor extends 
AbstractStartAndShutdown implemen
     }
 
     @Override
-    public CompletableFuture<Void> endTransaction(ProxyContext ctx, String 
topic, String transactionId, String messageId, String producerGroup,
+    public CompletableFuture<Void> endTransaction(ProxyContext ctx, String 
topic, String transactionId,
+        String messageId, String producerGroup,
         TransactionStatus transactionStatus, boolean fromTransactionCheck,
         long timeoutMillis) {
         return this.transactionProcessor.endTransaction(ctx, topic, 
transactionId, messageId, producerGroup, transactionStatus, 
fromTransactionCheck, timeoutMillis);
@@ -225,6 +226,12 @@ public class DefaultMessagingProcessor extends 
AbstractStartAndShutdown implemen
         return this.consumerProcessor.updateConsumerOffset(ctx, messageQueue, 
consumerGroup, commitOffset, timeoutMillis);
     }
 
+    @Override
+    public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx, 
MessageQueue messageQueue,
+        String consumerGroup, long commitOffset, long timeoutMillis) {
+        return this.consumerProcessor.updateConsumerOffsetAsync(ctx, 
messageQueue, consumerGroup, commitOffset, timeoutMillis);
+    }
+
     @Override
     public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx, 
MessageQueue messageQueue,
         String consumerGroup, long timeoutMillis) {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index 213d2beeea..03d28262d7 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -33,10 +33,10 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
 import org.apache.rocketmq.proxy.common.Address;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.common.utils.StartAndShutdown;
 import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
 import org.apache.rocketmq.proxy.service.metadata.MetadataService;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
@@ -217,6 +217,14 @@ public interface MessagingProcessor extends 
StartAndShutdown {
         long timeoutMillis
     );
 
+    CompletableFuture<Void> updateConsumerOffsetAsync(
+        ProxyContext ctx,
+        MessageQueue messageQueue,
+        String consumerGroup,
+        long commitOffset,
+        long timeoutMillis
+    );
+
     CompletableFuture<Long> queryConsumerOffset(
         ProxyContext ctx,
         MessageQueue messageQueue,
@@ -321,7 +329,9 @@ public interface MessagingProcessor extends 
StartAndShutdown {
 
     MetadataService getMetadataService();
 
-    void addReceiptHandle(ProxyContext ctx, Channel channel, String group, 
String msgID, MessageReceiptHandle messageReceiptHandle);
+    void addReceiptHandle(ProxyContext ctx, Channel channel, String group, 
String msgID,
+        MessageReceiptHandle messageReceiptHandle);
 
-    MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel 
channel, String group, String msgID, String receiptHandle);
+    MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel 
channel, String group, String msgID,
+        String receiptHandle);
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
index ba7d5ad8e2..f9eb94fcfc 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
@@ -29,10 +29,10 @@ import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.FutureUtils;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-import org.apache.rocketmq.common.utils.FutureUtils;
 import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
 import org.apache.rocketmq.proxy.service.route.TopicRouteService;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -139,7 +139,8 @@ public class ClusterMessageService implements 
MessageService {
     }
 
     @Override
-    public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, 
List<ReceiptHandleMessage> handleList, String consumerGroup,
+    public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, 
List<ReceiptHandleMessage> handleList,
+        String consumerGroup,
         String topic, long timeoutMillis) {
         List<String> extraInfoList = handleList.stream().map(message -> 
message.getReceiptHandle().getReceiptHandle()).collect(Collectors.toList());
         return this.mqClientAPIFactory.getClient().batchAckMessageAsync(
@@ -181,6 +182,16 @@ public class ClusterMessageService implements 
MessageService {
         );
     }
 
+    @Override
+    public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx, 
AddressableMessageQueue messageQueue,
+        UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) {
+        return this.mqClientAPIFactory.getClient().updateConsumerOffsetAsync(
+            messageQueue.getBrokerAddr(),
+            requestHeader,
+            timeoutMillis
+        );
+    }
+
     @Override
     public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, 
AddressableMessageQueue messageQueue,
         LockBatchRequestBody requestBody, long timeoutMillis) {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index aaa688fee6..6b2ba02f7c 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -440,6 +440,12 @@ public class LocalMessageService implements MessageService 
{
         throw new NotImplementedException("updateConsumerOffset is not 
implemented in LocalMessageService");
     }
 
+    @Override
+    public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx, 
AddressableMessageQueue messageQueue,
+        UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) {
+        throw new NotImplementedException("updateConsumerOffsetAsync is not 
implemented in LocalMessageService");
+    }
+
     @Override
     public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, 
AddressableMessageQueue messageQueue,
         LockBatchRequestBody requestBody, long timeoutMillis) {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
index 58a835adb4..61accbc041 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
@@ -120,6 +120,13 @@ public interface MessageService {
         long timeoutMillis
     );
 
+    CompletableFuture<Void> updateConsumerOffsetAsync(
+        ProxyContext ctx,
+        AddressableMessageQueue messageQueue,
+        UpdateConsumerOffsetRequestHeader requestHeader,
+        long timeoutMillis
+    );
+
     CompletableFuture<Set<MessageQueue>> lockBatchMQ(
         ProxyContext ctx,
         AddressableMessageQueue messageQueue,

Reply via email to