This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 2680927d7a25a5ab5ae2ec54550adc60c122eec8
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Fri Sep 22 10:56:50 2023 +0800

    [INLONG-8951][Manager] Support for configuring built-in fields for iceberg and starrocks (#8952)
---
 .../org/apache/inlong/common/enums/MetaField.java  |  7 ++-
 .../inlong/manager/pojo/sort/node/NodeFactory.java | 59 ++++++++++++++++++++++
 .../pojo/sort/node/base/ExtractNodeProvider.java   |  5 ++
 .../pojo/sort/node/base/LoadNodeProvider.java      |  4 ++
 .../manager/pojo/sort/node/base/NodeProvider.java  |  9 ++++
 .../pojo/sort/node/provider/IcebergProvider.java   | 25 +++++++++
 .../pojo/sort/node/provider/StarRocksProvider.java | 26 ++++++++++
 .../manager/pojo/sort/util/FieldInfoUtils.java     | 12 +++++
 .../resource/sort/DefaultSortConfigOperator.java   | 27 ++++++++--
 .../src/main/resources/application-dev.properties  |  2 +
 .../src/main/resources/application-prod.properties |  2 +
 .../src/main/resources/application-test.properties |  2 +
 .../org/apache/inlong/sort/protocol/Metadata.java  |  1 +
 .../protocol/node/extract/IcebergExtractNode.java  | 33 +++++++++++-
 .../org/apache/inlong/sort/base/Constants.java     |  2 +-
 .../sort/iceberg/IcebergReadableMetadata.java      |  4 +-
 .../starrocks/table/sink/utils/SchemaUtils.java    | 21 ++++----
 17 files changed, 222 insertions(+), 19 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
index f3c1a388f5..e6d0bbc6cb 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
@@ -163,7 +163,12 @@ public enum MetaField {
     /**
      * Timestamp of the Kafka record, it is only used for Kafka.
      */
-    TIMESTAMP;
+    TIMESTAMP,
+
+    /**
+     * Inlong data time for audit.
+     */
+    AUDIT_DATA_TIME;
 
     public static MetaField forName(String name) {
         for (MetaField metaField : values()) {
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
index cc6a545490..74f2efae00 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
@@ -18,14 +18,23 @@
 package org.apache.inlong.manager.pojo.sort.node;
 
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.transform.TransformResponse;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.transform.TransformNode;
 
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -33,6 +42,7 @@ import java.util.stream.Collectors;
 /**
  * The node factory
  */
+@Slf4j
 public class NodeFactory {
 
     /**
@@ -61,4 +71,53 @@ public class NodeFactory {
             return 
LoadNodeProviderFactory.getLoadNodeProvider(sinkType).createLoadNode(v, 
constantFieldMap);
         }).collect(Collectors.toList());
     }
+
+    /**
+     * Create extract node from the given source.
+     */
+    public static ExtractNode createExtractNode(StreamSource sourceInfo) {
+        if (sourceInfo == null) {
+            return null;
+        }
+        String sourceType = sourceInfo.getSourceType();
+        return 
ExtractNodeProviderFactory.getExtractNodeProvider(sourceType).createExtractNode(sourceInfo);
+    }
+
+    /**
+     * Create load node from the given sink.
+     */
+    public static LoadNode createLoadNode(StreamSink sinkInfo, Map<String, 
StreamField> constantFieldMap) {
+        if (sinkInfo == null) {
+            return null;
+        }
+        String sinkType = sinkInfo.getSinkType();
+        return 
LoadNodeProviderFactory.getLoadNodeProvider(sinkType).createLoadNode(sinkInfo, 
constantFieldMap);
+    }
+
+    /**
+     * Add built-in fields for the extract node and load node
+     */
+    public static List<Node> addBuiltInField(StreamSource sourceInfo, 
StreamSink sinkInfo,
+            List<TransformResponse> transformResponses, Map<String, 
StreamField> constantFieldMap) {
+        ExtractNodeProvider extractNodeProvider = 
ExtractNodeProviderFactory.getExtractNodeProvider(
+                sourceInfo.getSourceType());
+        LoadNodeProvider loadNodeProvider = 
LoadNodeProviderFactory.getLoadNodeProvider(sinkInfo.getSinkType());
+
+        if (FieldInfoUtils.compareFields(extractNodeProvider.getMetaFields(), 
loadNodeProvider.getMetaFields())) {
+            extractNodeProvider.addStreamMetaFields(sourceInfo.getFieldList());
+            transformResponses.forEach(v -> 
extractNodeProvider.addStreamMetaFields(v.getFieldList()));
+            loadNodeProvider.addSinkMetaFields(sinkInfo.getSinkFieldList());
+        }
+
+        ExtractNode extractNode = 
extractNodeProvider.createExtractNode(sourceInfo);
+        List<TransformNode> transformNodes =
+                TransformNodeUtils.createTransformNodes(transformResponses, 
constantFieldMap);
+        LoadNode loadNode = loadNodeProvider.createLoadNode(sinkInfo, 
constantFieldMap);
+
+        List<Node> nodes = new ArrayList<>();
+        nodes.add(extractNode);
+        nodes.addAll(transformNodes);
+        nodes.add(loadNode);
+        return nodes;
+    }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
index 3642cc7a3e..a8622a7be8 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
@@ -135,4 +135,9 @@ public interface ExtractNodeProvider extends NodeProvider {
         }
         return format;
     }
+
+    default List<StreamField> addStreamMetaFields(List<StreamField> 
streamFields) {
+        return streamFields;
+    }
+
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
index 2df89d9710..4af5e532d4 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
@@ -141,4 +141,8 @@ public interface LoadNodeProvider extends NodeProvider {
         }
         return format;
     }
+
+    default List<SinkField> addSinkMetaFields(List<SinkField> sinkFields) {
+        return sinkFields;
+    }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
index a40bd1106d..f17d38bde0 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
@@ -17,6 +17,10 @@
 
 package org.apache.inlong.manager.pojo.sort.node.base;
 
+import org.apache.inlong.sort.protocol.FieldInfo;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -45,4 +49,9 @@ public interface NodeProvider {
                 .filter(v -> Objects.nonNull(v.getValue()))
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().toString()));
     }
+
+    default List<FieldInfo> getMetaFields() {
+        return new ArrayList<>();
+    }
+
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
index 416a409ac5..06cd989198 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
+import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.manager.common.consts.StreamType;
 import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
 import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
@@ -25,6 +26,7 @@ import 
org.apache.inlong.manager.pojo.source.iceberg.IcebergSource;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
 import org.apache.inlong.sort.protocol.constant.IcebergConstant;
 import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
@@ -33,12 +35,17 @@ import 
org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * The Provider for creating Iceberg load nodes.
  */
+@Slf4j
 public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider {
 
     @Override
@@ -91,4 +98,22 @@ public class IcebergProvider implements ExtractNodeProvider, 
LoadNodeProvider {
                 icebergSink.getCatalogUri(),
                 icebergSink.getWarehouse());
     }
+
+    @Override
+    public List<StreamField> addStreamMetaFields(List<StreamField> 
streamFields) {
+        List<String> fieldNames = 
streamFields.stream().map(StreamField::getFieldName).collect(Collectors.toList());
+        if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+            streamFields.add(0,
+                    new StreamField(0, "long", 
MetaField.AUDIT_DATA_TIME.name(), "data_time", null, 1,
+                            MetaField.AUDIT_DATA_TIME.name()));
+        }
+        return streamFields;
+    }
+
+    @Override
+    public List<FieldInfo> getMetaFields() {
+        List<FieldInfo> fieldInfos = new ArrayList<>();
+        fieldInfos.add(0, new MetaFieldInfo(MetaField.AUDIT_DATA_TIME.name(), 
MetaField.AUDIT_DATA_TIME));
+        return fieldInfos;
+    }
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
index acae2efeb3..5a86455a0f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
@@ -17,23 +17,31 @@
 
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
+import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink;
 import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * The Provider for creating StarRocks load nodes.
  */
