This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c7543559b [INLONG-9248][Manager] Supports configuring builtIn fields for tube source and pulsar source (#9249)
8c7543559b is described below

commit 8c7543559b769832fb4534906f3f24225b1f143d
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Sat Nov 11 14:39:16 2023 +0800

    [INLONG-9248][Manager] Supports configuring builtIn fields for tube source and pulsar source (#9249)
---
 .../pojo/sort/node/provider/IcebergProvider.java   | 11 ++++++++++
 .../pojo/sort/node/provider/PulsarProvider.java    | 24 +++++++++++++++++++++-
 .../pojo/sort/node/provider/TubeMqProvider.java    | 24 ++++++++++++++++++++++
 .../inlong/manager/pojo/source/StreamSource.java   |  3 ++-
 4 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
index 06cd989198..4af912f347 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.sort.node.provider;
 
 import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.manager.common.consts.StreamType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
 import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
 import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
@@ -110,6 +111,16 @@ public class IcebergProvider implements 
ExtractNodeProvider, LoadNodeProvider {
         return streamFields;
     }
 
+    @Override
+    public List<SinkField> addSinkMetaFields(List<SinkField> sinkFields) {
+        List<String> fieldNames = 
sinkFields.stream().map(SinkField::getFieldName).collect(Collectors.toList());
+        if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+            sinkFields.add(0, new SinkField(0, "long", 
MetaField.AUDIT_DATA_TIME.name(), "iceberg meta field",
+                    MetaField.AUDIT_DATA_TIME.name(), "long", 1, 
MetaField.AUDIT_DATA_TIME.name(), null));
+        }
+        return sinkFields;
+    }
+
     @Override
     public List<FieldInfo> getMetaFields() {
         List<FieldInfo> fieldInfos = new ArrayList<>();
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index b0bcd0c1c8..994ce8839a 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -17,10 +17,13 @@
 
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
+import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
 import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
@@ -29,8 +32,10 @@ import org.apache.inlong.sort.protocol.node.format.Format;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * The Provider for creating Pulsar extract nodes.
@@ -50,7 +55,6 @@ public class PulsarProvider implements ExtractNodeProvider {
 
         String fullTopicName =
                 pulsarSource.getPulsarTenant() + "/" + 
pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
-
         Format format = parsingFormat(pulsarSource.getSerializationType(),
                 pulsarSource.getWrapType(),
                 pulsarSource.getDataSeparator(),
@@ -78,4 +82,22 @@ public class PulsarProvider implements ExtractNodeProvider {
                 pulsarSource.getSubscription(),
                 scanStartupSubStartOffset);
     }
+
+    @Override
+    public List<StreamField> addStreamMetaFields(List<StreamField> 
streamFields) {
+        List<String> fieldNames = 
streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+        if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+            streamFields.add(0,
+                    new StreamField(0, "long", 
MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1,
+                            MetaField.AUDIT_DATA_TIME.name()));
+        }
+        return streamFields;
+    }
+
+    @Override
+    public List<FieldInfo> getMetaFields() {
+        List<FieldInfo> fieldInfos = new ArrayList<>();
+        fieldInfos.add(0, new FieldInfo(MetaField.AUDIT_DATA_TIME.name(), new 
LongFormatInfo()));
+        return fieldInfos;
+    }
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
index a1996ddc82..2942741a08 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
@@ -17,17 +17,22 @@
 
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
+import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
 import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
 import org.apache.inlong.sort.protocol.node.format.Format;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * The Provider for creating TubeMQ extract nodes.
@@ -63,4 +68,23 @@ public class TubeMqProvider implements ExtractNodeProvider {
                 source.getSessionKey(),
                 source.getStreamId());
     }
+
+    @Override
+    public List<StreamField> addStreamMetaFields(List<StreamField> 
streamFields) {
+        List<String> fieldNames = 
streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+        if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+            streamFields.add(0,
+                    new StreamField(0, "long", 
MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1,
+                            MetaField.AUDIT_DATA_TIME.name()));
+        }
+        return streamFields;
+    }
+
+    @Override
+    public List<FieldInfo> getMetaFields() {
+        List<FieldInfo> fieldInfos = new ArrayList<>();
+        fieldInfos.add(0, new FieldInfo(MetaField.AUDIT_DATA_TIME.name(), new 
LongFormatInfo()));
+        return fieldInfos;
+    }
+
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
index 0e5594a5d8..42b829de2f 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
@@ -114,8 +114,9 @@ public abstract class StreamSource extends StreamNode {
     @ApiModelProperty("Sub source information of existing agents")
     private List<SubSourceDTO> subSourceList;
 
+    @Builder.Default
     @ApiModelProperty(value = "Whether to ignore the parse errors of field 
value, true as default")
-    private boolean ignoreParseError;
+    private boolean ignoreParseError = true;
 
     public SourceRequest genSourceRequest() {
         return null;

Reply via email to