This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b1de06b94e [INLONG-9972][Sort] Pulsar connector support authentication when connecting to Pulsar cluster (#9973)
b1de06b94e is described below

commit b1de06b94e3fe7f18532a04c082a483cb060edda
Author: StingPeng <zpen...@connect.ust.hk>
AuthorDate: Mon Apr 15 09:49:11 2024 +0800

    [INLONG-9972][Sort] Pulsar connector support authentication when connecting to Pulsar cluster (#9973)
---
 .../pojo/sort/node/provider/PulsarProvider.java    |  4 ++-
 .../protocol/node/extract/PulsarExtractNode.java   | 35 +++++++++++++++++++---
 .../node/extract/PulsarExtractNodeTest.java        |  4 ++-
 .../inlong/sort/parser/PulsarSqlParserTest.java    |  4 ++-
 inlong-sort/sort-dist/pom.xml                      |  5 ++++
 .../org/apache/inlong/sort/base/Constants.java     | 14 +++++++++
 .../pulsar/table/PulsarDynamicTableFactory.java    |  4 +++
 .../sort/pulsar/table/PulsarTableSource.java       |  5 ++++
 8 files changed, 68 insertions(+), 7 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index a767f799f3..9ef0a1634a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -80,7 +80,9 @@ public class PulsarProvider implements ExtractNodeProvider {
                 startupMode.getValue(),
                 primaryKey,
                 pulsarSource.getSubscription(),
-                scanStartupSubStartOffset);
+                scanStartupSubStartOffset,
+                "",
+                "");
     }
 
     @Override
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
index bc09b52e2c..9a2adcc8e3 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -73,6 +73,22 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric, Meta
     @JsonProperty("scanStartupSubStartOffset")
     private String scanStartupSubStartOffset;
 
+    /**
+     * pulsar client auth plugin class name
+     * e.g. org.apache.pulsar.client.impl.auth.AuthenticationToken
+     */
+    @JsonProperty("clientAuthPluginClassName")
+    private String clientAuthPluginClassName;
+
+    /**
+     * pulsar client auth params
+     * e.g. token:{tokenString}
+     * the tokenString should be compatible with the clientAuthPluginClassName see also in:
+     * <a href="https://pulsar.apache.org/docs/next/security-jwt/"> pulsar auth </a>
+     */
+    @JsonProperty("clientAuthParams")
+    private String clientAuthParams;
+
     @JsonCreator
     public PulsarExtractNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
@@ -86,7 +102,10 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric, Meta
             @Nonnull @JsonProperty("scanStartupMode") String scanStartupMode,
             @JsonProperty("primaryKey") String primaryKey,
             @JsonProperty("scanStartupSubName") String scanStartupSubName,
