This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f1ccde411e [INLONG-11156][SDK] SortSDK support that the token configuration of pulsar cluster is null (#11158) f1ccde411e is described below commit f1ccde411e4693830c9ac7d7799f1941fbe560b0 Author: ChunLiang Lu <luchunli...@apache.org> AuthorDate: Fri Sep 20 15:35:57 2024 +0800 [INLONG-11156][SDK] SortSDK support that the token configuration of pulsar cluster is null (#11158) --- .../inlong/sdk/sort/manager/InlongMultiTopicManager.java | 12 +++++++++++- .../sdk/sort/manager/InlongSingleTopicManager.java | 16 +++++++++++----- .../inlong/sdk/sort/manager/InlongTopicManager.java | 12 +++++++++++- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java index 744b38d223..2d098345ca 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java @@ -35,6 +35,7 @@ import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -213,10 +214,19 @@ public class InlongMultiTopicManager extends TopicManager { topic.getInLongCluster().getBootstraps(), consumerSize); for (int i = 0; i < consumerSize; i++) { try { + String token = topic.getInLongCluster().getToken(); + Authentication auth = null; + if (StringUtils.isNoneBlank(token)) { + auth = AuthenticationFactory.token(token); + } PulsarClient pulsarClient = PulsarClient.builder() .serviceUrl(topic.getInLongCluster().getBootstraps()) - .authentication(AuthenticationFactory.token(topic.getInLongCluster().getToken())) + .authentication(auth) .build(); + LOGGER.info("create pulsar client succ cluster:{}, topic:{}, token:{}", + topic.getInLongCluster().getClusterId(), + topic.getTopic(), + topic.getInLongCluster().getToken()); TopicFetcher fetcher = TopicFetcherBuilder.newPulsarBuilder() .pulsarClient(pulsarClient) .topic(topics) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java index 503304c77f..d137a903d2 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java @@ -33,6 +33,7 @@ import org.apache.inlong.tubemq.client.factory.MessageSessionFactory; import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClient; import org.slf4j.Logger; @@ -363,15 +364,20 @@ public class InlongSingleTopicManager extends TopicManager { if (!pulsarClients.containsKey(topic.getInLongCluster().getClusterId())) { if (topic.getInLongCluster().getBootstraps() != null) { try { + String token = topic.getInLongCluster().getToken(); + Authentication auth = null; + if (StringUtils.isNoneBlank(token)) { + auth = AuthenticationFactory.token(token); + } PulsarClient pulsarClient = PulsarClient.builder() .serviceUrl(topic.getInLongCluster().getBootstraps()) - .authentication(AuthenticationFactory.token(topic.getInLongCluster().getToken())) + .authentication(auth) .build(); pulsarClients.put(topic.getInLongCluster().getClusterId(), pulsarClient); - LOGGER.debug("create pulsar client succ {}", - new String[]{topic.getInLongCluster().getClusterId(), - topic.getInLongCluster().getBootstraps(), - topic.getInLongCluster().getToken()}); + LOGGER.info("create pulsar client succ cluster:{}, topic:{}, token:{}", + topic.getInLongCluster().getClusterId(), + topic.getTopic(), + topic.getInLongCluster().getToken()); } catch (Exception e) { LOGGER.error("create pulsar client error {}", topic); LOGGER.error(e.getMessage(), e); diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java index f594126152..6d3343293b 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManager.java @@ -32,6 +32,8 @@ import org.apache.inlong.tubemq.client.factory.MessageSessionFactory; import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClient; import org.slf4j.Logger; @@ -359,10 +361,18 @@ public class InlongTopicManager extends TopicManager { LOGGER.info("start to init pulsar client for cluster={}", cluster); if (cluster.getBootstraps() != null) { try { + String token = cluster.getToken(); + Authentication auth = null; + if (StringUtils.isNoneBlank(token)) { + auth = AuthenticationFactory.token(token); + } PulsarClient pulsarClient = PulsarClient.builder() .serviceUrl(cluster.getBootstraps()) - .authentication(AuthenticationFactory.token(cluster.getToken())) + .authentication(auth) .build(); + LOGGER.info("create pulsar client succ cluster:{}, token:{}", + cluster.getClusterId(), + cluster.getToken()); PulsarClient oldClient = pulsarClients.putIfAbsent(cluster.getClusterId(), pulsarClient); if (oldClient != null && !oldClient.isClosed()) { LOGGER.warn("close new pulsar client for cluster={}", cluster);