This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 46d88ede02 [Refactor](Metadata tvf) Reconstruct Metadata table-value 
function into a more general framework. (#17590)
46d88ede02 is described below

commit 46d88ede024d1cb8c31837c54c1ac721b35e6966
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Fri Mar 17 19:54:50 2023 +0800

    [Refactor](Metadata tvf) Reconstruct Metadata table-value function into a 
more general framework. (#17590)
---
 be/src/vec/exec/scan/vmeta_scanner.cpp             |  62 +++---
 be/src/vec/exec/scan/vmeta_scanner.h               |   5 +-
 .../org/apache/doris/planner/DataGenScanNode.java  |   5 +-
 .../doris/planner/external/MetadataScanNode.java   |  30 +--
 .../apache/doris/service/FrontendServiceImpl.java  | 211 +-----------------
 .../tablefunction/DataGenTableValuedFunction.java  |   8 +
 .../tablefunction/IcebergTableValuedFunction.java  |  57 +++--
 .../doris/tablefunction/MetadataGenerator.java     | 248 +++++++++++++++++++++
 .../tablefunction/MetadataTableValuedFunction.java |  18 +-
 .../tablefunction/NumbersTableValuedFunction.java  |   9 -
 gensrc/thrift/FrontendService.thrift               |  10 +-
 gensrc/thrift/PlanNodes.thrift                     |  15 +-
 gensrc/thrift/Types.thrift                         |   8 +
 13 files changed, 368 insertions(+), 318 deletions(-)

diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp 
b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 86e60edf11..50877d5558 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -31,7 +31,6 @@ VMetaScanner::VMetaScanner(RuntimeState* state, 
VMetaScanNode* parent, int64_t t
                            const TScanRangeParams& scan_range, int64_t limit,
                            RuntimeProfile* profile)
         : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
-          _parent(parent),
           _meta_eos(false),
           _tuple_id(tuple_id),
           _scan_range(scan_range.scan_range) {}
@@ -46,11 +45,7 @@ Status VMetaScanner::prepare(RuntimeState* state, 
VExprContext** vconjunct_ctx_p
     VLOG_CRITICAL << "VMetaScanner::prepare";
     RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
-    if (_scan_range.meta_scan_range.__isset.iceberg_params) {
-        RETURN_IF_ERROR(_fetch_iceberg_metadata_batch());
-    } else {
-        _meta_eos = true;
-    }
+    RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
     return Status::OK();
 }
 
@@ -158,37 +153,28 @@ Status VMetaScanner::_fill_block_with_remote_data(const 
std::vector<MutableColum
     return Status::OK();
 }
 
-Status VMetaScanner::_fetch_iceberg_metadata_batch() {
-    VLOG_CRITICAL << "VMetaScanner::_fetch_iceberg_metadata_batch";
+Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
+    VLOG_CRITICAL << "VMetaScanner::_fetch_metadata";
     TFetchSchemaTableDataRequest request;
-    request.cluster_name = "";
-    request.__isset.cluster_name = true;
-    request.schema_table_name = TSchemaTableName::ICEBERG_TABLE_META;
-    request.__isset.schema_table_name = true;
-    auto scan_params = _parent->scan_params();
-    TMetadataTableRequestParams meta_table_params = 
TMetadataTableRequestParams();
-    meta_table_params.catalog = scan_params.catalog;
-    meta_table_params.__isset.catalog = true;
-    meta_table_params.database = scan_params.database;
-    meta_table_params.__isset.database = true;
-    meta_table_params.table = scan_params.table;
-    meta_table_params.__isset.table = true;
-
-    meta_table_params.iceberg_metadata_params = 
_scan_range.meta_scan_range.iceberg_params;
-    meta_table_params.__isset.iceberg_metadata_params = true;
-
-    request.metada_table_params = meta_table_params;
-    request.__isset.metada_table_params = true;
+    switch (meta_scan_range.metadata_type) {
+    case TMetadataType::ICEBERG:
+        RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range, 
&request));
+        break;
+    default:
+        _meta_eos = true;
+        return Status::OK();
+    }
 
+    // _state->query_timeout() is seconds, change to milliseconds
+    int time_out = _state->query_timeout() * 1000;
     TNetworkAddress master_addr = 
ExecEnv::GetInstance()->master_info()->network_address;
     TFetchSchemaTableDataResult result;
-
     RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
             master_addr.hostname, master_addr.port,
             [&request, &result](FrontendServiceConnection& client) {
                 client->fetchSchemaTableData(result, request);
             },
-            config::txn_commit_rpc_timeout_ms));
+            time_out));
 
     Status status(result.status);
     if (!status.ok()) {
@@ -199,6 +185,26 @@ Status VMetaScanner::_fetch_iceberg_metadata_batch() {
     return Status::OK();
 }
 
+Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& 
meta_scan_range,
+                                                     
TFetchSchemaTableDataRequest* request) {
+    VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request";
+    if (!meta_scan_range.__isset.iceberg_params) {
+        return Status::InternalError("Can not find TIcebergMetadataParams from 
meta_scan_range.");
+    }
+
+    // create request
+    request->__set_cluster_name("");
+    request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
+
+    // create TMetadataTableRequestParams
+    TMetadataTableRequestParams metadata_table_params;
+    metadata_table_params.__set_metadata_type(TMetadataType::ICEBERG);
+    
metadata_table_params.__set_iceberg_metadata_params(meta_scan_range.iceberg_params);
+
+    request->__set_metada_table_params(metadata_table_params);
+    return Status::OK();
+}
+
 Status VMetaScanner::close(RuntimeState* state) {
     VLOG_CRITICAL << "VMetaScanner::close";
     RETURN_IF_ERROR(VScanner::close(state));
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h 
b/be/src/vec/exec/scan/vmeta_scanner.h
index 63cd50fe10..ffb6ff8d6c 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -35,10 +35,11 @@ public:
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
     Status _fill_block_with_remote_data(const std::vector<MutableColumnPtr>& 
columns);
-    Status _fetch_iceberg_metadata_batch();
+    Status _fetch_metadata(const TMetaScanRange& meta_scan_range);
+    Status _build_iceberg_metadata_request(const TMetaScanRange& 
meta_scan_range,
+                                           TFetchSchemaTableDataRequest* 
request);
 
 private:
-    VMetaScanNode* _parent;
     bool _meta_eos;
     TupleId _tuple_id;
     const TupleDescriptor* _tuple_desc;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
index 94c5c07028..cee9007b27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
@@ -48,9 +48,8 @@ public class DataGenScanNode extends ScanNode {
     private DataGenTableValuedFunction tvf;
     private boolean isFinalized = false;
 
-    public DataGenScanNode(PlanNodeId id, TupleDescriptor desc,
-                                       String planNodeName, 
DataGenTableValuedFunction tvf) {
-        super(id, desc, planNodeName, 
StatisticalType.TABLE_VALUED_FUNCTION_NODE);
+    public DataGenScanNode(PlanNodeId id, TupleDescriptor desc, 
DataGenTableValuedFunction tvf) {
+        super(id, desc, "DataGenScanNode", 
StatisticalType.TABLE_VALUED_FUNCTION_NODE);
         this.tvf = tvf;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
index aa9c840197..bcb34b93ec 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
@@ -24,12 +24,8 @@ import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
-import org.apache.doris.tablefunction.IcebergTableValuedFunction;
 import org.apache.doris.tablefunction.MetadataTableValuedFunction;
-import org.apache.doris.thrift.TIcebergMetadataParams;
-import org.apache.doris.thrift.TIcebergMetadataType;
 import org.apache.doris.thrift.TMetaScanNode;
-import org.apache.doris.thrift.TMetaScanRange;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
@@ -64,10 +60,8 @@ public class MetadataScanNode extends ScanNode {
     protected void toThrift(TPlanNode planNode) {
         planNode.setNodeType(TPlanNodeType.META_SCAN_NODE);
         TMetaScanNode metaScanNode = new TMetaScanNode();
-        metaScanNode.setCatalog(tvf.getMetadataTableName().getCtl());
-        metaScanNode.setDatabase(tvf.getMetadataTableName().getDb());
-        metaScanNode.setTable(tvf.getMetadataTableName().getTbl());
         metaScanNode.setTupleId(desc.getId().asInt());
+        metaScanNode.setMetadataType(this.tvf.getMetadataType());
         planNode.setMetaScanNode(metaScanNode);
     }
 
@@ -83,28 +77,18 @@ public class MetadataScanNode extends ScanNode {
 
     @Override
     public boolean needToCheckColumnPriv() {
-        return super.needToCheckColumnPriv();
+        return false;
     }
 
     private void buildScanRanges() {
-        if (tvf.getMetaType() == MetadataTableValuedFunction.MetaType.ICEBERG) 
{
-            IcebergTableValuedFunction icebergTvf = 
(IcebergTableValuedFunction) tvf;
-            // todo: split
-            TScanRangeLocations locations = 
createIcebergTvfLocations(icebergTvf);
-            scanRangeLocations.add(locations);
-        }
+        // todo: split
+        TScanRangeLocations locations = createMetaDataTvfLocations();
+        scanRangeLocations.add(locations);
     }
 
-    private TScanRangeLocations 
createIcebergTvfLocations(IcebergTableValuedFunction icebergTvf) {
+    private TScanRangeLocations createMetaDataTvfLocations() {
         TScanRange scanRange = new TScanRange();
-        TMetaScanRange metaScanRange = new TMetaScanRange();
-        // set iceberg metadata params
-        TIcebergMetadataParams icebergMetadataParams = new 
TIcebergMetadataParams();
-        int metadataType = icebergTvf.getMetaQueryType().ordinal();
-        
icebergMetadataParams.setMetadataType(TIcebergMetadataType.findByValue(metadataType));
-
-        metaScanRange.setIcebergParams(icebergMetadataParams);
-        scanRange.setMetaScanRange(metaScanRange);
+        scanRange.setMetaScanRange(tvf.getMetaScanRange());
         // set location
         TScanRangeLocation location = new TScanRangeLocation();
         Backend backend = backendPolicy.getNextBe();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 911f2b5f15..58c2eb479e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.service;
 
-import org.apache.doris.alter.DecommissionType;
 import org.apache.doris.alter.SchemaChangeHandler;
 import org.apache.doris.analysis.AddColumnsClause;
 import org.apache.doris.analysis.ColumnDef;
@@ -29,7 +28,6 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.HMSResource;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Replica;
@@ -39,7 +37,6 @@ import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.catalog.external.ExternalDatabase;
-import org.apache.doris.cluster.Cluster;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.AuthenticationException;
@@ -56,11 +53,9 @@ import org.apache.doris.common.ThriftServerEventProcessor;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.annotation.LogException;
-import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.cooldown.CooldownDelete;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.master.MasterImpl;
 import org.apache.doris.mysql.privilege.AccessControllerManager;
@@ -71,15 +66,14 @@ import org.apache.doris.qe.ConnectProcessor;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.VariableMgr;
-import org.apache.doris.system.Backend;
 import org.apache.doris.system.Frontend;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.tablefunction.MetadataGenerator;
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.FrontendServiceVersion;
 import org.apache.doris.thrift.TAddColumnsRequest;
 import org.apache.doris.thrift.TAddColumnsResult;
-import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TCheckAuthRequest;
 import org.apache.doris.thrift.TCheckAuthResult;
 import org.apache.doris.thrift.TColumn;
@@ -104,7 +98,6 @@ import org.apache.doris.thrift.TGetDbsParams;
 import org.apache.doris.thrift.TGetDbsResult;
 import org.apache.doris.thrift.TGetTablesParams;
 import org.apache.doris.thrift.TGetTablesResult;
-import org.apache.doris.thrift.TIcebergMetadataType;
 import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
 import org.apache.doris.thrift.TInitExternalCtlMetaResult;
 import org.apache.doris.thrift.TListPrivilegesResult;
@@ -120,7 +113,6 @@ import org.apache.doris.thrift.TLoadTxnRollbackResult;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TMasterResult;
-import org.apache.doris.thrift.TMetadataTableRequestParams;
 import org.apache.doris.thrift.TMySqlLoadAcquireTokenResult;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPrivilegeCtrl;
@@ -130,7 +122,6 @@ import org.apache.doris.thrift.TPrivilegeType;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TReportExecStatusResult;
 import org.apache.doris.thrift.TReportRequest;
-import org.apache.doris.thrift.TRow;
 import org.apache.doris.thrift.TShowVariableRequest;
 import org.apache.doris.thrift.TShowVariableResult;
 import org.apache.doris.thrift.TSnapshotLoaderReportRequest;
@@ -150,22 +141,14 @@ import 
org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TxnCommitAttachment;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
-import org.jetbrains.annotations.NotNull;
 
 import java.time.Instant;
-import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
@@ -1352,197 +1335,13 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
     public TFetchSchemaTableDataResult 
fetchSchemaTableData(TFetchSchemaTableDataRequest request) throws TException {
         switch (request.getSchemaTableName()) {
             case BACKENDS:
-                return getBackendsSchemaTable(request);
-            case ICEBERG_TABLE_META:
-                return getIcebergMetadataTable(request);
+                return MetadataGenerator.getBackendsSchemaTable(request);
+            case METADATA_TABLE:
+                return MetadataGenerator.getMetadataTable(request);
             default:
                 break;
         }
-        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
-        result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR));
-        return result;
-    }
-
-    private TFetchSchemaTableDataResult 
getIcebergMetadataTable(TFetchSchemaTableDataRequest request) {
-        if (!request.isSetMetadaTableParams()) {
-            return errorResult("Metadata table params is not set. ");
-        }
-        TMetadataTableRequestParams params = request.getMetadaTableParams();
-        if (!params.isSetIcebergMetadataParams()) {
-            return errorResult("Iceberg metadata params is not set. ");
-        }
-
-        HMSExternalCatalog catalog = (HMSExternalCatalog) 
Env.getCurrentEnv().getCatalogMgr()
-                .getCatalog(params.getCatalog());
-        org.apache.iceberg.Table table;
-        try {
-            table = getIcebergTable(catalog, params.getDatabase(), 
params.getTable());
-        } catch (MetaNotFoundException e) {
-            return errorResult(e.getMessage());
-        }
-        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
-        List<TRow> dataBatch = Lists.newArrayList();
-        TIcebergMetadataType metadataType = 
params.getIcebergMetadataParams().getMetadataType();
-        switch (metadataType) {
-            case SNAPSHOTS:
-                for (Snapshot snapshot : table.snapshots()) {
-                    TRow trow = new TRow();
-                    LocalDateTime committedAt = 
LocalDateTime.ofInstant(Instant.ofEpochMilli(
-                            snapshot.timestampMillis()), 
TimeUtils.getTimeZone().toZoneId());
-                    long encodedDatetime = 
convertToDateTimeV2(committedAt.getYear(), committedAt.getMonthValue(),
-                            committedAt.getDayOfMonth(), committedAt.getHour(),
-                            committedAt.getMinute(), committedAt.getSecond());
-
-                    trow.addToColumnValue(new 
TCell().setLongVal(encodedDatetime));
-                    trow.addToColumnValue(new 
TCell().setLongVal(snapshot.snapshotId()));
-                    if (snapshot.parentId() == null) {
-                        trow.addToColumnValue(new TCell().setLongVal(-1L));
-                    } else {
-                        trow.addToColumnValue(new 
TCell().setLongVal(snapshot.parentId()));
-                    }
-                    trow.addToColumnValue(new 
TCell().setStringVal(snapshot.operation()));
-                    trow.addToColumnValue(new 
TCell().setStringVal(snapshot.manifestListLocation()));
-                    dataBatch.add(trow);
-                }
-                break;
-            default:
-                return errorResult("Unsupported metadata inspect type: " + 
metadataType);
-        }
-        result.setDataBatch(dataBatch);
-        result.setStatus(new TStatus(TStatusCode.OK));
-        return result;
-    }
-
-    public static long convertToDateTimeV2(int year, int month, int day, int 
hour, int minute, int second) {
-        return (long) second << 20 | (long) minute << 26 | (long) hour << 32
-            | (long) day << 37 | (long) month << 42 | (long) year << 46;
-    }
-
-    @NotNull
-    private TFetchSchemaTableDataResult errorResult(String msg) {
-        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
-        result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR));
-        result.status.addToErrorMsgs(msg);
-        return result;
-    }
-
-    private org.apache.iceberg.Table getIcebergTable(HMSExternalCatalog 
catalog, String db, String tbl)
-                throws MetaNotFoundException {
-        org.apache.iceberg.hive.HiveCatalog hiveCatalog = new 
org.apache.iceberg.hive.HiveCatalog();
-        Configuration conf = new HdfsConfiguration();
-        Map<String, String> properties = 
catalog.getCatalogProperty().getHadoopProperties();
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            conf.set(entry.getKey(), entry.getValue());
-        }
-        hiveCatalog.setConf(conf);
-        Map<String, String> catalogProperties = new HashMap<>();
-        catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, 
catalog.getHiveMetastoreUris());
-        catalogProperties.put("uri", catalog.getHiveMetastoreUris());
-        hiveCatalog.initialize("hive", catalogProperties);
-        return hiveCatalog.loadTable(TableIdentifier.of(db, tbl));
-    }
-
-    private TFetchSchemaTableDataResult 
getBackendsSchemaTable(TFetchSchemaTableDataRequest request) {
-        final SystemInfoService clusterInfoService = 
Env.getCurrentSystemInfo();
-        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
-        List<Long> backendIds = null;
-        if (!Strings.isNullOrEmpty(request.cluster_name)) {
-            final Cluster cluster = 
Env.getCurrentEnv().getCluster(request.cluster_name);
-            // root not in any cluster
-            if (null == cluster) {
-                return result;
-            }
-            backendIds = cluster.getBackendIdList();
-        } else {
-            backendIds = clusterInfoService.getBackendIds(false);
-            if (backendIds == null) {
-                return result;
-            }
-        }
-
-        long start = System.currentTimeMillis();
-        Stopwatch watch = Stopwatch.createUnstarted();
-
-        List<TRow> dataBatch = Lists.newArrayList();
-        for (long backendId : backendIds) {
-            Backend backend = clusterInfoService.getBackend(backendId);
-            if (backend == null) {
-                continue;
-            }
-
-            watch.start();
-            Integer tabletNum = 
Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId);
-            watch.stop();
-
-            TRow trow = new TRow();
-            trow.addToColumnValue(new TCell().setLongVal(backendId));
-            trow.addToColumnValue(new 
TCell().setStringVal(backend.getOwnerClusterName()));
-            trow.addToColumnValue(new TCell().setStringVal(backend.getIp()));
-            if (Strings.isNullOrEmpty(request.cluster_name)) {
-                trow.addToColumnValue(new 
TCell().setIntVal(backend.getHeartbeatPort()));
-                trow.addToColumnValue(new 
TCell().setIntVal(backend.getBePort()));
-                trow.addToColumnValue(new 
TCell().setIntVal(backend.getHttpPort()));
-                trow.addToColumnValue(new 
TCell().setIntVal(backend.getBrpcPort()));
-            }
-            trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastStartTime())));
-            trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastUpdateMs())));
-            trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(backend.isAlive())));
-            if (backend.isDecommissioned() && backend.getDecommissionType() == 
DecommissionType.ClusterDecommission) {
-                trow.addToColumnValue(new TCell().setStringVal("false"));
-                trow.addToColumnValue(new TCell().setStringVal("true"));
-            } else if (backend.isDecommissioned()
-                    && backend.getDecommissionType() == 
DecommissionType.SystemDecommission) {
-                trow.addToColumnValue(new TCell().setStringVal("true"));
-                trow.addToColumnValue(new TCell().setStringVal("false"));
-            } else {
-                trow.addToColumnValue(new TCell().setStringVal("false"));
-                trow.addToColumnValue(new TCell().setStringVal("false"));
-            }
-            trow.addToColumnValue(new TCell().setLongVal(tabletNum));
-
-            // capacity
-            // data used
-            trow.addToColumnValue(new 
TCell().setLongVal(backend.getDataUsedCapacityB()));
-
-            // available
-            long availB = backend.getAvailableCapacityB();
-            trow.addToColumnValue(new TCell().setLongVal(availB));
-
-            // total
-            long totalB = backend.getTotalCapacityB();
-            trow.addToColumnValue(new TCell().setLongVal(totalB));
-
-            // used percent
-            double used = 0.0;
-            if (totalB <= 0) {
-                used = 0.0;
-            } else {
-                used = (double) (totalB - availB) * 100 / totalB;
-            }
-            trow.addToColumnValue(new TCell().setDoubleVal(used));
-            trow.addToColumnValue(new 
TCell().setDoubleVal(backend.getMaxDiskUsedPct() * 100));
-
-            // remote used capacity
-            trow.addToColumnValue(new 
TCell().setLongVal(backend.getRemoteUsedCapacityB()));
-
-            // tags
-            trow.addToColumnValue(new 
TCell().setStringVal(backend.getTagMapString()));
-            // err msg
-            trow.addToColumnValue(new 
TCell().setStringVal(backend.getHeartbeatErrMsg()));
-            // version
-            trow.addToColumnValue(new 
TCell().setStringVal(backend.getVersion()));
-            // status
-            trow.addToColumnValue(new TCell().setStringVal(new 
Gson().toJson(backend.getBackendStatus())));
-            dataBatch.add(trow);
-        }
-
-        // backends proc node get result too slow, add log to observer.
-        LOG.debug("backends proc get tablet num cost: {}, total cost: {}",
-                watch.elapsed(TimeUnit.MILLISECONDS), 
(System.currentTimeMillis() - start));
-
-        result.setDataBatch(dataBatch);
-        result.setStatus(new TStatus(TStatusCode.OK));
-        return result;
+        return MetadataGenerator.errorResult("Fetch schema table name is not 
set");
     }
 
     private TNetworkAddress getClientAddr() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
