This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new 37f25ab   [ISSUE #632] Fix NPE caused by using the 
@ExtRocketMQTemplateConfiguration annotation extension to send messages in v5
37f25ab is described below

commit 37f25ab6343ab0a4d03f5177fbdcc4e699ba763c
Author: lilinjiang <36615541+lilinji...@users.noreply.github.com>
AuthorDate: Mon Mar 4 14:17:28 2024 +0800

     [ISSUE #632] Fix NPE caused by using the @ExtRocketMQTemplateConfiguration 
annotation extension to send messages in v5
    
    Co-authored-by: lilinjiang <ylilinji...@126.com>
---
 .../RocketMQMessageListenerBeanPostProcessor.java  | 33 +++++++++++++++++++++-
 .../ListenerContainerConfiguration.java            | 26 ++++++++++++-----
 2 files changed, 51 insertions(+), 8 deletions(-)

diff --git 
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java
 
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java
index 61f3e1d..53d5cab 100644
--- 
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java
+++ 
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListenerBeanPostProcessor.java
@@ -23,6 +23,7 @@ import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.SmartLifecycle;
 import org.springframework.core.OrderComparator;
 import org.springframework.core.annotation.AnnotationUtils;
 
@@ -32,7 +33,7 @@ import java.util.Map;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
-public class RocketMQMessageListenerBeanPostProcessor implements 
ApplicationContextAware, BeanPostProcessor, InitializingBean {
+public class RocketMQMessageListenerBeanPostProcessor implements 
ApplicationContextAware, BeanPostProcessor, InitializingBean, SmartLifecycle {
 
     private ApplicationContext applicationContext;
 
@@ -40,6 +41,8 @@ public class RocketMQMessageListenerBeanPostProcessor 
implements ApplicationCont
 
     private ListenerContainerConfiguration listenerContainerConfiguration;
 
+    private boolean running = false;
+
     @Override
     public Object postProcessBeforeInitialization(Object bean, String 
beanName) throws BeansException {
         return bean;
@@ -58,6 +61,34 @@ public class RocketMQMessageListenerBeanPostProcessor 
implements ApplicationCont
         return bean;
     }
 
+    @Override
+    public int getPhase() {
+        return Integer.MAX_VALUE - 2000;
+    }
+
+    @Override
+    public void start() {
+        if (!isRunning()) {
+            this.setRunning(true);
+            listenerContainerConfiguration.startContainer();
+        }
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    public void setRunning(boolean running) {
+        this.running = running;
+    }
+
+
+    @Override
+    public boolean isRunning() {
+        return running;
+    }
+
     @Override
     public void setApplicationContext(ApplicationContext applicationContext) 
throws BeansException {
         this.applicationContext = applicationContext;
diff --git 
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java
 
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java
index 81c5b09..bfbb7f9 100644
--- 
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java
+++ 
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java
@@ -32,6 +32,8 @@ import org.springframework.core.env.ConfigurableEnvironment;
 import org.springframework.util.Assert;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 @Configuration
@@ -48,6 +50,8 @@ public class ListenerContainerConfiguration implements 
ApplicationContextAware {
 
     private RocketMQMessageConverter rocketMQMessageConverter;
 
+    private final List<DefaultListenerContainer> containers = new 
ArrayList<>();
+
     public ListenerContainerConfiguration(RocketMQMessageConverter 
rocketMQMessageConverter,
                                           ConfigurableEnvironment environment, 
RocketMQProperties rocketMQProperties) {
         this.rocketMQMessageConverter = rocketMQMessageConverter;
@@ -68,15 +72,23 @@ public class ListenerContainerConfiguration implements 
ApplicationContextAware {
         genericApplicationContext.registerBean(containerBeanName, 
DefaultListenerContainer.class, () -> 
createRocketMQListenerContainer(containerBeanName, bean, annotation));
         DefaultListenerContainer container = 
genericApplicationContext.getBean(containerBeanName,
                 DefaultListenerContainer.class);
-        if (!container.isRunning()) {
-            try {
-                container.start();
-            } catch (Exception e) {
-                log.error("Started container failed. {}", container, e);
-                throw new RuntimeException(e);
+
+        containers.add(container);
+
+        log.info("Register the listener to container, listenerBeanName:{}, 
containerBeanName:{}", beanName, containerBeanName);
+    }
+
+    public void startContainer() {
+        for (DefaultListenerContainer container : containers) {
+            if (!container.isRunning()) {
+                try {
+                    container.start();
+                } catch (Exception e) {
+                    log.error("Started container failed. {}", container, e);
+                    throw new RuntimeException(e);
+                }
             }
         }
-        log.info("Register the listener to container, listenerBeanName:{}, 
containerBeanName:{}", beanName, containerBeanName);
     }
 
     private DefaultListenerContainer createRocketMQListenerContainer(String 
name, Object bean, RocketMQMessageListener annotation) {

Reply via email to