This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5600684eb4 [ISSUE #8725] clean DefaultMQPushConsumer after start fail 
(#8726)
5600684eb4 is described below

commit 5600684eb437dd5a4aeb9c658e24200bcb74909b
Author: yuz10 <845238...@qq.com>
AuthorDate: Wed Oct 30 20:08:28 2024 +0800

    [ISSUE #8725] clean DefaultMQPushConsumer after start fail (#8726)
    
    * [ISSUE #8725]clean DefaultMQPushConsumer after start fail
    
    * clean DefaultLitePullConsumerImpl after start fail
---
 .../client/impl/consumer/DefaultLitePullConsumerImpl.java  |  7 ++++++-
 .../client/impl/consumer/DefaultMQPushConsumerImpl.java    | 14 ++++++++++----
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 3f90b67ec9..f5ff3179bf 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -307,7 +307,12 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
 
                 log.info("the consumer [{}] start OK", 
this.defaultLitePullConsumer.getConsumerGroup());
 
-                operateAfterRunning();
+                try {
+                    operateAfterRunning();
+                } catch (Exception e) {
+                    shutdown();
+                    throw e;
+                }
 
                 break;
             case RUNNING:
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index c92cadf505..4eccba8e8d 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -1006,10 +1006,16 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
                 break;
         }
 
-        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
-        this.mQClientFactory.checkClientInBroker();
-        if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) {
-            this.mQClientFactory.rebalanceImmediately();
+        try {
+            this.updateTopicSubscribeInfoWhenSubscriptionChanged();
+            this.mQClientFactory.checkClientInBroker();
+            if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) {
+                this.mQClientFactory.rebalanceImmediately();
+            }
+        } catch (Exception e) {
+            log.warn("Start the consumer {} fail.", 
this.defaultMQPushConsumer.getConsumerGroup(), e);
+            shutdown();
+            throw e;
         }
     }
 

Reply via email to