This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 8e9f276961 [INLONG-10508][Sort] Fix pulsar connector flink 1.15 scan start up mode parameter cannot keep consistent with flink 1.13 (#10509) 8e9f276961 is described below commit 8e9f276961d1b45553422d9e16368c40238a660c Author: ZiruiPeng <zpen...@connect.ust.hk> AuthorDate: Tue Jun 25 16:48:14 2024 +0800 [INLONG-10508][Sort] Fix pulsar connector flink 1.15 scan start up mode parameter cannot keep consistent with flink 1.13 (#10509) * [INLONG-10508][Sort] Fix pulsar connector flink 1.15 scan start up mode parameter cannot keep consistent with flink 1.13 * fix format --- .../org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java | 9 ++++++++- .../java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java | 1 + 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java index 5b2488dd40..c438fe1394 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.pulsar; +import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode; + import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; @@ -175,7 +177,12 @@ public class PulsarTableOptionUtils { public static StartCursor getStartCursor(ReadableConfig tableOptions) { if (tableOptions.getOptional(STARTUP_MODE).isPresent()) { - return parseMessageIdStartCursor(tableOptions.get(STARTUP_MODE)); + String mode = tableOptions.getOptional(STARTUP_MODE).get(); + // to keep consistent with pulsar connector in flink 1.13 + if (mode.equals(PulsarScanStartupMode.EXTERNAL_SUBSCRIPTION.getValue())) { + return parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID)); + } + return parseMessageIdStartCursor(mode); } else if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()) { return parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID)); } else if (tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java index 59e72201f8..6ef1450fb7 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java @@ -121,6 +121,7 @@ public final class PulsarTableOptions { code("earliest"), code("latest"), code("ledgerId:entryId:partitionId"), + code("external-subscription"), code("12:2:-1")) .build());