This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 702bbd2b83 [ISSUE #7803] Add try catch for lock and unlock (#7804) 702bbd2b83 is described below commit 702bbd2b831d4fd97a5ad00f9587e79a0a3e5fda Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com> AuthorDate: Sun Feb 4 16:14:40 2024 +0800 [ISSUE #7803] Add try catch for lock and unlock (#7804) * Add try catch for lock and unlock --- .../proxy/processor/ConsumerProcessor.java | 90 +++++++++++++--------- 1 file changed, 53 insertions(+), 37 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java index 7870233576..3ff3423701 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java @@ -414,49 +414,59 @@ public class ConsumerProcessor extends AbstractProcessor { public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, Set<MessageQueue> mqSet, String consumerGroup, String clientId, long timeoutMillis) { CompletableFuture<Set<MessageQueue>> future = new CompletableFuture<>(); - Set<MessageQueue> successSet = new CopyOnWriteArraySet<>(); - Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(ctx, mqSet); - Map<String, List<AddressableMessageQueue>> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet); - List<CompletableFuture<Void>> futureList = new ArrayList<>(); - messageQueueSetMap.forEach((k, v) -> { - LockBatchRequestBody requestBody = new LockBatchRequestBody(); - requestBody.setConsumerGroup(consumerGroup); - requestBody.setClientId(clientId); - requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet())); - CompletableFuture<Void> future0 = serviceManager.getMessageService() - .lockBatchMQ(ctx, v.get(0), requestBody, 
timeoutMillis) - .thenAccept(successSet::addAll); - futureList.add(FutureUtils.addExecutor(future0, this.executor)); - }); - CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> { - if (t != null) { - log.error("LockBatchMQ failed", t); - } - future.complete(successSet); - }); + try { + Set<MessageQueue> successSet = new CopyOnWriteArraySet<>(); + Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(ctx, mqSet); + Map<String, List<AddressableMessageQueue>> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet); + List<CompletableFuture<Void>> futureList = new ArrayList<>(); + messageQueueSetMap.forEach((k, v) -> { + LockBatchRequestBody requestBody = new LockBatchRequestBody(); + requestBody.setConsumerGroup(consumerGroup); + requestBody.setClientId(clientId); + requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet())); + CompletableFuture<Void> future0 = serviceManager.getMessageService() + .lockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis) + .thenAccept(successSet::addAll); + futureList.add(FutureUtils.addExecutor(future0, this.executor)); + }); + CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> { + if (t != null) { + log.error("LockBatchMQ failed, group={}", consumerGroup, t); + } + future.complete(successSet); + }); + } catch (Throwable t) { + log.error("LockBatchMQ exception, group={}", consumerGroup, t); + future.completeExceptionally(t); + } return FutureUtils.addExecutor(future, this.executor); } public CompletableFuture<Void> unlockBatchMQ(ProxyContext ctx, Set<MessageQueue> mqSet, String consumerGroup, String clientId, long timeoutMillis) { CompletableFuture<Void> future = new CompletableFuture<>(); - Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(ctx, mqSet); - Map<String, List<AddressableMessageQueue>> messageQueueSetMap = 
buildAddressableMapByBrokerName(addressableMessageQueueSet); - List<CompletableFuture<Void>> futureList = new ArrayList<>(); - messageQueueSetMap.forEach((k, v) -> { - UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody(); - requestBody.setConsumerGroup(consumerGroup); - requestBody.setClientId(clientId); - requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet())); - CompletableFuture<Void> future0 = serviceManager.getMessageService().unlockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis); - futureList.add(FutureUtils.addExecutor(future0, this.executor)); - }); - CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> { - if (t != null) { - log.error("UnlockBatchMQ failed", t); - } - future.complete(null); - }); + try { + Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(ctx, mqSet); + Map<String, List<AddressableMessageQueue>> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet); + List<CompletableFuture<Void>> futureList = new ArrayList<>(); + messageQueueSetMap.forEach((k, v) -> { + UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody(); + requestBody.setConsumerGroup(consumerGroup); + requestBody.setClientId(clientId); + requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet())); + CompletableFuture<Void> future0 = serviceManager.getMessageService().unlockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis); + futureList.add(FutureUtils.addExecutor(future0, this.executor)); + }); + CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> { + if (t != null) { + log.error("UnlockBatchMQ failed, group={}", consumerGroup, t); + } + future.complete(null); + }); + } catch (Throwable t) { + log.error("UnlockBatchMQ exception, group={}", consumerGroup, t); + future.completeExceptionally(t); + } return 
FutureUtils.addExecutor(future, this.executor); } @@ -505,7 +515,13 @@ public class ConsumerProcessor extends AbstractProcessor { protected HashMap<String, List<AddressableMessageQueue>> buildAddressableMapByBrokerName( final Set<AddressableMessageQueue> mqSet) { HashMap<String, List<AddressableMessageQueue>> result = new HashMap<>(); + if (mqSet == null) { + return result; + } for (AddressableMessageQueue mq : mqSet) { + if (mq == null) { + continue; + } List<AddressableMessageQueue> mqs = result.computeIfAbsent(mq.getBrokerName(), k -> new ArrayList<>()); mqs.add(mq); }