This is an automated email from the ASF dual-hosted git repository. wakefu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 6f02e93069 [INLONG-11794][Manager] Pulsar source supports setting scan.startup.mode to null (#11795) 6f02e93069 is described below commit 6f02e93069b396a2669e8386b501f81d07acdd7f Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Wed Mar 5 09:54:45 2025 +0800 [INLONG-11794][Manager] Pulsar source supports setting scan.startup.mode to null (#11795) --- .../manager/pojo/sort/node/provider/PulsarProvider.java | 6 ++++-- .../sort/protocol/node/extract/PulsarExtractNode.java | 13 ++++++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java index 50254ddadc..b8a1e8b678 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java @@ -66,7 +66,9 @@ public class PulsarProvider implements ExtractNodeProvider { pulsarSource.getDataEscapeChar(), pulsarSource.getIgnoreParseError()); - PulsarScanStartupMode startupMode = PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode()); + String startupMode = + StringUtils.isNotBlank(pulsarSource.getScanStartupMode()) ? PulsarScanStartupMode.forName( + pulsarSource.getScanStartupMode()).getValue() : null; final String primaryKey = pulsarSource.getPrimaryKey(); final String serviceUrl = pulsarSource.getServiceUrl(); final String adminUrl = pulsarSource.getAdminUrl(); @@ -83,7 +85,7 @@ public class PulsarProvider implements ExtractNodeProvider { adminUrl, serviceUrl, format, - startupMode.getValue(), + startupMode, primaryKey, pulsarSource.getSubscription(), scanStartupSubStartOffset, diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java index 1887aca145..8ba110d66d 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java @@ -108,7 +108,7 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta @JsonProperty("adminUrl") String adminUrl, @Nonnull @JsonProperty("serviceUrl") String serviceUrl, @Nonnull @JsonProperty("format") Format format, - @Nonnull @JsonProperty("scanStartupMode") String scanStartupMode, + @JsonProperty("scanStartupMode") String scanStartupMode, @JsonProperty("primaryKey") String primaryKey, @JsonProperty("scanStartupSubName") String scanStartupSubName, @JsonProperty("scanStartupSubStartOffset") String scanStartupSubStartOffset, @@ -119,8 +119,7 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta this.topic = Preconditions.checkNotNull(topic, "pulsar topic is null."); this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar serviceUrl is null."); this.format = Preconditions.checkNotNull(format, "pulsar format is null."); - this.scanStartupMode = Preconditions.checkNotNull(scanStartupMode, - "pulsar scanStartupMode is null."); + this.scanStartupMode = scanStartupMode; this.adminUrl = adminUrl; this.primaryKey = primaryKey; this.scanStartupSubName = scanStartupSubName; @@ -150,10 +149,14 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta } options.put("service-url", serviceUrl); options.put("topic", topic); - options.put("scan.startup.mode", scanStartupMode); + if (StringUtils.isNotBlank(scanStartupMode)) { + options.put("scan.startup.mode", scanStartupMode); + } if (StringUtils.isNotBlank(scanStartupSubName)) { options.put("scan.startup.sub-name", scanStartupSubName); - options.put("scan.startup.sub-start-offset", scanStartupSubStartOffset); + if (StringUtils.isNotBlank(scanStartupSubStartOffset)) { + options.put("scan.startup.sub-start-offset", scanStartupSubStartOffset); + } } if (StringUtils.isNotBlank(clientAuthPluginClassName)