+@Slf4j
 public class StarRocksProvider implements LoadNodeProvider {
 
     @Override
@@ -71,4 +79,22 @@ public class StarRocksProvider implements LoadNodeProvider {
                 starRocksSink.getDatabasePattern(),
                 starRocksSink.getTablePattern());
     }
+
+    @Override
+    public List<SinkField> addSinkMetaFields(List<SinkField> sinkFields) {
+        List<String> fieldNames = 
sinkFields.stream().map(SinkField::getFieldName).collect(Collectors.toList());
+        if (!fieldNames.contains(MetaField.AUDIT_DATA_TIME.name())) {
+            sinkFields.add(0, new SinkField(0, "long", 
MetaField.AUDIT_DATA_TIME.name(), "long",
+                    MetaField.AUDIT_DATA_TIME.name()));
+        }
+        return sinkFields;
+    }
+
+    @Override
+    public List<FieldInfo> getMetaFields() {
+        List<FieldInfo> fieldInfos = new ArrayList<>();
+        fieldInfos.add(0, new FieldInfo(MetaField.AUDIT_DATA_TIME.name(), new 
LongFormatInfo()));
+        return fieldInfos;
+    }
+
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
index 5f4d7f274d..43d48a6e76 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldInfoUtils.java
@@ -391,4 +391,16 @@ public class FieldInfoUtils {
         return sortFormat;
     }
 
