This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e9f276961 [INLONG-10508][Sort] Fix pulsar connector flink 1.15 scan start up mode parameter cannot keep consistent with flink 1.13 (#10509)
8e9f276961 is described below

commit 8e9f276961d1b45553422d9e16368c40238a660c
Author: ZiruiPeng <zpen...@connect.ust.hk>
AuthorDate: Tue Jun 25 16:48:14 2024 +0800

    [INLONG-10508][Sort] Fix pulsar connector flink 1.15 scan start up mode parameter cannot keep consistent with flink 1.13 (#10509)
    
    * [INLONG-10508][Sort] Fix pulsar connector flink 1.15 scan start up mode parameter cannot keep consistent with flink 1.13
    
    * fix format
---
 .../org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java    | 9 ++++++++-
 .../java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java   | 1 +
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
index 5b2488dd40..c438fe1394 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.pulsar;
 
+import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
+
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
@@ -175,7 +177,12 @@ public class PulsarTableOptionUtils {
 
     public static StartCursor getStartCursor(ReadableConfig tableOptions) {
         if (tableOptions.getOptional(STARTUP_MODE).isPresent()) {
-            return parseMessageIdStartCursor(tableOptions.get(STARTUP_MODE));
+            String mode = tableOptions.getOptional(STARTUP_MODE).get();
+            // to keep consistent with pulsar connector in flink 1.13
+            if 
(mode.equals(PulsarScanStartupMode.EXTERNAL_SUBSCRIPTION.getValue())) {
+                return 
parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID));
+            }
+            return parseMessageIdStartCursor(mode);
         } else if 
(tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()) {
             return 
parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID));
         } else if 
(tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
index 59e72201f8..6ef1450fb7 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
@@ -121,6 +121,7 @@ public final class PulsarTableOptions {
                                             code("earliest"),
                                             code("latest"),
                                             
code("ledgerId:entryId:partitionId"),
+                                            code("external-subscription"),
                                             code("12:2:-1"))
                                     .build());
 

Reply via email to