This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch branch-1.9 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 2680927d7a25a5ab5ae2ec54550adc60c122eec8 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Fri Sep 22 10:56:50 2023 +0800 [INLONG-8951][Manager] Support for configuring built-in fields for iceberg and starrocks (#8952) --- .../org/apache/inlong/common/enums/MetaField.java | 7 ++- .../inlong/manager/pojo/sort/node/NodeFactory.java | 59 ++++++++++++++++++++++ .../pojo/sort/node/base/ExtractNodeProvider.java | 5 ++ .../pojo/sort/node/base/LoadNodeProvider.java | 4 ++ .../manager/pojo/sort/node/base/NodeProvider.java | 9 ++++ .../pojo/sort/node/provider/IcebergProvider.java | 25 +++++++++ .../pojo/sort/node/provider/StarRocksProvider.java | 26 ++++++++++ .../manager/pojo/sort/util/FieldInfoUtils.java | 12 +++++ .../resource/sort/DefaultSortConfigOperator.java | 27 ++++++++-- .../src/main/resources/application-dev.properties | 2 + .../src/main/resources/application-prod.properties | 2 + .../src/main/resources/application-test.properties | 2 + .../org/apache/inlong/sort/protocol/Metadata.java | 1 + .../protocol/node/extract/IcebergExtractNode.java | 33 +++++++++++- .../org/apache/inlong/sort/base/Constants.java | 2 +- .../sort/iceberg/IcebergReadableMetadata.java | 4 +- .../starrocks/table/sink/utils/SchemaUtils.java | 21 ++++---- 17 files changed, 222 insertions(+), 19 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java index f3c1a388f5..e6d0bbc6cb 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java @@ -163,7 +163,12 @@ public enum MetaField { /** * Timestamp of the Kafka record, it is only used for Kafka. */ - TIMESTAMP; + TIMESTAMP, + + /** + * Inlong data time for audit. + */ + AUDIT_DATA_TIME; public static MetaField forName(String name) { for (MetaField metaField : values()) { diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java index cc6a545490..74f2efae00 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java @@ -18,14 +18,23 @@ package org.apache.inlong.manager.pojo.sort.node; import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider; +import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider; +import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils; +import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils; import org.apache.inlong.manager.pojo.source.StreamSource; import org.apache.inlong.manager.pojo.stream.StreamField; +import org.apache.inlong.manager.pojo.transform.TransformResponse; import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.protocol.node.LoadNode; +import org.apache.inlong.sort.protocol.node.Node; +import org.apache.inlong.sort.protocol.node.transform.TransformNode; import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -33,6 +42,7 @@ import java.util.stream.Collectors; /** * The node factory */ +@Slf4j public class NodeFactory { /** @@ -61,4 +71,53 @@ public class NodeFactory { return LoadNodeProviderFactory.getLoadNodeProvider(sinkType).createLoadNode(v, constantFieldMap); }).collect(Collectors.toList()); } + + /** + * Create extract node from the given source. + */ + public static ExtractNode createExtractNode(StreamSource sourceInfo) { + if (sourceInfo == null) { + return null; + } + String sourceType = sourceInfo.getSourceType(); + return ExtractNodeProviderFactory.getExtractNodeProvider(sourceType).createExtractNode(sourceInfo); + } + + /** + * Create load node from the given sink. + */ + public static LoadNode createLoadNode(StreamSink sinkInfo, Map<String, StreamField> constantFieldMap) { + if (sinkInfo == null) { + return null; + } + String sinkType = sinkInfo.getSinkType(); + return LoadNodeProviderFactory.getLoadNodeProvider(sinkType).createLoadNode(sinkInfo, constantFieldMap); + } + + /** + * Add built-in field for extra node and load node + */ + public static List<Node> addBuiltInField(StreamSource sourceInfo, StreamSink sinkInfo, + List<TransformResponse> transformResponses, Map<String, StreamField> constantFieldMap) { + ExtractNodeProvider extractNodeProvider = ExtractNodeProviderFactory.getExtractNodeProvider( + sourceInfo.getSourceType()); + LoadNodeProvider loadNodeProvider = LoadNodeProviderFactory.getLoadNodeProvider(sinkInfo.getSinkType()); + + if (FieldInfoUtils.compareFields(extractNodeProvider.getMetaFields(), loadNodeProvider.getMetaFields())) { + extractNodeProvider.addStreamMetaFields(sourceInfo.getFieldList()); + transformResponses.forEach(v -> extractNodeProvider.addStreamMetaFields(v.getFieldList())); + loadNodeProvider.addSinkMetaFields(sinkInfo.getSinkFieldList()); + } + + ExtractNode extractNode = extractNodeProvider.createExtractNode(sourceInfo); + List<TransformNode> transformNodes = + TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap); + LoadNode loadNode = loadNodeProvider.createLoadNode(sinkInfo, constantFieldMap); + + List<Node> nodes = new ArrayList<>(); + nodes.add(extractNode); + nodes.addAll(transformNodes); + nodes.add(loadNode); + return nodes; + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java index 3642cc7a3e..a8622a7be8 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java @@ -135,4 +135,9 @@ public interface ExtractNodeProvider extends NodeProvider { } return format; } + + default List<StreamField> addStreamMetaFields(List<StreamField> streamFields) { + return streamFields; + } + } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java index 2df89d9710..4af5e532d4 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java @@ -141,4 +141,8 @@ public interface LoadNodeProvider extends NodeProvider { } return format; } + + default List<SinkField> addSinkMetaFields(List<SinkField> sinkFields) { + return sinkFields; + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java index a40bd1106d..f17d38bde0 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java @@ -17,6 +17,10 @@ package org.apache.inlong.manager.pojo.sort.node.base; +import org.apache.inlong.sort.protocol.FieldInfo; + +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -45,4 +49,9 @@ public interface NodeProvider { .filter(v -> Objects.nonNull(v.getValue())) .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())); } + + default List<FieldInfo> getMetaFields() { + return new ArrayList<>(); + } + } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java index 416a409ac5..06cd989198 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.pojo.sort.node.provider; +import org.apache.inlong.common.enums.MetaField; import org.apache.inlong.manager.common.consts.StreamType; import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink; import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider; @@ -25,6 +26,7 @@ import org.apache.inlong.manager.pojo.source.iceberg.IcebergSource; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.stream.StreamNode; import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.MetaFieldInfo; import org.apache.inlong.sort.protocol.constant.IcebergConstant; import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType; import org.apache.inlong.sort.protocol.node.ExtractNode; @@ -33,12 +35,17 @@ import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode; import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode; import org.apache.inlong.sort.protocol.transformation.FieldRelation; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * The Provider for creating Iceberg load nodes. */ +@Slf4j public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider { @Override @@ -91,4 +98,22 @@ public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider { icebergSink.getCatalogUri(), icebergSink.getWarehouse()); } + + @Override + public List<StreamField> addStreamMetaFields(List<StreamField> streamFields) { + List<String> fieldNames = streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList()); + if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) { + streamFields.add(0, + new StreamField(0, "long", MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1, + MetaField.AUDIT_DATA_TIME.name())); + } + return streamFields; + } + + @Override + public List<FieldInfo> getMetaFields() { + List<FieldInfo> fieldInfos = new ArrayList<>(); + fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(), MetaField.AUDIT_DATA_TIME)); + return fieldInfos; + } } \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java index acae2efeb3..5a86455a0f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java @@ -17,23 +17,31 @@ package org.apache.inlong.manager.pojo.sort.node.provider; +import org.apache.inlong.common.enums.MetaField; import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink; import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.stream.StreamNode; +import org.apache.inlong.sort.formats.common.LongFormatInfo; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.node.LoadNode; import org.apache.inlong.sort.protocol.node.format.Format; import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode; import org.apache.inlong.sort.protocol.transformation.FieldRelation; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * The Provider for creating StarRocks load nodes. */ +@Slf4j public class StarRocksProvider implements LoadNodeProvider { @Override @@ -71,4 +79,22 @@ public class StarRocksProvider implements LoadNodeProvider { starRocksSink.getDatabasePattern(), starRocksSink.getTablePattern()); } + + @Override + public List<SinkField> addSinkMetaFields(List<SinkField> sinkFields) { + List<String> fieldNames = sinkFields.stream().map(SinkField::getFieldName).collect(Collectors.toList()); + if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) { + sinkFields.add(0, new SinkField(0, "long", MetaField.AUDIT_DATA_TIME.name(), "long", + MetaField.AUDIT_DATA_TIME.name())); + } + return sinkFields; + } + + @Override + public List<FieldInfo> getMetaFields() { + List<FieldInfo> fieldInfos = new ArrayList<>(); + fieldInfos.add(0, new FieldInfo(MetaField.AUDIT_DATA_TIME.name(), new LongFormatInfo())); + return fieldInfos; + } + } \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java index 5f4d7f274d..43d48a6e76 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java @@ -391,4 +391,16 @@ public class FieldInfoUtils { return sortFormat; } + public static boolean compareFields(List<FieldInfo> sourceFields, List<FieldInfo> targetFields) { + if (sourceFields.size() != targetFields.size()) { + return false; + } + for (int i = 0; i < sourceFields.size(); i++) { + if (!Objects.equals(sourceFields.get(i).getName(), targetFields.get(i).getName())) { + return false; + } + } + return true; + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 52ff3bb5e8..74f0819613 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -35,6 +35,7 @@ import org.apache.inlong.manager.service.transform.StreamTransformService; import org.apache.inlong.sort.protocol.GroupInfo; import org.apache.inlong.sort.protocol.StreamInfo; import org.apache.inlong.sort.protocol.node.Node; +import org.apache.inlong.sort.protocol.node.transform.TransformNode; import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation; import org.apache.commons.collections.CollectionUtils; @@ -42,6 +43,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.ArrayList; @@ -51,6 +53,7 @@ import java.util.HashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -63,6 +66,8 @@ public class DefaultSortConfigOperator implements SortConfigOperator { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSortConfigOperator.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Value("${metrics.audit.proxy.hosts:127.0.0.1}") + private String auditHost; @Autowired private StreamSourceService sourceService; @Autowired @@ -127,12 +132,11 @@ public class DefaultSortConfigOperator implements SortConfigOperator { for (StreamSink sink : sinks) { Map<String, Object> properties = sink.getProperties(); - properties.putIfAbsent("metrics.audit.key", auditService.getAuditId(sink.getSinkType(), true)); + addAuditId(sink.getProperties(), sink.getSinkType(), true); } for (StreamSource source : sources) { source.setFieldList(inlongStream.getFieldList()); - Map<String, Object> properties = source.getProperties(); - properties.putIfAbsent("metrics.audit.key", auditService.getAuditId(source.getSourceType(), false)); + addAuditId(source.getProperties(), source.getSourceType(), false); } List<NodeRelation> relations; @@ -222,8 +226,13 @@ public class DefaultSortConfigOperator implements SortConfigOperator { private List<Node> createNodes(List<StreamSource> sources, List<TransformResponse> transformResponses, List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) { List<Node> nodes = new ArrayList<>(); + if (Objects.equals(sources.size(), sinks.size()) && Objects.equals(sources.size(), 1)) { + return NodeFactory.addBuiltInField(sources.get(0), sinks.get(0), transformResponses, constantFieldMap); + } + List<TransformNode> transformNodes = + TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap); nodes.addAll(NodeFactory.createExtractNodes(sources)); - nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap)); + nodes.addAll(transformNodes); nodes.addAll(NodeFactory.createLoadNodes(sinks, constantFieldMap)); return nodes; } @@ -264,4 +273,14 @@ public class DefaultSortConfigOperator implements SortConfigOperator { groupInfo.getExtList().add(extInfo); } + private void addAuditId(Map<String, Object> properties, String type, boolean isSent) { + try { + String auditId = auditService.getAuditId(type, isSent); + properties.putIfAbsent("metrics.audit.key", auditId); + properties.putIfAbsent("metrics.audit.proxy.hosts", auditHost); + } catch (Exception e) { + LOGGER.error("Current type ={} is not set auditId", type); + } + + } } diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties index ff248053a4..376d9b9f73 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -107,3 +107,5 @@ group.deleted.latest.hours=10 group.deleted.batchSize=100 # If turned on, the groups could be deleted periodically. group.deleted.enabled=false + +metrics.audit.proxy.hosts=127.0.0.1:10081 diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties index 69122ed272..c47fc92334 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -106,3 +106,5 @@ group.deleted.latest.hours=10 group.deleted.batchSize=100 # If turned on, the groups could be deleted periodically. group.deleted.enabled=false + +metrics.audit.proxy.hosts=127.0.0.1:10081 diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties index ff248053a4..376d9b9f73 100644 --- a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -107,3 +107,5 @@ group.deleted.latest.hours=10 group.deleted.batchSize=100 # If turned on, the groups could be deleted periodically. group.deleted.enabled=false + +metrics.audit.proxy.hosts=127.0.0.1:10081 diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java index 8dcf069165..11c4ec6737 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java @@ -114,6 +114,7 @@ public interface Metadata { case BATCH_ID: case PARTITION: case OFFSET: + case AUDIT_DATA_TIME: metadataType = "BIGINT"; break; case UPDATE_BEFORE: diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java index 14fec78da8..cbceb5485a 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java @@ -17,7 +17,10 @@ package org.apache.inlong.sort.protocol.node.extract; +import org.apache.inlong.common.enums.MetaField; import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.InlongMetric; +import org.apache.inlong.sort.protocol.Metadata; import org.apache.inlong.sort.protocol.constant.IcebergConstant; import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.protocol.transformation.WatermarkField; @@ -32,8 +35,10 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.Serializable; +import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Set; /** * Iceberg extract node for extract data from iceberg @@ -42,7 +47,7 @@ import java.util.Map; @JsonTypeName("icebergExtract") @JsonInclude(JsonInclude.Include.NON_NULL) @Data -public class IcebergExtractNode extends ExtractNode implements Serializable { +public class IcebergExtractNode extends ExtractNode implements InlongMetric, Metadata, Serializable { @JsonProperty("tableName") @Nonnull @@ -111,6 +116,8 @@ public class IcebergExtractNode extends ExtractNode implements Serializable { options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name()); options.put(IcebergConstant.CATALOG_NAME_KEY, catalogName); options.put(IcebergConstant.STREAMING, "true"); + options.put(IcebergConstant.STARTING_STRATEGY_KEY, + IcebergConstant.StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL.name()); if (null != uri) { options.put(IcebergConstant.URI_KEY, uri); } @@ -133,4 +140,28 @@ public class IcebergExtractNode extends ExtractNode implements Serializable { return super.getPartitionFields(); } + @Override + public String getMetadataKey(MetaField metaField) { + String metadataKey; + switch (metaField) { + case AUDIT_DATA_TIME: + metadataKey = "audit_data_time"; + break; + default: + throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s", + this.getClass().getSimpleName(), metaField)); + } + return metadataKey; + } + + @Override + public boolean isVirtual(MetaField metaField) { + return true; + } + + @Override + public Set<MetaField> supportedMetaFields() { + return EnumSet.of(MetaField.AUDIT_DATA_TIME); + } + } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 72ce9d89b8..5065c78c1f 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -167,7 +167,7 @@ public final class Constants { public static final String META_INCREMENTAL = "incremental_inlong"; - public static final String META_INLONG_DATA_TIME = "inlong_data_time"; + public static final String META_AUDIT_DATA_TIME = "audit_data_time"; public static final ConfigOption<String> INLONG_METRIC = ConfigOptions.key("inlong.metric.labels") diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java index e003555a45..11f7c51d60 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java @@ -30,8 +30,8 @@ import java.io.Serializable; */ public enum IcebergReadableMetadata { - INLONG_DATA_TIME( - Constants.META_INLONG_DATA_TIME, + AUDIT_DATA_TIME( + Constants.META_AUDIT_DATA_TIME, DataTypes.BIGINT().notNull(), r -> System.currentTimeMillis()); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java index 178196d466..76e91e6cf3 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java @@ -31,7 +31,7 @@ public class SchemaUtils implements Serializable { private static final long serialVersionUID = 1L; - private final String INLONG_DATA_TIME = "inlong_data_time"; + private final String AUDIT_DATA_TIME = "audit_data_time"; private final int DATA_TIME_ABSENT_INDEX = -1; private final int dataTimeFieldIndex; @@ -41,16 +41,16 @@ public class SchemaUtils implements Serializable { public long getDataTime(Object[] data) { if (dataTimeFieldIndex == DATA_TIME_ABSENT_INDEX) { - // if INLONG_DATA_TIME field is absent, return local time + // if AUDIT_DATA_TIME field is absent, return local time return System.currentTimeMillis(); } return (Long) data[dataTimeFieldIndex]; } /** - * filter out INLONG_DATA_TIME field + * filter out AUDIT_DATA_TIME field * @param data - * @return data without INLONG_DATA_TIME + * @return data without AUDIT_DATA_TIME */ public Object[] filterOutTimeField(Object[] data) { if (dataTimeFieldIndex == DATA_TIME_ABSENT_INDEX) { @@ -66,24 +66,25 @@ public class SchemaUtils implements Serializable { } /** - * INLONG_DATA_TIME should not occur in actual data schema fields + * AUDIT_DATA_TIME should not occur in actual data schema fields + * * @param schema - * @return fieldNames without INLONG_DATA_TIME + * @return fieldNames without AUDIT_DATA_TIME */ public String[] filterOutTimeField(TableSchema schema) { return Arrays.stream(schema.getFieldNames()) - .filter(field -> !INLONG_DATA_TIME.equals(field)) + .filter(field -> !AUDIT_DATA_TIME.equals(field)) .toArray(String[]::new); } /** - * get the index of INLONG_DATA_TIME in fieldNames + * get the index of AUDIT_DATA_TIME in fieldNames * @param fieldNames - * @return index of INLONG_DATA_TIME in fieldNames, or DATA_TIME_ABSENT_INDEX if absent + * @return index of AUDIT_DATA_TIME in fieldNames, or DATA_TIME_ABSENT_INDEX if absent */ private int getDataTimeIndex(String[] fieldNames) { for (int i = 0; i < fieldNames.length; i++) { - if (INLONG_DATA_TIME.equals(fieldNames[i])) { + if (AUDIT_DATA_TIME.equals(fieldNames[i])) { return i; } }