+    public static boolean compareFields(List<FieldInfo> sourceFields, 
List<FieldInfo> targetFields) {
+        if (sourceFields.size() != targetFields.size()) {
+            return false;
+        }
+        for (int i = 0; i < sourceFields.size(); i++) {
+            if (!Objects.equals(sourceFields.get(i).getName(), 
targetFields.get(i).getName())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 52ff3bb5e8..74f0819613 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -35,6 +35,7 @@ import 
org.apache.inlong.manager.service.transform.StreamTransformService;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.transform.TransformNode;
 import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -42,6 +43,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
@@ -51,6 +53,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -63,6 +66,8 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultSortConfigOperator.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+    @Value("${metrics.audit.proxy.hosts:127.0.0.1}")
+    private String auditHost;
     @Autowired
     private StreamSourceService sourceService;
     @Autowired
@@ -127,12 +132,11 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
 
             for (StreamSink sink : sinks) {
                 Map<String, Object> properties = sink.getProperties();
-                properties.putIfAbsent("metrics.audit.key", 
auditService.getAuditId(sink.getSinkType(), true));
+                addAuditId(sink.getProperties(), sink.getSinkType(), true);
             }
             for (StreamSource source : sources) {
                 source.setFieldList(inlongStream.getFieldList());
-                Map<String, Object> properties = source.getProperties();
-                properties.putIfAbsent("metrics.audit.key", 
auditService.getAuditId(source.getSourceType(), false));
+                addAuditId(source.getProperties(), source.getSourceType(), 
false);
             }
             List<NodeRelation> relations;
 
@@ -222,8 +226,13 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
     private List<Node> createNodes(List<StreamSource> sources, 
List<TransformResponse> transformResponses,
             List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) 
{
         List<Node> nodes = new ArrayList<>();
+        if (Objects.equals(sources.size(), sinks.size()) && 
Objects.equals(sources.size(), 1)) {
+            return NodeFactory.addBuiltInField(sources.get(0), sinks.get(0), 
transformResponses, constantFieldMap);
+        }
+        List<TransformNode> transformNodes =
+                TransformNodeUtils.createTransformNodes(transformResponses, 
constantFieldMap);
         nodes.addAll(NodeFactory.createExtractNodes(sources));
-        
nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, 
constantFieldMap));
+        nodes.addAll(transformNodes);
         nodes.addAll(NodeFactory.createLoadNodes(sinks, constantFieldMap));
         return nodes;
     }
@@ -264,4 +273,14 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
         groupInfo.getExtList().add(extInfo);
     }
 
