This is an automated email from the ASF dual-hosted git repository. yuzhou pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 5600684eb4 [ISSUE #8725] clean DefaultMQPushConsumer after start fail (#8726) 5600684eb4 is described below commit 5600684eb437dd5a4aeb9c658e24200bcb74909b Author: yuz10 <845238...@qq.com> AuthorDate: Wed Oct 30 20:08:28 2024 +0800 [ISSUE #8725] clean DefaultMQPushConsumer after start fail (#8726) * [ISSUE #8725]clean DefaultMQPushConsumer after start fail * clean DefaultLitePullConsumerImpl after start fail --- .../client/impl/consumer/DefaultLitePullConsumerImpl.java | 7 ++++++- .../client/impl/consumer/DefaultMQPushConsumerImpl.java | 14 ++++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 3f90b67ec9..f5ff3179bf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -307,7 +307,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup()); - operateAfterRunning(); + try { + operateAfterRunning(); + } catch (Exception e) { + shutdown(); + throw e; + } break; case RUNNING: diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index c92cadf505..4eccba8e8d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -1006,10 +1006,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { break; } - this.updateTopicSubscribeInfoWhenSubscriptionChanged(); - this.mQClientFactory.checkClientInBroker(); - if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) { - this.mQClientFactory.rebalanceImmediately(); + try { + this.updateTopicSubscribeInfoWhenSubscriptionChanged(); + this.mQClientFactory.checkClientInBroker(); + if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) { + this.mQClientFactory.rebalanceImmediately(); + } + } catch (Exception e) { + log.warn("Start the consumer {} fail.", this.defaultMQPushConsumer.getConsumerGroup(), e); + shutdown(); + throw e; } }