This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f1ccde411e [INLONG-11156][SDK] SortSDK support that the token 
configuration of pulsar cluster is null (#11158)
f1ccde411e is described below

commit f1ccde411e4693830c9ac7d7799f1941fbe560b0
Author: ChunLiang Lu <luchunli...@apache.org>
AuthorDate: Fri Sep 20 15:35:57 2024 +0800

    [INLONG-11156][SDK] SortSDK support that the token configuration of pulsar 
cluster is null (#11158)
---
 .../inlong/sdk/sort/manager/InlongMultiTopicManager.java | 12 +++++++++++-
 .../sdk/sort/manager/InlongSingleTopicManager.java       | 16 +++++++++++-----
 .../inlong/sdk/sort/manager/InlongTopicManager.java      | 12 +++++++++++-
 3 files changed, 33 insertions(+), 7 deletions(-)

diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
index 744b38d223..2d098345ca 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
@@ -35,6 +35,7 @@ import 
org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -213,10 +214,19 @@ public class InlongMultiTopicManager extends TopicManager 
{
                 topic.getInLongCluster().getBootstraps(), consumerSize);
         for (int i = 0; i < consumerSize; i++) {
             try {
+                String token = topic.getInLongCluster().getToken();
+                Authentication auth = null;
+                if (StringUtils.isNoneBlank(token)) {
+                    auth = AuthenticationFactory.token(token);
+                }
                 PulsarClient pulsarClient = PulsarClient.builder()
                         .serviceUrl(topic.getInLongCluster().getBootstraps())
-                        
.authentication(AuthenticationFactory.token(topic.getInLongCluster().getToken()))
+                        .authentication(auth)
                         .build();
+                LOGGER.info("create pulsar client succ cluster:{}, topic:{}, 
token:{}",
+                        topic.getInLongCluster().getClusterId(),
+                        topic.getTopic(),
+                        topic.getInLongCluster().getToken());
                 TopicFetcher fetcher = TopicFetcherBuilder.newPulsarBuilder()
                         .pulsarClient(pulsarClient)
                         .topic(topics)
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
index 503304c77f..d137a903d2 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java
@@ -33,6 +33,7 @@ import 
org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
 import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.slf4j.Logger;
@@ -363,15 +364,20 @@ public class InlongSingleTopicManager extends 
TopicManager {
         if 
(!pulsarClients.containsKey(topic.getInLongCluster().getClusterId())) {
             if (topic.getInLongCluster().getBootstraps() != null) {
                 try {
+                    String token = topic.getInLongCluster().getToken();
+                    Authentication auth = null;
+                    if (StringUtils.isNoneBlank(token)) {
+                        auth = AuthenticationFactory.token(token);
+                    }
                     PulsarClient pulsarClient = PulsarClient.builder()
                             
.serviceUrl(topic.getInLongCluster().getBootstraps())
-                            
.authentication(AuthenticationFactory.token(topic.getInLongCluster().getToken()))
+                            .authentication(auth)
                             .build();
                     pulsarClients.put(topic.getInLongCluster().getClusterId(), 
pulsarClient);
-                    LOGGER.debug("create pulsar client succ {}",
-                            new 
String[]{topic.getInLongCluster().getClusterId(),
-                                    topic.getInLongCluster().getBootstraps(),
-                                    topic.getInLongCluster().getToken()});
+                    LOGGER.info("create pulsar client succ cluster:{}, 
topic:{}, token:{}",
+                            topic.getInLongCluster().getClusterId(),
+                            topic.getTopic(),
+                            topic.getInLongCluster().getToken());
                 } catch (Exception e) {
                     LOGGER.error("create pulsar client error {}", topic);
                     LOGGER.error(e.getMessage(), e);
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
index f594126152..6d3343293b 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java
@@ -32,6 +32,8 @@ import 
org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
 import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.slf4j.Logger;
@@ -359,10 +361,18 @@ public class InlongTopicManager extends TopicManager {
         LOGGER.info("start to init pulsar client for cluster={}", cluster);
         if (cluster.getBootstraps() != null) {
             try {
+                String token = cluster.getToken();
+                Authentication auth = null;
+                if (StringUtils.isNoneBlank(token)) {
+                    auth = AuthenticationFactory.token(token);
+                }
                 PulsarClient pulsarClient = PulsarClient.builder()
                         .serviceUrl(cluster.getBootstraps())
-                        
.authentication(AuthenticationFactory.token(cluster.getToken()))
+                        .authentication(auth)
                         .build();
+                LOGGER.info("create pulsar client succ cluster:{}, token:{}",
+                        cluster.getClusterId(),
+                        cluster.getToken());
                 PulsarClient oldClient = 
pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient);
                 if (oldClient != null && !oldClient.isClosed()) {
                     LOGGER.warn("close new pulsar client for cluster={}", 
cluster);

Reply via email to