+    private void addAuditId(Map<String, Object> properties, String type, 
boolean isSent) {
+        try {
+            String auditId = auditService.getAuditId(type, isSent);
+            properties.putIfAbsent("metrics.audit.key", auditId);
+            properties.putIfAbsent("metrics.audit.proxy.hosts", auditHost);
+        } catch (Exception e) {
+            LOGGER.error("Current type ={} is not set auditId", type);
+        }
+
+    }
 }
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-dev.properties 
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index ff248053a4..376d9b9f73 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -107,3 +107,5 @@ group.deleted.latest.hours=10
 group.deleted.batchSize=100
 # If turned on, the groups could be deleted periodically.
 group.deleted.enabled=false
+
+metrics.audit.proxy.hosts=127.0.0.1:10081
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-prod.properties 
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 69122ed272..c47fc92334 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -106,3 +106,5 @@ group.deleted.latest.hours=10
 group.deleted.batchSize=100
 # If turned on, the groups could be deleted periodically.
 group.deleted.enabled=false
+
+metrics.audit.proxy.hosts=127.0.0.1:10081
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-test.properties 
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index ff248053a4..376d9b9f73 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -107,3 +107,5 @@ group.deleted.latest.hours=10
 group.deleted.batchSize=100
 # If turned on, the groups could be deleted periodically.
 group.deleted.enabled=false
