This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 8c7543559b [INLONG-9248][Manager] Supports configuring builtIn fields for tube source and pulsar source (#9249) 8c7543559b is described below commit 8c7543559b769832fb4534906f3f24225b1f143d Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Sat Nov 11 14:39:16 2023 +0800 [INLONG-9248][Manager] Supports configuring builtIn fields for tube source and pulsar source (#9249) --- .../pojo/sort/node/provider/IcebergProvider.java | 11 ++++++++++ .../pojo/sort/node/provider/PulsarProvider.java | 24 +++++++++++++++++++++- .../pojo/sort/node/provider/TubeMqProvider.java | 24 ++++++++++++++++++++++ .../inlong/manager/pojo/source/StreamSource.java | 3 ++- 4 files changed, 60 insertions(+), 2 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java index 06cd989198..4af912f347 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java @@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.sort.node.provider; import org.apache.inlong.common.enums.MetaField; import org.apache.inlong.manager.common.consts.StreamType; +import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink; import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider; import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider; @@ -110,6 +111,16 @@ public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider { return streamFields; } + @Override + public List<SinkField> addSinkMetaFields(List<SinkField> sinkFields) { + List<String> fieldNames = sinkFields.stream().map(SinkField::getFieldName).collect(Collectors.toList()); + if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) { + sinkFields.add(0, new SinkField(0, "long", MetaField.AUDIT_DATA_TIME.name(), "iceberg meta field", + MetaField.AUDIT_DATA_TIME.name(), "long", 1, MetaField.AUDIT_DATA_TIME.name(), null)); + } + return sinkFields; + } + @Override public List<FieldInfo> getMetaFields() { List<FieldInfo> fieldInfos = new ArrayList<>(); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java index b0bcd0c1c8..994ce8839a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java @@ -17,10 +17,13 @@ package org.apache.inlong.manager.pojo.sort.node.provider; +import org.apache.inlong.common.enums.MetaField; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider; import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource; +import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.stream.StreamNode; +import org.apache.inlong.sort.formats.common.LongFormatInfo; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode; import org.apache.inlong.sort.protocol.node.ExtractNode; @@ -29,8 +32,10 @@ import org.apache.inlong.sort.protocol.node.format.Format; import org.apache.commons.lang3.StringUtils; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * The Provider for creating Pulsar extract nodes. @@ -50,7 +55,6 @@ public class PulsarProvider implements ExtractNodeProvider { String fullTopicName = pulsarSource.getPulsarTenant() + "/" + pulsarSource.getNamespace() + "/" + pulsarSource.getTopic(); - Format format = parsingFormat(pulsarSource.getSerializationType(), pulsarSource.getWrapType(), pulsarSource.getDataSeparator(), @@ -78,4 +82,22 @@ public class PulsarProvider implements ExtractNodeProvider { pulsarSource.getSubscription(), scanStartupSubStartOffset); } + + @Override + public List<StreamField> addStreamMetaFields(List<StreamField> streamFields) { + List<String> fieldNames = streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList()); + if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) { + streamFields.add(0, + new StreamField(0, "long", MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1, + MetaField.AUDIT_DATA_TIME.name())); + } + return streamFields; + } + + @Override + public List<FieldInfo> getMetaFields() { + List<FieldInfo> fieldInfos = new ArrayList<>(); + fieldInfos.add(0, new FieldInfo(MetaField.AUDIT_DATA_TIME.name(), new LongFormatInfo())); + return fieldInfos; + } } \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java index a1996ddc82..2942741a08 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java @@ -17,17 +17,22 @@ package org.apache.inlong.manager.pojo.sort.node.provider; +import org.apache.inlong.common.enums.MetaField; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider; import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource; +import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.stream.StreamNode; +import org.apache.inlong.sort.formats.common.LongFormatInfo; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode; import org.apache.inlong.sort.protocol.node.format.Format; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * The Provider for creating TubeMQ extract nodes. @@ -63,4 +68,23 @@ public class TubeMqProvider implements ExtractNodeProvider { source.getSessionKey(), source.getStreamId()); } + + @Override + public List<StreamField> addStreamMetaFields(List<StreamField> streamFields) { + List<String> fieldNames = streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList()); + if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) { + streamFields.add(0, + new StreamField(0, "long", MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1, + MetaField.AUDIT_DATA_TIME.name())); + } + return streamFields; + } + + @Override + public List<FieldInfo> getMetaFields() { + List<FieldInfo> fieldInfos = new ArrayList<>(); + fieldInfos.add(0, new FieldInfo(MetaField.AUDIT_DATA_TIME.name(), new LongFormatInfo())); + return fieldInfos; + } + } \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java index 0e5594a5d8..42b829de2f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java @@ -114,8 +114,9 @@ public abstract class StreamSource extends StreamNode { @ApiModelProperty("Sub source information of existing agents") private List<SubSourceDTO> subSourceList; + @Builder.Default @ApiModelProperty(value = "Whether to ignore the parse errors of field value, true as default") - private boolean ignoreParseError; + private boolean ignoreParseError = true; public SourceRequest genSourceRequest() { return null;