index ab3579e9d8..fc2a6d6dd3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
@@ -17,7 +17,11 @@
 
 package org.apache.doris.tablefunction;
 
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.planner.DataGenScanNode;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
 import org.apache.doris.thrift.TDataGenFunctionName;
 
 import java.util.List;
@@ -27,4 +31,8 @@ public abstract class DataGenTableValuedFunction extends 
TableValuedFunctionIf {
 
     public abstract TDataGenFunctionName getDataGenFunctionName();
 
+    @Override
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+        return new DataGenScanNode(id, desc, this);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
index 9b58119ddd..38d11496e3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
@@ -26,6 +26,10 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TIcebergMetadataParams;
+import org.apache.doris.thrift.TIcebergQueryType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
@@ -40,8 +44,6 @@ import java.util.Map;
  */
 public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
 
-    public enum MetadataType { SNAPSHOTS }
-
     public static final String NAME = "iceberg_meta";
     private static final String TABLE = "table";
     private static final String QUERY_TYPE = "query_type";
@@ -51,54 +53,71 @@ public class IcebergTableValuedFunction extends 
MetadataTableValuedFunction {
             .add(QUERY_TYPE)
             .build();
 
-    private final MetadataType queryType;
-    private final TableName tableName;
+    private TIcebergQueryType queryType;
+
+    // here tableName represents the name of a table in Iceberg.
+    private final TableName icebergTableName;
 
     public IcebergTableValuedFunction(Map<String, String> params) throws 
AnalysisException {
-        super(MetaType.ICEBERG);
         Map<String, String> validParams = Maps.newHashMap();
         for (String key : params.keySet()) {
             if (!PROPERTIES_SET.contains(key.toLowerCase())) {
                 throw new AnalysisException("'" + key + "' is invalid 
property");
             }
-            // check ctl db tbl
+            // check ctl, db, tbl
             validParams.put(key.toLowerCase(), params.get(key));
         }
         String tableName = validParams.get(TABLE);
-        String queryType = validParams.get(QUERY_TYPE);
-        if (tableName == null || queryType == null) {
+        String queryTypeString = validParams.get(QUERY_TYPE);
+        if (tableName == null || queryTypeString == null) {
             throw new AnalysisException("Invalid iceberg metadata query");
         }
         String[] names = tableName.split("\\.");
         if (names.length != 3) {
             throw new AnalysisException("The iceberg table name contains the 
catalogName, databaseName, and tableName");
         }
-        this.tableName = new TableName(names[0], names[1], names[2]);
+        this.icebergTableName = new TableName(names[0], names[1], names[2]);
         // check auth
         if (!Env.getCurrentEnv().getAccessManager()
-                .checkTblPriv(ConnectContext.get(), this.tableName, 
PrivPredicate.SELECT)) {
+                .checkTblPriv(ConnectContext.get(), this.icebergTableName, 
PrivPredicate.SELECT)) {
             
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, 
"SELECT",
                     ConnectContext.get().getQualifiedUser(), 
ConnectContext.get().getRemoteIP(),
-                    this.tableName.getDb() + ": " + this.tableName.getTbl());
+                    this.icebergTableName.getDb() + ": " + 
this.icebergTableName.getTbl());
         }
         try {
-            this.queryType = MetadataType.valueOf(queryType.toUpperCase());
+            // TODO(ftw): check here
+            this.queryType = 
TIcebergQueryType.valueOf(queryTypeString.toUpperCase());
         } catch (IllegalArgumentException e) {
             throw new AnalysisException("Unsupported iceberg metadata query 
type: " + queryType);
         }
     }
 
+    public TIcebergQueryType getIcebergQueryType() {
+        return queryType;
+    }
+
     @Override
-    public String getTableName() {
-        return "IcebergMetadataTableValuedFunction";
+    public TMetadataType getMetadataType() {
+        return TMetadataType.ICEBERG;
     }
 
-    public TableName getMetadataTableName() {
-        return tableName;
+    @Override
+    public TMetaScanRange getMetaScanRange() {
+        TMetaScanRange metaScanRange = new TMetaScanRange();
+        metaScanRange.setMetadataType(TMetadataType.ICEBERG);
+        // set iceberg metadata params
+        TIcebergMetadataParams icebergMetadataParams = new 
TIcebergMetadataParams();
+        icebergMetadataParams.setIcebergQueryType(queryType);
+        icebergMetadataParams.setCatalog(icebergTableName.getCtl());
+        icebergMetadataParams.setDatabase(icebergTableName.getDb());
+        icebergMetadataParams.setTable(icebergTableName.getTbl());
+        metaScanRange.setIcebergParams(icebergMetadataParams);
+        return metaScanRange;
     }
 
-    public MetadataType getMetaQueryType() {
-        return queryType;
+    @Override
+    public String getTableName() {
+        return "IcebergMetadataTableValuedFunction";
     }
 
     /**
@@ -110,7 +129,7 @@ public class IcebergTableValuedFunction extends 
MetadataTableValuedFunction {
     @Override
     public List<Column> getTableColumns() throws AnalysisException {
         List<Column> resColumns = new ArrayList<>();
-        if (queryType == MetadataType.SNAPSHOTS) {
+        if (queryType == TIcebergQueryType.SNAPSHOTS) {
             resColumns.add(new Column("committed_at", 
PrimitiveType.DATETIMEV2, false));
             resColumns.add(new Column("snapshot_id", PrimitiveType.BIGINT, 
false));
             resColumns.add(new Column("parent_id", PrimitiveType.BIGINT, 
false));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
new file mode 100644
index 0000000000..20a5cd3a00
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.alter.DecommissionType;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HMSResource;
+import org.apache.doris.cluster.Cluster;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
+import org.apache.doris.thrift.TFetchSchemaTableDataResult;
+import org.apache.doris.thrift.TIcebergMetadataParams;
+import org.apache.doris.thrift.TIcebergQueryType;
+import org.apache.doris.thrift.TMetadataTableRequestParams;
+import org.apache.doris.thrift.TRow;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class MetadataGenerator {
+    private static final Logger LOG = 
LogManager.getLogger(MetadataGenerator.class);
+
+    public static TFetchSchemaTableDataResult 
getMetadataTable(TFetchSchemaTableDataRequest request) {
+        if (!request.isSetMetadaTableParams()) {
+            return errorResult("Metadata table params is not set. ");
+        }
+        switch (request.getMetadaTableParams().getMetadataType()) {
+            case ICEBERG:
+                return icebergMetadataResult(request.getMetadaTableParams());
+            default:
+                break;
+        }
+        return errorResult("Metadata table params is not set. ");
+    }
+
+    public static TFetchSchemaTableDataResult 
getBackendsSchemaTable(TFetchSchemaTableDataRequest request) {
+        final SystemInfoService clusterInfoService = 
Env.getCurrentSystemInfo();
+        List<Long> backendIds = null;
+        if (!Strings.isNullOrEmpty(request.cluster_name)) {
+            final Cluster cluster = 
Env.getCurrentEnv().getCluster(request.cluster_name);
+            // root not in any cluster
+            if (null == cluster) {
+                return errorResult("Cluster is not existed.");
+            }
+            backendIds = cluster.getBackendIdList();
+        } else {
+            backendIds = clusterInfoService.getBackendIds(false);
+        }
+
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+        long start = System.currentTimeMillis();
+        Stopwatch watch = Stopwatch.createUnstarted();
+
+        List<TRow> dataBatch = Lists.newArrayList();
+        for (long backendId : backendIds) {
+            Backend backend = clusterInfoService.getBackend(backendId);
+            if (backend == null) {
+                continue;
+            }
+
+            watch.start();
+            Integer tabletNum = 
Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId);
+            watch.stop();
+
+            TRow trow = new TRow();
+            trow.addToColumnValue(new TCell().setLongVal(backendId));
+            trow.addToColumnValue(new 
TCell().setStringVal(backend.getOwnerClusterName()));
+            trow.addToColumnValue(new TCell().setStringVal(backend.getIp()));
+            if (Strings.isNullOrEmpty(request.cluster_name)) {
+                trow.addToColumnValue(new 
TCell().setIntVal(backend.getHeartbeatPort()));
+                trow.addToColumnValue(new 
TCell().setIntVal(backend.getBePort()));
+                trow.addToColumnValue(new 
TCell().setIntVal(backend.getHttpPort()));
+                trow.addToColumnValue(new 
TCell().setIntVal(backend.getBrpcPort()));
+            }
+            trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastStartTime())));
+            trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastUpdateMs())));
+            trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(backend.isAlive())));
+            if (backend.isDecommissioned() && backend.getDecommissionType() == 
DecommissionType.ClusterDecommission) {
+                trow.addToColumnValue(new TCell().setStringVal("false"));
+                trow.addToColumnValue(new TCell().setStringVal("true"));
+            } else if (backend.isDecommissioned()
+                    && backend.getDecommissionType() == 
DecommissionType.SystemDecommission) {
+                trow.addToColumnValue(new TCell().setStringVal("true"));
+                trow.addToColumnValue(new TCell().setStringVal("false"));
+            } else {
+                trow.addToColumnValue(new TCell().setStringVal("false"));
+                trow.addToColumnValue(new TCell().setStringVal("false"));
+            }
+            trow.addToColumnValue(new TCell().setLongVal(tabletNum));
+
+            // capacity
+            // data used
+            trow.addToColumnValue(new 
TCell().setLongVal(backend.getDataUsedCapacityB()));
+
+            // available
+            long availB = backend.getAvailableCapacityB();
+            trow.addToColumnValue(new TCell().setLongVal(availB));
+
+            // total
+            long totalB = backend.getTotalCapacityB();
+            trow.addToColumnValue(new TCell().setLongVal(totalB));
+
+            // used percent
+            double used = 0.0;
+            if (totalB <= 0) {
+                used = 0.0;
+            } else {
+                used = (double) (totalB - availB) * 100 / totalB;
+            }
+            trow.addToColumnValue(new TCell().setDoubleVal(used));
+            trow.addToColumnValue(new 
TCell().setDoubleVal(backend.getMaxDiskUsedPct() * 100));
+
+            // remote used capacity
+            trow.addToColumnValue(new 
TCell().setLongVal(backend.getRemoteUsedCapacityB()));
+
+            // tags
+            trow.addToColumnValue(new 
TCell().setStringVal(backend.getTagMapString()));
+            // err msg
+            trow.addToColumnValue(new 
TCell().setStringVal(backend.getHeartbeatErrMsg()));
+            // version
+            trow.addToColumnValue(new 
TCell().setStringVal(backend.getVersion()));
+            // status
+            trow.addToColumnValue(new TCell().setStringVal(new 
Gson().toJson(backend.getBackendStatus())));
+            dataBatch.add(trow);
+        }
+
+        // backends proc node get result too slow, add log to observer.
+        LOG.debug("backends proc get tablet num cost: {}, total cost: {}",
+                watch.elapsed(TimeUnit.MILLISECONDS), 
(System.currentTimeMillis() - start));
+
+        result.setDataBatch(dataBatch);
+        result.setStatus(new TStatus(TStatusCode.OK));
+        return result;
+    }
+
+    @NotNull
+    public static TFetchSchemaTableDataResult errorResult(String msg) {
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+        result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR));
+        result.status.addToErrorMsgs(msg);
+        return result;
+    }
+
+    private static TFetchSchemaTableDataResult 
icebergMetadataResult(TMetadataTableRequestParams params) {
+        if (!params.isSetIcebergMetadataParams()) {
+            return errorResult("Iceberg metadata params is not set. ");
+        }
+        TIcebergMetadataParams icebergMetadataParams =  
params.getIcebergMetadataParams();
+        HMSExternalCatalog catalog = (HMSExternalCatalog) 
Env.getCurrentEnv().getCatalogMgr()
+                .getCatalog(icebergMetadataParams.getCatalog());
+        org.apache.iceberg.Table table;
+        try {
+            table = getIcebergTable(catalog, 
icebergMetadataParams.getDatabase(), icebergMetadataParams.getTable());
+        } catch (MetaNotFoundException e) {
+            return errorResult(e.getMessage());
+        }
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+        List<TRow> dataBatch = Lists.newArrayList();
+        TIcebergQueryType icebergQueryType = 
icebergMetadataParams.getIcebergQueryType();
+        switch (icebergQueryType) {
+            case SNAPSHOTS:
+                for (Snapshot snapshot : table.snapshots()) {
+                    TRow trow = new TRow();
+                    LocalDateTime committedAt = 
LocalDateTime.ofInstant(Instant.ofEpochMilli(
+                            snapshot.timestampMillis()), 
TimeUtils.getTimeZone().toZoneId());
+                    long encodedDatetime = 
convertToDateTimeV2(committedAt.getYear(), committedAt.getMonthValue(),
+                            committedAt.getDayOfMonth(), committedAt.getHour(),
+                            committedAt.getMinute(), committedAt.getSecond());
+
+                    trow.addToColumnValue(new 
TCell().setLongVal(encodedDatetime));
+                    trow.addToColumnValue(new 
TCell().setLongVal(snapshot.snapshotId()));
+                    if (snapshot.parentId() == null) {
+                        trow.addToColumnValue(new TCell().setLongVal(-1L));
+                    } else {
+                        trow.addToColumnValue(new 
TCell().setLongVal(snapshot.parentId()));
+                    }
+                    trow.addToColumnValue(new 
TCell().setStringVal(snapshot.operation()));
+                    trow.addToColumnValue(new 
TCell().setStringVal(snapshot.manifestListLocation()));
+                    dataBatch.add(trow);
+                }
+                break;
+            default:
+                return errorResult("Unsupported iceberg inspect type: " + 
icebergQueryType);
+        }
+        result.setDataBatch(dataBatch);
+        result.setStatus(new TStatus(TStatusCode.OK));
+        return result;
+    }
+
+    private static org.apache.iceberg.Table getIcebergTable(HMSExternalCatalog 
catalog, String db, String tbl)
+            throws MetaNotFoundException {
+        org.apache.iceberg.hive.HiveCatalog hiveCatalog = new 
org.apache.iceberg.hive.HiveCatalog();
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> properties = 
catalog.getCatalogProperty().getHadoopProperties();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        hiveCatalog.setConf(conf);
+        Map<String, String> catalogProperties = new HashMap<>();
+        catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, 
catalog.getHiveMetastoreUris());
+        catalogProperties.put("uri", catalog.getHiveMetastoreUris());
+        hiveCatalog.initialize("hive", catalogProperties);
+        return hiveCatalog.loadTable(TableIdentifier.of(db, tbl));
+    }
+
+    private static long convertToDateTimeV2(int year, int month, int day, int 
hour, int minute, int second) {
+        return (long) second << 20 | (long) minute << 26 | (long) hour << 32
+                | (long) day << 37 | (long) month << 42 | (long) year << 46;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index fd83c59957..0b30ea18c7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -17,27 +17,17 @@
 
 package org.apache.doris.tablefunction;
 
-import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.external.MetadataScanNode;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
 
 public abstract class MetadataTableValuedFunction extends 
TableValuedFunctionIf {
+    public abstract TMetadataType getMetadataType();
 
-    public enum MetaType { ICEBERG }
-
-    private final MetaType metaType;
-
-    public MetadataTableValuedFunction(MetaType metaType) {
-        this.metaType = metaType;
-    }
-
-    public MetaType getMetaType() {
-        return metaType;
-    }
-
-    public abstract TableName getMetadataTableName();
+    public abstract TMetaScanRange getMetaScanRange();
 
     @Override
     public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java
index 3e0e92b5b5..639dfeef35 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java
@@ -17,14 +17,10 @@
 
 package org.apache.doris.tablefunction;
 
-import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.planner.DataGenScanNode;
-import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.planner.ScanNode;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TDataGenFunctionName;
 import org.apache.doris.thrift.TDataGenScanRange;
@@ -141,9 +137,4 @@ public class NumbersTableValuedFunction extends 
DataGenTableValuedFunction {
         }
         return res;
     }
-
-    @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
-        return new DataGenScanNode(id, desc, "DataGenScanNode", this);
-    }
 }
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 54b12fd8ac..1336d5e81d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -715,16 +715,14 @@ struct TInitExternalCtlMetaResult {
     2: optional string status;
 }
 
-enum TSchemaTableName{
+enum TSchemaTableName {
   BACKENDS = 0,
-  ICEBERG_TABLE_META = 1,
+  METADATA_TABLE = 1,
 }
 
 struct TMetadataTableRequestParams {
-  1: optional PlanNodes.TIcebergMetadataParams iceberg_metadata_params
-  2: optional string catalog
-  3: optional string database
-  4: optional string table
+  1: optional Types.TMetadataType metadata_type
+  2: optional PlanNodes.TIcebergMetadataParams iceberg_metadata_params
 }
 
 struct TFetchSchemaTableDataRequest {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 978c90e103..e80372eb69 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -377,16 +377,17 @@ struct TDataGenScanRange {
   1: optional TTVFNumbersScanRange numbers_params
 }
 
-enum TIcebergMetadataType {
-  SNAPSHOTS = 0,
-}
 
 struct TIcebergMetadataParams {
-  1: optional TIcebergMetadataType metadata_type
+  1: optional Types.TIcebergQueryType iceberg_query_type
+  2: optional string catalog
+  3: optional string database
+  4: optional string table
 }
 
 struct TMetaScanRange {
-  1: optional TIcebergMetadataParams iceberg_params
+  1: optional Types.TMetadataType metadata_type
+  2: optional TIcebergMetadataParams iceberg_params
 }
 
 // Specification of an individual data range which is held in its entirety
@@ -532,9 +533,7 @@ struct TSchemaScanNode {
 
 struct TMetaScanNode {
   1: required Types.TTupleId tuple_id
-  2: optional string catalog
-  3: optional string database
-  4: optional string table
+  2: optional Types.TMetadataType metadata_type
 }
 
 struct TTestExternalScanNode {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index aa1e00de98..e81f7fd002 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -657,6 +657,14 @@ enum TSortType {
     ZORDER, 
 }
 
+enum TMetadataType {
+  ICEBERG
+}
+
+enum TIcebergQueryType {
+  SNAPSHOTS
+}
+
 // represent a user identity
 struct TUserIdentity {
     1: optional string username


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to