+
+metrics.audit.proxy.hosts=127.0.0.1:10081
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
index 8dcf069165..11c4ec6737 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
@@ -114,6 +114,7 @@ public interface Metadata {
             case BATCH_ID:
             case PARTITION:
             case OFFSET:
+            case AUDIT_DATA_TIME:
                 metadataType = "BIGINT";
                 break;
             case UPDATE_BEFORE:
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
index 14fec78da8..cbceb5485a 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
@@ -17,7 +17,10 @@
 
 package org.apache.inlong.sort.protocol.node.extract;
 
+import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.Metadata;
 import org.apache.inlong.sort.protocol.constant.IcebergConstant;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
@@ -32,8 +35,10 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Iceberg extract node for extract data from iceberg
@@ -42,7 +47,7 @@ import java.util.Map;
 @JsonTypeName("icebergExtract")
 @JsonInclude(JsonInclude.Include.NON_NULL)
 @Data
-public class IcebergExtractNode extends ExtractNode implements Serializable {
+public class IcebergExtractNode extends ExtractNode implements InlongMetric, 
Metadata, Serializable {
 
     @JsonProperty("tableName")
     @Nonnull
@@ -111,6 +116,8 @@ public class IcebergExtractNode extends ExtractNode 
implements Serializable {
         options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
         options.put(IcebergConstant.CATALOG_NAME_KEY, catalogName);
         options.put(IcebergConstant.STREAMING, "true");
+        options.put(IcebergConstant.STARTING_STRATEGY_KEY,
+                
IcebergConstant.StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL.name());
         if (null != uri) {
             options.put(IcebergConstant.URI_KEY, uri);
         }
@@ -133,4 +140,28 @@ public class IcebergExtractNode extends ExtractNode 
implements Serializable {
         return super.getPartitionFields();
     }
 
+    @Override
+    public String getMetadataKey(MetaField metaField) {
+        String metadataKey;
+        switch (metaField) {
+            case AUDIT_DATA_TIME:
+                metadataKey = "audit_data_time";
+                break;
+            default:
+                throw new 
UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
+                        this.getClass().getSimpleName(), metaField));
+        }
+        return metadataKey;
+    }
+
+    @Override
+    public boolean isVirtual(MetaField metaField) {
+        return true;
+    }
+
+    @Override
+    public Set<MetaField> supportedMetaFields() {
+        return EnumSet.of(MetaField.AUDIT_DATA_TIME);
+    }
+
 }
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 72ce9d89b8..5065c78c1f 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -167,7 +167,7 @@ public final class Constants {
 
     public static final String META_INCREMENTAL = "incremental_inlong";
 
-    public static final String META_INLONG_DATA_TIME = "inlong_data_time";
+    public static final String META_AUDIT_DATA_TIME = "audit_data_time";
 
     public static final ConfigOption<String> INLONG_METRIC =
             ConfigOptions.key("inlong.metric.labels")
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
index e003555a45..11f7c51d60 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
@@ -30,8 +30,8 @@ import java.io.Serializable;
  */
 public enum IcebergReadableMetadata {
 
-    INLONG_DATA_TIME(
-            Constants.META_INLONG_DATA_TIME,
+    AUDIT_DATA_TIME(
+            Constants.META_AUDIT_DATA_TIME,
             DataTypes.BIGINT().notNull(),
             r -> System.currentTimeMillis());
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
index 178196d466..76e91e6cf3 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
@@ -31,7 +31,7 @@ public class SchemaUtils implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private final String INLONG_DATA_TIME = "inlong_data_time";
+    private final String AUDIT_DATA_TIME = "audit_data_time";
     private final int DATA_TIME_ABSENT_INDEX = -1;
     private final int dataTimeFieldIndex;
 
@@ -41,16 +41,16 @@ public class SchemaUtils implements Serializable {
 
     public long getDataTime(Object[] data) {
         if (dataTimeFieldIndex == DATA_TIME_ABSENT_INDEX) {
-            // if INLONG_DATA_TIME field is absent, return local time
+            // if AUDIT_DATA_TIME field is absent, return local time
             return System.currentTimeMillis();
         }
         return (Long) data[dataTimeFieldIndex];
     }
 
     /**
-     * filter out INLONG_DATA_TIME field
+     * filter out AUDIT_DATA_TIME field
      * @param data
-     * @return data without INLONG_DATA_TIME
+     * @return data without AUDIT_DATA_TIME
      */
     public Object[] filterOutTimeField(Object[] data) {
         if (dataTimeFieldIndex == DATA_TIME_ABSENT_INDEX) {
@@ -66,24 +66,25 @@ public class SchemaUtils implements Serializable {
     }
 
     /**
-     * INLONG_DATA_TIME should not occur in actual data schema fields
+     * AUDIT_DATA_TIME should not occur in actual data schema fields
+     *
      * @param schema
-     * @return fieldNames without INLONG_DATA_TIME
+     * @return fieldNames without AUDIT_DATA_TIME
      */
     public String[] filterOutTimeField(TableSchema schema) {
         return Arrays.stream(schema.getFieldNames())
-                .filter(field -> !INLONG_DATA_TIME.equals(field))
+                .filter(field -> !AUDIT_DATA_TIME.equals(field))
                 .toArray(String[]::new);
     }
 
     /**
-     * get the index of INLONG_DATA_TIME in fieldNames
+     * get the index of AUDIT_DATA_TIME in fieldNames
      * @param fieldNames
-     * @return index of INLONG_DATA_TIME in fieldNames, or 
DATA_TIME_ABSENT_INDEX if absent
+     * @return index of AUDIT_DATA_TIME in fieldNames, or 
DATA_TIME_ABSENT_INDEX if absent
      */
     private int getDataTimeIndex(String[] fieldNames) {
         for (int i = 0; i < fieldNames.length; i++) {
-            if (INLONG_DATA_TIME.equals(fieldNames[i])) {
+            if (AUDIT_DATA_TIME.equals(fieldNames[i])) {
                 return i;
             }
         }

Reply via email to