This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 271541f598 [INLONG-9095][Sort] Support inlong-msg in pulsar flink 1.15 connector (#9096) 271541f598 is described below commit 271541f59824cf773044ff961367b5f0258b2b19 Author: Sting <zpen...@connect.ust.hk> AuthorDate: Tue Oct 24 16:44:40 2023 +0800 [INLONG-9095][Sort] Support inlong-msg in pulsar flink 1.15 connector (#9096) --- inlong-sort/sort-core/pom.xml | 6 ++++++ .../sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml | 7 ++++++- .../java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java | 4 +++- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml index c463785605..e8303bc57d 100644 --- a/inlong-sort/sort-core/pom.xml +++ b/inlong-sort/sort-core/pom.xml @@ -281,6 +281,12 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-pulsar-v1.15</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml index 9fc0949b46..d2b24382a6 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml @@ -28,7 +28,7 @@ <artifactId>sort-connector-pulsar-v1.15</artifactId> <packaging>jar</packaging> - <name>Apache InLong - Sort-connector-pulsar-v1.15</name> + <name>Apache InLong - Sort-connector-pulsar</name> <properties> <pulsar.version>2.10.2</pulsar.version> @@ -129,6 +129,11 @@ <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-common</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java index 6784fc6790..05277ff5a2 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.pulsar; +import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchemaFactory; import org.apache.inlong.sort.pulsar.table.PulsarTableSource; @@ -110,7 +111,8 @@ public class PulsarTableFactory implements DynamicTableSourceFactory { PulsarSourceOptions.SOURCE_CONFIG_PREFIX, PulsarSourceOptions.CONSUMER_CONFIG_PREFIX, PulsarSinkOptions.PRODUCER_CONFIG_PREFIX, - PulsarSinkOptions.SINK_CONFIG_PREFIX); + PulsarSinkOptions.SINK_CONFIG_PREFIX, + ExtractNode.INLONG_MSG); validatePrimaryKeyConstraints( context.getObjectIdentifier(), context.getPrimaryKeyIndexes(), helper);