This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 1588d65762 [ISSUE #8365] add remoting client non-oneway updateConsumerOffset function (#8391) 1588d65762 is described below commit 1588d6576295963787a99ab8d8754adf458bf329 Author: 吴星灿 <37405937+qianye1...@users.noreply.github.com> AuthorDate: Wed Jul 17 15:41:45 2024 +0800 [ISSUE #8365] add remoting client non-oneway updateConsumerOffset function (#8391) * add non-oneway updateConsumerOffset --- .../proxy/processor/ConsumerProcessor.java | 29 ++++++++++++++++++---- .../proxy/processor/DefaultMessagingProcessor.java | 11 ++++++-- .../proxy/processor/MessagingProcessor.java | 16 +++++++++--- .../service/message/ClusterMessageService.java | 15 +++++++++-- .../proxy/service/message/LocalMessageService.java | 6 +++++ .../proxy/service/message/MessageService.java | 7 ++++++ 6 files changed, 72 insertions(+), 12 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java index 24fc0a2a28..ace8af1b99 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java @@ -40,12 +40,12 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.FutureUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; -import org.apache.rocketmq.common.utils.FutureUtils; import org.apache.rocketmq.proxy.common.utils.ProxyUtils; import org.apache.rocketmq.proxy.service.ServiceManager; import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; @@ -96,7 +96,7 @@ public class ConsumerProcessor extends AbstractProcessor { } return popMessage(ctx, messageQueue, consumerGroup, topic, maxMsgNums, invisibleTime, pollTime, initMode, subscriptionData, fifo, popMessageResultFilter, attemptId, timeoutMillis); - } catch (Throwable t) { + } catch (Throwable t) { future.completeExceptionally(t); } return future; @@ -287,7 +287,8 @@ public class ConsumerProcessor extends AbstractProcessor { return FutureUtils.addExecutor(future, this.executor); } - protected CompletableFuture<List<BatchAckResult>> processBrokerHandle(ProxyContext ctx, String consumerGroup, String topic, List<ReceiptHandleMessage> handleMessageList, long timeoutMillis) { + protected CompletableFuture<List<BatchAckResult>> processBrokerHandle(ProxyContext ctx, String consumerGroup, + String topic, List<ReceiptHandleMessage> handleMessageList, long timeoutMillis) { return this.serviceManager.getMessageService().batchAckMessage(ctx, handleMessageList, consumerGroup, topic, timeoutMillis) .thenApply(result -> { List<BatchAckResult> results = new ArrayList<>(); @@ -393,6 +394,24 @@ public class ConsumerProcessor extends AbstractProcessor { return FutureUtils.addExecutor(future, this.executor); } + public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx, MessageQueue messageQueue, + String consumerGroup, long commitOffset, long timeoutMillis) { + CompletableFuture<Void> future = new CompletableFuture<>(); + try { + AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService() + .buildAddressableMessageQueue(ctx, messageQueue); + UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); + requestHeader.setConsumerGroup(consumerGroup); + requestHeader.setTopic(addressableMessageQueue.getTopic()); + requestHeader.setQueueId(addressableMessageQueue.getQueueId()); + requestHeader.setCommitOffset(commitOffset); + future = serviceManager.getMessageService().updateConsumerOffsetAsync(ctx, addressableMessageQueue, requestHeader, timeoutMillis); + } catch (Throwable t) { + future.completeExceptionally(t); + } + return FutureUtils.addExecutor(future, this.executor); + } + public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx, MessageQueue messageQueue, String consumerGroup, long timeoutMillis) { CompletableFuture<Long> future = new CompletableFuture<>(); @@ -501,9 +520,9 @@ public class ConsumerProcessor extends AbstractProcessor { protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext ctx, Set<MessageQueue> mqSet) { Set<AddressableMessageQueue> addressableMessageQueueSet = new HashSet<>(mqSet.size()); - for (MessageQueue mq:mqSet) { + for (MessageQueue mq : mqSet) { try { - addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq)) ; + addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq)); } catch (Exception e) { log.error("build addressable message queue fail, messageQueue = {}", mq, e); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java index 48a732c284..9c494d7a45 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java @@ -75,7 +75,7 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen protected ThreadPoolExecutor producerProcessorExecutor; protected ThreadPoolExecutor consumerProcessorExecutor; protected static final String ROCKETMQ_HOME = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, - System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + System.getenv(MixAll.ROCKETMQ_HOME_ENV)); protected DefaultMessagingProcessor(ServiceManager serviceManager) { ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); @@ -167,7 +167,8 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen } @Override - public CompletableFuture<Void> endTransaction(ProxyContext ctx, String topic, String transactionId, String messageId, String producerGroup, + public CompletableFuture<Void> endTransaction(ProxyContext ctx, String topic, String transactionId, + String messageId, String producerGroup, TransactionStatus transactionStatus, boolean fromTransactionCheck, long timeoutMillis) { return this.transactionProcessor.endTransaction(ctx, topic, transactionId, messageId, producerGroup, transactionStatus, fromTransactionCheck, timeoutMillis); @@ -225,6 +226,12 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen return this.consumerProcessor.updateConsumerOffset(ctx, messageQueue, consumerGroup, commitOffset, timeoutMillis); } + @Override + public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx, MessageQueue messageQueue, + String consumerGroup, long commitOffset, long timeoutMillis) { + return this.consumerProcessor.updateConsumerOffsetAsync(ctx, messageQueue, consumerGroup, commitOffset, timeoutMillis); + } + @Override public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx, MessageQueue messageQueue, String consumerGroup, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java index 213d2beeea..03d28262d7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java @@ -33,10 +33,10 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.proxy.common.Address; import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; -import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage; import org.apache.rocketmq.proxy.service.metadata.MetadataService; import org.apache.rocketmq.proxy.service.relay.ProxyRelayService; @@ -217,6 +217,14 @@ public interface MessagingProcessor extends StartAndShutdown { long timeoutMillis ); + CompletableFuture<Void> updateConsumerOffsetAsync( + ProxyContext ctx, + MessageQueue messageQueue, + String consumerGroup, + long commitOffset, + long timeoutMillis + ); + CompletableFuture<Long> queryConsumerOffset( ProxyContext ctx, MessageQueue messageQueue, @@ -321,7 +329,9 @@ public interface MessagingProcessor extends StartAndShutdown { MetadataService getMetadataService(); - void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, MessageReceiptHandle messageReceiptHandle); + void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, + MessageReceiptHandle messageReceiptHandle); - MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle); + MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, + String receiptHandle); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java index ba7d5ad8e2..f9eb94fcfc 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java @@ -29,10 +29,10 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.FutureUtils; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.common.ProxyException; import org.apache.rocketmq.proxy.common.ProxyExceptionCode; -import org.apache.rocketmq.common.utils.FutureUtils; import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue; import org.apache.rocketmq.proxy.service.route.TopicRouteService; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -139,7 +139,8 @@ public class ClusterMessageService implements MessageService { } @Override - public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, List<ReceiptHandleMessage> handleList, String consumerGroup, + public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, List<ReceiptHandleMessage> handleList, + String consumerGroup, String topic, long timeoutMillis) { List<String> extraInfoList = handleList.stream().map(message -> message.getReceiptHandle().getReceiptHandle()).collect(Collectors.toList()); return this.mqClientAPIFactory.getClient().batchAckMessageAsync( @@ -181,6 +182,16 @@ public class ClusterMessageService implements MessageService { ); } + @Override + public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx, AddressableMessageQueue messageQueue, + UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) { + return this.mqClientAPIFactory.getClient().updateConsumerOffsetAsync( + messageQueue.getBrokerAddr(), + requestHeader, + timeoutMillis + ); + } + @Override public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, AddressableMessageQueue messageQueue, LockBatchRequestBody requestBody, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java index aaa688fee6..6b2ba02f7c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java @@ -440,6 +440,12 @@ public class LocalMessageService implements MessageService { throw new NotImplementedException("updateConsumerOffset is not implemented in LocalMessageService"); } + @Override + public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx, AddressableMessageQueue messageQueue, + UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) { + throw new NotImplementedException("updateConsumerOffsetAsync is not implemented in LocalMessageService"); + } + @Override public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, AddressableMessageQueue messageQueue, LockBatchRequestBody requestBody, long timeoutMillis) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java index 58a835adb4..61accbc041 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java @@ -120,6 +120,13 @@ public interface MessageService { long timeoutMillis ); + CompletableFuture<Void> updateConsumerOffsetAsync( + ProxyContext ctx, + AddressableMessageQueue messageQueue, + UpdateConsumerOffsetRequestHeader requestHeader, + long timeoutMillis + ); + CompletableFuture<Set<MessageQueue>> lockBatchMQ( ProxyContext ctx, AddressableMessageQueue messageQueue,