-            @JsonProperty("scanStartupSubStartOffset") String 
scanStartupSubStartOffset) {
+            @JsonProperty("scanStartupSubStartOffset") String 
scanStartupSubStartOffset,
+            @JsonProperty("clientAuthPluginClassName") String 
clientAuthPluginClassName,
+            @JsonProperty("clientAuthParams") String clientAuthParams) {
+
         super(id, name, fields, watermarkField, properties);
         this.topic = Preconditions.checkNotNull(topic, "pulsar topic is 
null.");
         this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar 
serviceUrl is null.");
@@ -97,6 +116,9 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric, Meta
         this.primaryKey = primaryKey;
         this.scanStartupSubName = scanStartupSubName;
         this.scanStartupSubStartOffset = scanStartupSubStartOffset;
+        this.clientAuthPluginClassName = clientAuthPluginClassName;
+        this.clientAuthParams = clientAuthParams;
+
     }
 
     /**
@@ -107,23 +129,28 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric, Meta
     @Override
     public Map<String, String> tableOptions() {
         Map<String, String> options = super.tableOptions();
-        if (StringUtils.isEmpty(this.primaryKey)) {
+        if (StringUtils.isBlank(this.primaryKey)) {
             options.put("connector", "pulsar-inlong");
             options.putAll(format.generateOptions(false));
         } else {
             options.put("connector", "upsert-pulsar-inlong");
             options.putAll(format.generateOptions(true));
         }
-        if (adminUrl != null) {
+        if (StringUtils.isNotBlank(adminUrl)) {
             options.put("admin-url", adminUrl);
         }
         options.put("service-url", serviceUrl);
         options.put("topic", topic);
         options.put("scan.startup.mode", scanStartupMode);
-        if (scanStartupSubName != null) {
+        if (StringUtils.isNotBlank(scanStartupSubName)) {
             options.put("scan.startup.sub-name", scanStartupSubName);
             options.put("scan.startup.sub-start-offset", 
scanStartupSubStartOffset);
         }
+        if (StringUtils.isNotBlank(clientAuthPluginClassName)
+                && StringUtils.isNotBlank(clientAuthParams)) {
+            options.put("pulsar.client.authPluginClassName", 
clientAuthPluginClassName);
+            options.put("pulsar.client.authParams", clientAuthParams);
+        }
         return options;
     }
 
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
index 8ab7264726..5c84a4e0e7 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
@@ -51,6 +51,8 @@ public class PulsarExtractNodeTest extends 
SerializeBaseTest<Node> {
                 "earliest",
                 null,
                 "subscription",
-                "earliest");
+                "earliest",
+                "org.apache.pulsar.client.impl.auth.AuthenticationToken",
+                "token auth params");
     }
 }
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
index 5db0e6ae5d..fe3d758fae 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
@@ -77,7 +77,9 @@ public class PulsarSqlParserTest extends AbstractTestBase {
                 "earliest",
                 null,
                 "test",
-                "earliest");
+                "earliest",
+                "org.apache.pulsar.client.impl.auth.AuthenticationToken",
+                "token auth params");
     }
 
     private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> 
outputs) {
diff --git a/inlong-sort/sort-dist/pom.xml b/inlong-sort/sort-dist/pom.xml
index 587d77613b..05d5da66af 100644
--- a/inlong-sort/sort-dist/pom.xml
+++ b/inlong-sort/sort-dist/pom.xml
@@ -197,6 +197,11 @@
                     <artifactId>flink-sql-avro</artifactId>
                     <version>${flink.version}</version>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.inlong</groupId>
+                    <artifactId>audit-sdk</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
             </dependencies>
         </profile>
     </profiles>
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index ba47c74356..98d7c559b8 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -429,4 +429,18 @@ public final class Constants {
                     .withDescription(
                             "Inner format");
 
+    public static final ConfigOption<String> 
PULSAR_CLIENT_AUTH_PLUGIN_CLASSNAME =
+            ConfigOptions.key("pulsar.client.authPluginClassName")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "pulsar client auth plugin class name");
+
+    public static final ConfigOption<String> PULSAR_AUTH_PARAMS =
+            ConfigOptions.key("pulsar.client.authParams")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "pulsar client auth params");
+
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index dd06bfe758..c731e22097 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -79,6 +79,8 @@ import static 
org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
 import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.PULSAR_AUTH_PARAMS;
+import static 
org.apache.inlong.sort.base.Constants.PULSAR_CLIENT_AUTH_PLUGIN_CLASSNAME;
 
 /**
  * Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9
@@ -333,6 +335,8 @@ public class PulsarDynamicTableFactory
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
         options.add(AUDIT_KEYS);
+        options.add(PULSAR_AUTH_PARAMS);
+        options.add(PULSAR_CLIENT_AUTH_PLUGIN_CLASSNAME);
 
         return options;
     }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
index f177eb7543..84a076d502 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
@@ -31,6 +31,8 @@ import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -55,6 +57,8 @@ public class PulsarTableSource implements ScanTableSource, 
SupportsReadingMetada
     // Format attributes
     // 
--------------------------------------------------------------------------------------------
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarTableSource.class);
+
     private static final String FORMAT_METADATA_PREFIX = "value.";
 
     private final PulsarTableDeserializationSchemaFactory 
deserializationSchemaFactory;
@@ -111,6 +115,7 @@ public class PulsarTableSource implements ScanTableSource, 
SupportsReadingMetada
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
         PulsarDeserializationSchema<RowData> deserializationSchema =
                 
deserializationSchemaFactory.createPulsarDeserialization(context);
+        LOG.info("pulsar source init with properties: {}", properties);
         PulsarSource<RowData> source =
                 PulsarSource.builder()
                         .setTopics(topics)

Reply via email to