This is an automated email from the ASF dual-hosted git repository.

wakefu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f02e93069 [INLONG-11794][Manager] Pulsar source supports setting scan.startup.mode to null (#11795)
6f02e93069 is described below

commit 6f02e93069b396a2669e8386b501f81d07acdd7f
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Mar 5 09:54:45 2025 +0800

    [INLONG-11794][Manager] Pulsar source supports setting scan.startup.mode to null (#11795)
---
 .../manager/pojo/sort/node/provider/PulsarProvider.java     |  6 ++++--
 .../sort/protocol/node/extract/PulsarExtractNode.java       | 13 ++++++++-----
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index 50254ddadc..b8a1e8b678 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -66,7 +66,9 @@ public class PulsarProvider implements ExtractNodeProvider {
                 pulsarSource.getDataEscapeChar(),
                 pulsarSource.getIgnoreParseError());
 
-        PulsarScanStartupMode startupMode = 
PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode());
+        String startupMode =
+                StringUtils.isNotBlank(pulsarSource.getScanStartupMode()) ? 
PulsarScanStartupMode.forName(
+                        pulsarSource.getScanStartupMode()).getValue() : null;
         final String primaryKey = pulsarSource.getPrimaryKey();
         final String serviceUrl = pulsarSource.getServiceUrl();
         final String adminUrl = pulsarSource.getAdminUrl();
@@ -83,7 +85,7 @@ public class PulsarProvider implements ExtractNodeProvider {
                 adminUrl,
                 serviceUrl,
                 format,
-                startupMode.getValue(),
+                startupMode,
                 primaryKey,
                 pulsarSource.getSubscription(),
                 scanStartupSubStartOffset,
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
index 1887aca145..8ba110d66d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -108,7 +108,7 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric, Meta
             @JsonProperty("adminUrl") String adminUrl,
             @Nonnull @JsonProperty("serviceUrl") String serviceUrl,
             @Nonnull @JsonProperty("format") Format format,
-            @Nonnull @JsonProperty("scanStartupMode") String scanStartupMode,
+            @JsonProperty("scanStartupMode") String scanStartupMode,
             @JsonProperty("primaryKey") String primaryKey,
             @JsonProperty("scanStartupSubName") String scanStartupSubName,
             @JsonProperty("scanStartupSubStartOffset") String 
scanStartupSubStartOffset,
@@ -119,8 +119,7 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric, Meta
         this.topic = Preconditions.checkNotNull(topic, "pulsar topic is 
null.");
         this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar 
serviceUrl is null.");
         this.format = Preconditions.checkNotNull(format, "pulsar format is 
null.");
-        this.scanStartupMode = Preconditions.checkNotNull(scanStartupMode,
-                "pulsar scanStartupMode is null.");
+        this.scanStartupMode = scanStartupMode;
         this.adminUrl = adminUrl;
         this.primaryKey = primaryKey;
         this.scanStartupSubName = scanStartupSubName;
@@ -150,10 +149,14 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric, Meta
         }
         options.put("service-url", serviceUrl);
         options.put("topic", topic);
-        options.put("scan.startup.mode", scanStartupMode);
+        if (StringUtils.isNotBlank(scanStartupMode)) {
+            options.put("scan.startup.mode", scanStartupMode);
+        }
         if (StringUtils.isNotBlank(scanStartupSubName)) {
             options.put("scan.startup.sub-name", scanStartupSubName);
-            options.put("scan.startup.sub-start-offset", 
scanStartupSubStartOffset);
+            if (StringUtils.isNotBlank(scanStartupSubStartOffset)) {
+                options.put("scan.startup.sub-start-offset", 
scanStartupSubStartOffset);
+            }
         }
 
         if (StringUtils.isNotBlank(clientAuthPluginClassName)

Reply via email to