This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new b1de06b94e [INLONG-9972][Sort] Pulsar connector support authentication when connecting to Pulsar cluster (#9973) b1de06b94e is described below commit b1de06b94e3fe7f18532a04c082a483cb060edda Author: StingPeng <zpen...@connect.ust.hk> AuthorDate: Mon Apr 15 09:49:11 2024 +0800 [INLONG-9972][Sort] Pulsar connector support authentication when connecting to Pulsar cluster (#9973) --- .../pojo/sort/node/provider/PulsarProvider.java | 4 ++- .../protocol/node/extract/PulsarExtractNode.java | 35 +++++++++++++++++++--- .../node/extract/PulsarExtractNodeTest.java | 4 ++- .../inlong/sort/parser/PulsarSqlParserTest.java | 4 ++- inlong-sort/sort-dist/pom.xml | 5 ++++ .../org/apache/inlong/sort/base/Constants.java | 14 +++++++++ .../pulsar/table/PulsarDynamicTableFactory.java | 4 +++ .../sort/pulsar/table/PulsarTableSource.java | 5 ++++ 8 files changed, 68 insertions(+), 7 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java index a767f799f3..9ef0a1634a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java @@ -80,7 +80,9 @@ public class PulsarProvider implements ExtractNodeProvider { startupMode.getValue(), primaryKey, pulsarSource.getSubscription(), - scanStartupSubStartOffset); + scanStartupSubStartOffset, + "", + ""); } @Override diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java index bc09b52e2c..9a2adcc8e3 100644 --- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java @@ -73,6 +73,22 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta @JsonProperty("scanStartupSubStartOffset") private String scanStartupSubStartOffset; + /** + * pulsar client auth plugin class name + * e.g. org.apache.pulsar.client.impl.auth.AuthenticationToken + */ + @JsonProperty("clientAuthPluginClassName") + private String clientAuthPluginClassName; + + /** + * pulsar client auth params + * e.g. token:{tokenString} + * the tokenString should be compatible with the clientAuthPluginClassName see also in: + * <a href="https://pulsar.apache.org/docs/next/security-jwt/"> pulsar auth </a> + */ + @JsonProperty("clientAuthParams") + private String clientAuthParams; + @JsonCreator public PulsarExtractNode(@JsonProperty("id") String id, @JsonProperty("name") String name, @@ -86,7 +102,10 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta @Nonnull @JsonProperty("scanStartupMode") String scanStartupMode, @JsonProperty("primaryKey") String primaryKey, @JsonProperty("scanStartupSubName") String scanStartupSubName, - @JsonProperty("scanStartupSubStartOffset") String scanStartupSubStartOffset) { + @JsonProperty("scanStartupSubStartOffset") String scanStartupSubStartOffset, + @JsonProperty("clientAuthPluginClassName") String clientAuthPluginClassName, + @JsonProperty("clientAuthParams") String clientAuthParams) { + super(id, name, fields, watermarkField, properties); this.topic = Preconditions.checkNotNull(topic, "pulsar topic is null."); this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar serviceUrl is null."); @@ -97,6 +116,9 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta this.primaryKey = primaryKey; this.scanStartupSubName = 
scanStartupSubName; this.scanStartupSubStartOffset = scanStartupSubStartOffset; + this.clientAuthPluginClassName = clientAuthPluginClassName; + this.clientAuthParams = clientAuthParams; + } /** @@ -107,23 +129,28 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta @Override public Map<String, String> tableOptions() { Map<String, String> options = super.tableOptions(); - if (StringUtils.isEmpty(this.primaryKey)) { + if (StringUtils.isBlank(this.primaryKey)) { options.put("connector", "pulsar-inlong"); options.putAll(format.generateOptions(false)); } else { options.put("connector", "upsert-pulsar-inlong"); options.putAll(format.generateOptions(true)); } - if (adminUrl != null) { + if (StringUtils.isNotBlank(adminUrl)) { options.put("admin-url", adminUrl); } options.put("service-url", serviceUrl); options.put("topic", topic); options.put("scan.startup.mode", scanStartupMode); - if (scanStartupSubName != null) { + if (StringUtils.isNotBlank(scanStartupSubName)) { options.put("scan.startup.sub-name", scanStartupSubName); options.put("scan.startup.sub-start-offset", scanStartupSubStartOffset); } + if (StringUtils.isNotBlank(clientAuthPluginClassName) + && StringUtils.isNotBlank(clientAuthParams)) { + options.put("pulsar.client.authPluginClassName", clientAuthPluginClassName); + options.put("pulsar.client.authParams", clientAuthParams); + } return options; } diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java index 8ab7264726..5c84a4e0e7 100644 --- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java @@ -51,6 +51,8 @@ public class PulsarExtractNodeTest extends SerializeBaseTest<Node> { 
"earliest", null, "subscription", - "earliest"); + "earliest", + "org.apache.pulsar.client.impl.auth.AuthenticationToken", + "token auth params"); } } diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java index 5db0e6ae5d..fe3d758fae 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java @@ -77,7 +77,9 @@ public class PulsarSqlParserTest extends AbstractTestBase { "earliest", null, "test", - "earliest"); + "earliest", + "org.apache.pulsar.client.impl.auth.AuthenticationToken", + "token auth params"); } private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) { diff --git a/inlong-sort/sort-dist/pom.xml b/inlong-sort/sort-dist/pom.xml index 587d77613b..05d5da66af 100644 --- a/inlong-sort/sort-dist/pom.xml +++ b/inlong-sort/sort-dist/pom.xml @@ -197,6 +197,11 @@ <artifactId>flink-sql-avro</artifactId> <version>${flink.version}</version> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>audit-sdk</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </profile> </profiles> diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java index ba47c74356..98d7c559b8 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -429,4 +429,18 @@ public final class Constants { .withDescription( "Inner format"); + public static final ConfigOption<String> PULSAR_CLIENT_AUTH_PLUGIN_CLASSNAME = + ConfigOptions.key("pulsar.client.authPluginClassName") + 
.stringType() + .noDefaultValue() + .withDescription( + "pulsar client auth plugin class name"); + + public static final ConfigOption<String> PULSAR_AUTH_PARAMS = + ConfigOptions.key("pulsar.client.authParams") + .stringType() + .noDefaultValue() + .withDescription( + "pulsar client auth params"); + } diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java index dd06bfe758..c731e22097 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java @@ -79,6 +79,8 @@ import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; +import static org.apache.inlong.sort.base.Constants.PULSAR_AUTH_PARAMS; +import static org.apache.inlong.sort.base.Constants.PULSAR_CLIENT_AUTH_PLUGIN_CLASSNAME; /** * Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9 @@ -333,6 +335,8 @@ public class PulsarDynamicTableFactory options.add(INLONG_METRIC); options.add(INLONG_AUDIT); options.add(AUDIT_KEYS); + options.add(PULSAR_AUTH_PARAMS); + options.add(PULSAR_CLIENT_AUTH_PLUGIN_CLASSNAME); return options; } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java index f177eb7543..84a076d502 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java @@ -31,6 +31,8 @@ import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.pulsar.client.api.SubscriptionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -55,6 +57,8 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada // Format attributes // -------------------------------------------------------------------------------------------- + private static final Logger LOG = LoggerFactory.getLogger(PulsarTableSource.class); + private static final String FORMAT_METADATA_PREFIX = "value."; private final PulsarTableDeserializationSchemaFactory deserializationSchemaFactory; @@ -111,6 +115,7 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { PulsarDeserializationSchema<RowData> deserializationSchema = deserializationSchemaFactory.createPulsarDeserialization(context); + LOG.info("pulsar source init with properties: {}", properties); PulsarSource<RowData> source = PulsarSource.builder() .setTopics(topics)