This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 702bbd2b83 [ISSUE #7803] Add try catch for lock and unlock (#7804)
702bbd2b83 is described below

commit 702bbd2b831d4fd97a5ad00f9587e79a0a3e5fda
Author: Zhouxiang Zhan <zhouxiang....@alibaba-inc.com>
AuthorDate: Sun Feb 4 16:14:40 2024 +0800

    [ISSUE #7803] Add try catch for lock and unlock (#7804)
    
    * Add try catch for lock and unlock
---
 .../proxy/processor/ConsumerProcessor.java         | 90 +++++++++++++---------
 1 file changed, 53 insertions(+), 37 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index 7870233576..3ff3423701 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -414,49 +414,59 @@ public class ConsumerProcessor extends AbstractProcessor {
     public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, 
Set<MessageQueue> mqSet,
         String consumerGroup, String clientId, long timeoutMillis) {
         CompletableFuture<Set<MessageQueue>> future = new 
CompletableFuture<>();
-        Set<MessageQueue> successSet = new CopyOnWriteArraySet<>();
-        Set<AddressableMessageQueue> addressableMessageQueueSet = 
buildAddressableSet(ctx, mqSet);
-        Map<String, List<AddressableMessageQueue>> messageQueueSetMap = 
buildAddressableMapByBrokerName(addressableMessageQueueSet);
-        List<CompletableFuture<Void>> futureList = new ArrayList<>();
-        messageQueueSetMap.forEach((k, v) -> {
-            LockBatchRequestBody requestBody = new LockBatchRequestBody();
-            requestBody.setConsumerGroup(consumerGroup);
-            requestBody.setClientId(clientId);
-            
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
-            CompletableFuture<Void> future0 = 
serviceManager.getMessageService()
-                .lockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis)
-                .thenAccept(successSet::addAll);
-            futureList.add(FutureUtils.addExecutor(future0, this.executor));
-        });
-        CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0])).whenComplete((v, t) -> {
-            if (t != null) {
-                log.error("LockBatchMQ failed", t);
-            }
-            future.complete(successSet);
-        });
+        try {
+            Set<MessageQueue> successSet = new CopyOnWriteArraySet<>();
+            Set<AddressableMessageQueue> addressableMessageQueueSet = 
buildAddressableSet(ctx, mqSet);
+            Map<String, List<AddressableMessageQueue>> messageQueueSetMap = 
buildAddressableMapByBrokerName(addressableMessageQueueSet);
+            List<CompletableFuture<Void>> futureList = new ArrayList<>();
+            messageQueueSetMap.forEach((k, v) -> {
+                LockBatchRequestBody requestBody = new LockBatchRequestBody();
+                requestBody.setConsumerGroup(consumerGroup);
+                requestBody.setClientId(clientId);
+                
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
+                CompletableFuture<Void> future0 = 
serviceManager.getMessageService()
+                    .lockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis)
+                    .thenAccept(successSet::addAll);
+                futureList.add(FutureUtils.addExecutor(future0, 
this.executor));
+            });
+            CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0])).whenComplete((v, t) -> {
+                if (t != null) {
+                    log.error("LockBatchMQ failed, group={}", consumerGroup, 
t);
+                }
+                future.complete(successSet);
+            });
+        } catch (Throwable t) {
+            log.error("LockBatchMQ exception, group={}", consumerGroup, t);
+            future.completeExceptionally(t);
+        }
         return FutureUtils.addExecutor(future, this.executor);
     }
 
     public CompletableFuture<Void> unlockBatchMQ(ProxyContext ctx, 
Set<MessageQueue> mqSet,
         String consumerGroup, String clientId, long timeoutMillis) {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        Set<AddressableMessageQueue> addressableMessageQueueSet = 
buildAddressableSet(ctx, mqSet);
-        Map<String, List<AddressableMessageQueue>> messageQueueSetMap = 
buildAddressableMapByBrokerName(addressableMessageQueueSet);
-        List<CompletableFuture<Void>> futureList = new ArrayList<>();
-        messageQueueSetMap.forEach((k, v) -> {
-            UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
-            requestBody.setConsumerGroup(consumerGroup);
-            requestBody.setClientId(clientId);
-            
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
-            CompletableFuture<Void> future0 = 
serviceManager.getMessageService().unlockBatchMQ(ctx, v.get(0), requestBody, 
timeoutMillis);
-            futureList.add(FutureUtils.addExecutor(future0, this.executor));
-        });
-        CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0])).whenComplete((v, t) -> {
-            if (t != null) {
-                log.error("UnlockBatchMQ failed", t);
-            }
-            future.complete(null);
-        });
+        try {
+            Set<AddressableMessageQueue> addressableMessageQueueSet = 
buildAddressableSet(ctx, mqSet);
+            Map<String, List<AddressableMessageQueue>> messageQueueSetMap = 
buildAddressableMapByBrokerName(addressableMessageQueueSet);
+            List<CompletableFuture<Void>> futureList = new ArrayList<>();
+            messageQueueSetMap.forEach((k, v) -> {
+                UnlockBatchRequestBody requestBody = new 
UnlockBatchRequestBody();
+                requestBody.setConsumerGroup(consumerGroup);
+                requestBody.setClientId(clientId);
+                
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
+                CompletableFuture<Void> future0 = 
serviceManager.getMessageService().unlockBatchMQ(ctx, v.get(0), requestBody, 
timeoutMillis);
+                futureList.add(FutureUtils.addExecutor(future0, 
this.executor));
+            });
+            CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0])).whenComplete((v, t) -> {
+                if (t != null) {
+                    log.error("UnlockBatchMQ failed, group={}", consumerGroup, 
t);
+                }
+                future.complete(null);
+            });
+        } catch (Throwable t) {
+            log.error("UnlockBatchMQ exception, group={}", consumerGroup, t);
+            future.completeExceptionally(t);
+        }
         return FutureUtils.addExecutor(future, this.executor);
     }
 
@@ -505,7 +515,13 @@ public class ConsumerProcessor extends AbstractProcessor {
     protected HashMap<String, List<AddressableMessageQueue>> 
buildAddressableMapByBrokerName(
         final Set<AddressableMessageQueue> mqSet) {
         HashMap<String, List<AddressableMessageQueue>> result = new 
HashMap<>();
+        if (mqSet == null) {
+            return result;
+        }
         for (AddressableMessageQueue mq : mqSet) {
+            if (mq == null) {
+                continue;
+            }
             List<AddressableMessageQueue> mqs = 
result.computeIfAbsent(mq.getBrokerName(), k -> new ArrayList<>());
             mqs.add(mq);
         }

Reply via email to