This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 271541f598 [INLONG-9095][Sort] Support inlong-msg in pulsar flink 1.15 
connector (#9096)
271541f598 is described below

commit 271541f59824cf773044ff961367b5f0258b2b19
Author: Sting <zpen...@connect.ust.hk>
AuthorDate: Tue Oct 24 16:44:40 2023 +0800

    [INLONG-9095][Sort] Support inlong-msg in pulsar flink 1.15 connector 
(#9096)
---
 inlong-sort/sort-core/pom.xml                                      | 6 ++++++
 .../sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml     | 7 ++++++-
 .../java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java     | 4 +++-
 3 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index c463785605..e8303bc57d 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -281,6 +281,12 @@
                     <version>${project.version}</version>
                     <scope>test</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.inlong</groupId>
+                    <artifactId>sort-connector-pulsar-v1.15</artifactId>
+                    <version>${project.version}</version>
+                    <scope>test</scope>
+                </dependency>
             </dependencies>
             <build>
                 <plugins>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
index 9fc0949b46..d2b24382a6 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
@@ -28,7 +28,7 @@
 
     <artifactId>sort-connector-pulsar-v1.15</artifactId>
     <packaging>jar</packaging>
-    <name>Apache InLong - Sort-connector-pulsar-v1.15</name>
+    <name>Apache InLong - Sort-connector-pulsar</name>
 
     <properties>
         <pulsar.version>2.10.2</pulsar.version>
@@ -129,6 +129,11 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
index 6784fc6790..05277ff5a2 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.pulsar;
 
+import org.apache.inlong.sort.protocol.node.ExtractNode;
 import 
org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchemaFactory;
 import org.apache.inlong.sort.pulsar.table.PulsarTableSource;
 
@@ -110,7 +111,8 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
                 PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
                 PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
                 PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
-                PulsarSinkOptions.SINK_CONFIG_PREFIX);
+                PulsarSinkOptions.SINK_CONFIG_PREFIX,
+                ExtractNode.INLONG_MSG);
 
         validatePrimaryKeyConstraints(
                 context.getObjectIdentifier(), context.getPrimaryKeyIndexes(), 
helper);

Reply via email to