This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push: new 89896c5 [ISSUE #622] Add config enableSsl (#623) 89896c5 is described below commit 89896c5926b6bd007cbdd48773dc0b5eca333422 Author: panzhi <panzh...@qq.com> AuthorDate: Thu Feb 8 10:51:41 2024 +0800 [ISSUE #622] Add config enableSsl (#623) --- .../autoconfigure/ExtConsumerResetConfiguration.java | 3 ++- .../autoconfigure/ExtTemplateResetConfiguration.java | 3 ++- .../client/autoconfigure/RocketMQAutoConfiguration.java | 8 +++++--- .../rocketmq/client/support/DefaultListenerContainer.java | 13 ++++++++++++- .../org/apache/rocketmq/client/support/RocketMQUtil.java | 14 +++++++++----- 5 files changed, 30 insertions(+), 11 deletions(-) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java index 3545dae..8a758b5 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java @@ -117,8 +117,9 @@ public class ExtConsumerResetConfiguration implements ApplicationContextAware, S String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType()); Duration requestTimeout = Duration.ofDays(annotation.requestTimeout()); int awaitDuration = annotation.awaitDuration(); + Boolean sslEnabled = simpleConsumer.isSslEnabled(); Assert.hasText(topicName, "[topic] must not be null"); - ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout); + ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled); final ClientServiceProvider provider = ClientServiceProvider.loadService(); FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tag, filterExpressionType); Duration duration = Duration.ofSeconds(awaitDuration); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java index 75bd449..bd301c7 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java @@ -111,7 +111,8 @@ public class ExtTemplateResetConfiguration implements ApplicationContextAware, S String secretKey = environment.resolvePlaceholders(annotation.secretKey()); secretKey = StringUtils.hasLength(secretKey) ? secretKey : producerConfig.getSecretKey(); int requestTimeout = annotation.requestTimeout(); - ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endpoints, Duration.ofDays(requestTimeout)); + Boolean sslEnabled = producerConfig.isSslEnabled(); + ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endpoints, Duration.ofDays(requestTimeout), sslEnabled); final ClientServiceProvider provider = ClientServiceProvider.loadService(); ProducerBuilder producerBuilder = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration).setMaxAttempts(annotation.maxAttempts()) diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java index 11763c8..e9bd8be 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java @@ -83,10 +83,12 @@ public class RocketMQAutoConfiguration implements ApplicationContextAware { ProducerBuilder producerBuilder; producerBuilder = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) - // Set the topic name(s), which is optional but recommended. It makes producer could prefetch the topic - // route before message publishing. - .setTopics(rocketMQProducer.getTopic()) .setMaxAttempts(rocketMQProducer.getMaxAttempts()); + if (StringUtils.hasLength(topic)) { + // Set the topic name(s), which is optional but recommended. It makes producer could prefetch the topic + // route before message publishing. + producerBuilder.setTopics(rocketMQProducer.getTopic()); + } log.info(String.format("a producer init on proxy %s", endPoints)); return producerBuilder; } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java index c2466c0..45cdf7e 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java @@ -85,6 +85,8 @@ public class DefaultListenerContainer implements InitializingBean, int consumptionThreadCount = 20; + Boolean sslEnabled; + public String getName() { return name; } @@ -230,6 +232,14 @@ public class DefaultListenerContainer implements InitializingBean, this.type = type; } + public Boolean getSslEnabled() { + return sslEnabled; + } + + public void setSslEnabled(Boolean sslEnabled) { + this.sslEnabled = sslEnabled; + } + private void initRocketMQPushConsumer() { if (rocketMQMessageListener == null) { throw new IllegalArgumentException("Property 'rocketMQMessageListener' is required"); @@ -242,7 +252,8 @@ public class DefaultListenerContainer implements InitializingBean, if (StringUtils.hasLength(this.getTag())) { filterExpression = RocketMQUtil.createFilterExpression(this.getTag(),this.getType()); } - ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(this.getAccessKey(), this.getSecretKey(), this.getEndpoints(), this.getRequestTimeout()); + ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(this.getAccessKey(), this.getSecretKey(), + this.getEndpoints(), this.getRequestTimeout(), this.sslEnabled); PushConsumerBuilder pushConsumerBuilder = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java index 10d977a..fab2d46 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java @@ -121,8 +121,8 @@ public class RocketMQUtil { String secretKey = rocketMQProducer.getSecretKey(); String endPoints = rocketMQProducer.getEndpoints(); Duration requestTimeout = Duration.ofDays(rocketMQProducer.getRequestTimeout()); - // boolean sslEnabled = rocketMQProducer.isSslEnabled(); - return createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout); + boolean sslEnabled = rocketMQProducer.isSslEnabled(); + return createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled); } public static ClientConfiguration createConsumerClientConfiguration(RocketMQProperties.SimpleConsumer simpleConsumer) { @@ -130,12 +130,13 @@ public class RocketMQUtil { String secretKey = simpleConsumer.getSecretKey(); String endPoints = simpleConsumer.getEndpoints(); Duration requestTimeout = Duration.ofDays(simpleConsumer.getRequestTimeout()); - // boolean sslEnabled = rocketMQProducer.isSslEnabled(); - return createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout); + boolean sslEnabled = simpleConsumer.isSslEnabled(); + return createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled); } - public static ClientConfiguration createClientConfiguration(String accessKey, String secretKey, String endPoints, Duration requestTimeout) { + public static ClientConfiguration createClientConfiguration(String accessKey, String secretKey, String endPoints, + Duration requestTimeout, Boolean sslEnabled) { SessionCredentialsProvider sessionCredentialsProvider = null; if (StringUtils.hasLength(accessKey) && StringUtils.hasLength(secretKey)) { @@ -150,6 +151,9 @@ public class RocketMQUtil { if (Objects.nonNull(requestTimeout)) { clientConfigurationBuilder.setRequestTimeout(requestTimeout); } + if (Objects.nonNull(sslEnabled)) { + clientConfigurationBuilder.enableSsl(sslEnabled); + } return clientConfigurationBuilder.build(); }