This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 40d9e19e1d [feature-wip](multi-catalog) support iceberg unified catalog, and add h… (#16082)
40d9e19e1d is described below

commit 40d9e19e1d1efe12a0853c99177d8e672c9993c4
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Wed Feb 1 22:59:42 2023 +0800

    [feature-wip](multi-catalog) support iceberg unified catalog, and add h… (#16082)
    
    Support the Iceberg unified catalog framework, and add HMS and REST catalogs for the framework.
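
    A minimal usage sketch of the new catalog factory (illustrative only, not part of
    this patch): the catalog id, catalog name, and metastore URI below are placeholder
    values, and passing a null resource string is an assumption.

        import java.util.HashMap;
        import java.util.Map;

        import org.apache.doris.catalog.HMSResource;
        import org.apache.doris.common.DdlException;
        import org.apache.doris.datasource.CatalogIf;
        import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
        import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory;

        public class IcebergCatalogFactoryExample {
            public static void main(String[] args) throws DdlException {
                Map<String, String> props = new HashMap<>();
                // "iceberg.catalog.type" selects the backing catalog: "hms" or "rest".
                props.put(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE,
                        IcebergExternalCatalog.ICEBERG_HMS);
                // Placeholder metastore address for the HMS-backed Iceberg catalog.
                props.put(HMSResource.HIVE_METASTORE_URIS, "thrift://127.0.0.1:9083");
                // "hms" returns an IcebergHMSExternalCatalog,
                // "rest" returns an IcebergRestExternalCatalog.
                CatalogIf catalog = IcebergExternalCatalogFactory.createCatalog(
                        1L, "iceberg_hms", null, props);
                System.out.println(catalog.getClass().getSimpleName());
            }
        }
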
---
 .../java/org/apache/doris/catalog/TableIf.java     |   6 +-
 .../catalog/external/IcebergExternalDatabase.java  | 180 ++++++++++++++++++
 .../catalog/external/IcebergExternalTable.java     |  66 +++++++
 .../apache/doris/datasource/CatalogFactory.java    |   4 +
 .../apache/doris/datasource/ExternalCatalog.java   |   9 +
 .../apache/doris/datasource/InitCatalogLog.java    |   3 +-
 .../DataLakeAWSCredentialsProvider.java            |  59 ++++++
 .../datasource/iceberg/IcebergExternalCatalog.java | 210 +++++++++++++++++++++
 .../iceberg/IcebergExternalCatalogFactory.java     |  43 +++++
 .../iceberg/IcebergHMSExternalCatalog.java         |  50 +++++
 .../iceberg/IcebergRestExternalCatalog.java        |  53 ++++++
 .../apache/doris/planner/SingleNodePlanner.java    |   1 +
 .../planner/external/ExternalFileScanNode.java     |  38 +++-
 .../doris/planner/external/QueryScanProvider.java  |   2 +
 .../planner/external/iceberg/IcebergApiSource.java | 119 ++++++++++++
 .../{ => iceberg}/IcebergDeleteFileFilter.java     |  11 +-
 .../planner/external/iceberg/IcebergHMSSource.java |  85 +++++++++
 .../{ => iceberg}/IcebergScanProvider.java         | 101 +++++++---
 .../planner/external/iceberg/IcebergSource.java    |  44 +++++
 .../external/{ => iceberg}/IcebergSplit.java       |   3 +-
 20 files changed, 1054 insertions(+), 33 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index df013b7cc7..7a71d4f4e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -134,7 +134,8 @@ public interface TableIf {
      */
     enum TableType {
         MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, 
HIVE, ICEBERG, HUDI, JDBC,
-        TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, 
MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE;
+        TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, 
MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
+        ICEBERG_EXTERNAL_TABLE;
 
         public String toEngineName() {
             switch (this) {
@@ -167,6 +168,8 @@ public interface TableIf {
                     return "hms";
                 case ES_EXTERNAL_TABLE:
                     return "es";
+                case ICEBERG_EXTERNAL_TABLE:
+                    return "iceberg";
                 default:
                     return null;
             }
@@ -193,6 +196,7 @@ public interface TableIf {
                 case TABLE_VALUED_FUNCTION:
                 case HMS_EXTERNAL_TABLE:
                 case ES_EXTERNAL_TABLE:
+                case ICEBERG_EXTERNAL_TABLE:
                     return "EXTERNAL TABLE";
                 default:
                     return null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
new file mode 100644
index 0000000000..9b110c9dcf
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitDatabaseLog;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class IcebergExternalDatabase extends 
ExternalDatabase<IcebergExternalTable> implements GsonPostProcessable {
+
+    private static final Logger LOG = 
LogManager.getLogger(IcebergExternalDatabase.class);
+    // Cache of table name to table id.
+    private Map<String, Long> tableNameToId = Maps.newConcurrentMap();
+    @SerializedName(value = "idToTbl")
+    private Map<Long, IcebergExternalTable> idToTbl = Maps.newConcurrentMap();
+
+    public IcebergExternalDatabase(ExternalCatalog extCatalog, Long id, String 
name) {
+        super(extCatalog, id, name);
+    }
+
+    public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {
+        Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
+        Map<Long, IcebergExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
+        for (int i = 0; i < log.getRefreshCount(); i++) {
+            IcebergExternalTable table = 
getTableForReplay(log.getRefreshTableIds().get(i));
+            tmpTableNameToId.put(table.getName(), table.getId());
+            tmpIdToTbl.put(table.getId(), table);
+        }
+        for (int i = 0; i < log.getCreateCount(); i++) {
+            IcebergExternalTable table = new 
IcebergExternalTable(log.getCreateTableIds().get(i),
+                    log.getCreateTableNames().get(i), name, 
(IcebergExternalCatalog) catalog);
+            tmpTableNameToId.put(table.getName(), table.getId());
+            tmpIdToTbl.put(table.getId(), table);
+        }
+        tableNameToId = tmpTableNameToId;
+        idToTbl = tmpIdToTbl;
+        initialized = true;
+    }
+
+    public void setTableExtCatalog(ExternalCatalog extCatalog) {
+        for (IcebergExternalTable table : idToTbl.values()) {
+            table.setCatalog(extCatalog);
+        }
+    }
+
+    @Override
+    protected void init() {
+        InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
+        initDatabaseLog.setType(InitDatabaseLog.Type.HMS);
+        initDatabaseLog.setCatalogId(extCatalog.getId());
+        initDatabaseLog.setDbId(id);
+        List<String> tableNames = extCatalog.listTableNames(null, name);
+        if (tableNames != null) {
+            Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap();
+            Map<Long, IcebergExternalTable> tmpIdToTbl = Maps.newHashMap();
+            for (String tableName : tableNames) {
+                long tblId;
+                if (tableNameToId != null && 
tableNameToId.containsKey(tableName)) {
+                    tblId = tableNameToId.get(tableName);
+                    tmpTableNameToId.put(tableName, tblId);
+                    IcebergExternalTable table = idToTbl.get(tblId);
+                    tmpIdToTbl.put(tblId, table);
+                    initDatabaseLog.addRefreshTable(tblId);
+                } else {
+                    tblId = Env.getCurrentEnv().getNextId();
+                    tmpTableNameToId.put(tableName, tblId);
+                    IcebergExternalTable table = new 
IcebergExternalTable(tblId, tableName, name,
+                            (IcebergExternalCatalog) extCatalog);
+                    tmpIdToTbl.put(tblId, table);
+                    initDatabaseLog.addCreateTable(tblId, tableName);
+                }
+            }
+            tableNameToId = tmpTableNameToId;
+            idToTbl = tmpIdToTbl;
+        }
+        initialized = true;
+        Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog);
+    }
+
+    @Override
+    public List<IcebergExternalTable> getTables() {
+        makeSureInitialized();
+        return Lists.newArrayList(idToTbl.values());
+    }
+
+    @Override
+    public List<IcebergExternalTable> getTablesOnIdOrder() {
+        // Sort by name instead, because the id may change.
+        return 
getTables().stream().sorted(Comparator.comparing(TableIf::getName)).collect(Collectors.toList());
+    }
+
+    @Override
+    public Set<String> getTableNamesWithLock() {
+        makeSureInitialized();
+        return Sets.newHashSet(tableNameToId.keySet());
+    }
+
+    @Override
+    public IcebergExternalTable getTableNullable(String tableName) {
+        makeSureInitialized();
+        if (!tableNameToId.containsKey(tableName)) {
+            return null;
+        }
+        return idToTbl.get(tableNameToId.get(tableName));
+    }
+
+    @Override
+    public IcebergExternalTable getTableNullable(long tableId) {
+        makeSureInitialized();
+        return idToTbl.get(tableId);
+    }
+
+    @Override
+    public IcebergExternalTable getTableForReplay(long tableId) {
+        return idToTbl.get(tableId);
+    }
+
+    @Override
+    public void gsonPostProcess() throws IOException {
+        tableNameToId = Maps.newConcurrentMap();
+        for (IcebergExternalTable tbl : idToTbl.values()) {
+            tableNameToId.put(tbl.getName(), tbl.getId());
+        }
+        rwLock = new ReentrantReadWriteLock(true);
+    }
+
+    @Override
+    public void dropTable(String tableName) {
+        LOG.debug("drop table [{}]", tableName);
+        makeSureInitialized();
+        Long tableId = tableNameToId.remove(tableName);
+        if (tableId == null) {
+            LOG.warn("drop table [{}] failed", tableName);
+            // Return early: removing a null key from a ConcurrentMap would throw NPE.
+            return;
+        }
+        idToTbl.remove(tableId);
+    }
+
+    @Override
+    public void createTable(String tableName, long tableId) {
+        LOG.debug("create table [{}]", tableName);
+        makeSureInitialized();
+        tableNameToId.put(tableName, tableId);
+        IcebergExternalTable table = new IcebergExternalTable(tableId, 
tableName, name,
+                (IcebergExternalCatalog) extCatalog);
+        idToTbl.put(tableId, table);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
new file mode 100644
index 0000000000..389ba8dc55
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.external;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.thrift.THiveTable;
+import org.apache.doris.thrift.TIcebergTable;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class IcebergExternalTable extends ExternalTable {
+
+    IcebergExternalCatalog icebergCatalog;
+
+    public IcebergExternalTable(long id, String name, String dbName, 
IcebergExternalCatalog catalog) {
+        super(id, name, catalog, dbName, TableType.ICEBERG_EXTERNAL_TABLE);
+        icebergCatalog = catalog;
+    }
+
+    public String getIcebergCatalogType() {
+        return icebergCatalog.getIcebergCatalogType();
+    }
+
+    protected synchronized void makeSureInitialized() {
+        if (!objectCreated) {
+            objectCreated = true;
+        }
+    }
+
+    @Override
+    public TTableDescriptor toThrift() {
+        List<Column> schema = getFullSchema();
+        if (icebergCatalog.getIcebergCatalogType().equals("hms")) {
+            THiveTable tHiveTable = new THiveTable(dbName, name, new 
HashMap<>());
+            TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.HIVE_TABLE, schema.size(), 0,
+                    getName(), dbName);
+            tTableDescriptor.setHiveTable(tHiveTable);
+            return tTableDescriptor;
+        } else {
+            TIcebergTable icebergTable = new TIcebergTable(dbName, name, new 
HashMap<>());
+            TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.ICEBERG_TABLE,
+                    schema.size(), 0, getName(), dbName);
+            tTableDescriptor.setIcebergTable(icebergTable);
+            return tTableDescriptor;
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index 87c1212c1c..3f9be036ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory;
 
 import org.apache.parquet.Strings;
 
@@ -102,6 +103,9 @@ public class CatalogFactory {
             case "jdbc":
                 catalog = new JdbcExternalCatalog(catalogId, name, resource, 
props);
                 break;
+            case "iceberg":
+                catalog = 
IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props);
+                break;
             default:
                 throw new DdlException("Unknown catalog type: " + catalogType);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 3a4a711e1c..200999c6db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.EsExternalDatabase;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.catalog.external.HMSExternalDatabase;
+import org.apache.doris.catalog.external.IcebergExternalDatabase;
 import org.apache.doris.catalog.external.JdbcExternalDatabase;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.io.Text;
@@ -280,6 +281,14 @@ public abstract class ExternalCatalog implements 
CatalogIf<ExternalDatabase>, Wr
                     tmpIdToDb.put(db.getId(), db);
                 }
                 break;
+            case ICEBERG:
+                for (int i = 0; i < log.getCreateCount(); i++) {
+                    IcebergExternalDatabase db = new IcebergExternalDatabase(
+                            this, log.getCreateDbIds().get(i), 
log.getCreateDbNames().get(i));
+                    tmpDbNameToId.put(db.getFullName(), db.getId());
+                    tmpIdToDb.put(db.getId(), db);
+                }
+                break;
             default:
                 break;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
index c7ed1696b2..524e2d9d3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
@@ -32,10 +32,11 @@ import java.util.List;
 
 @Data
 public class InitCatalogLog implements Writable {
-    enum Type {
+    public enum Type {
         HMS,
         ES,
         JDBC,
+        ICEBERG,
         UNKNOWN;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/DataLakeAWSCredentialsProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/DataLakeAWSCredentialsProvider.java
new file mode 100644
index 0000000000..9901b9c668
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/DataLakeAWSCredentialsProvider.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.credentials;
+
+import com.amazonaws.SdkClientException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.BasicSessionCredentials;
+import com.amazonaws.util.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.Constants;
+
+public class DataLakeAWSCredentialsProvider implements AWSCredentialsProvider  
{
+
+    private final Configuration conf;
+
+    public DataLakeAWSCredentialsProvider(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public AWSCredentials getCredentials() {
+        String accessKey = StringUtils.trim(conf.get(Constants.ACCESS_KEY));
+        String secretKey = StringUtils.trim(conf.get(Constants.SECRET_KEY));
+        String sessionToken = 
StringUtils.trim(conf.get(Constants.SESSION_TOKEN));
+        if (!StringUtils.isNullOrEmpty(accessKey) && 
!StringUtils.isNullOrEmpty(secretKey)) {
+            return (StringUtils.isNullOrEmpty(sessionToken) ? new 
BasicAWSCredentials(accessKey,
+                secretKey) : new BasicSessionCredentials(accessKey, secretKey, 
sessionToken));
+        } else {
+            throw new SdkClientException(
+                "Unable to load AWS credentials from hive conf 
(fs.s3a.access.key and fs.s3a.secret.key)");
+        }
+    }
+
+    @Override
+    public void refresh() {
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
new file mode 100644
index 0000000000..53e5b57459
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.ExternalDatabase;
+import org.apache.doris.catalog.external.IcebergExternalDatabase;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.SessionContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class IcebergExternalCatalog extends ExternalCatalog {
+
+    private static final Logger LOG = 
LogManager.getLogger(IcebergExternalCatalog.class);
+    public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type";
+    public static final String ICEBERG_REST = "rest";
+    public static final String ICEBERG_HMS = "hms";
+    protected final String icebergCatalogType;
+    protected Catalog catalog;
+    protected SupportsNamespaces nsCatalog;
+
+    public IcebergExternalCatalog(long catalogId, String name, String type) {
+        super(catalogId, name);
+        this.icebergCatalogType = type;
+    }
+
+    @Override
+    protected void init() {
+        nsCatalog = (SupportsNamespaces) catalog;
+        Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
+        Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
+        InitCatalogLog initCatalogLog = new InitCatalogLog();
+        initCatalogLog.setCatalogId(id);
+        initCatalogLog.setType(InitCatalogLog.Type.ICEBERG);
+        List<String> allDatabaseNames = listDatabaseNames();
+        for (String dbName : allDatabaseNames) {
+            long dbId;
+            if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
+                dbId = dbNameToId.get(dbName);
+                tmpDbNameToId.put(dbName, dbId);
+                ExternalDatabase db = idToDb.get(dbId);
+                db.setUnInitialized(invalidCacheInInit);
+                tmpIdToDb.put(dbId, db);
+                initCatalogLog.addRefreshDb(dbId);
+            } else {
+                dbId = Env.getCurrentEnv().getNextId();
+                tmpDbNameToId.put(dbName, dbId);
+                IcebergExternalDatabase db = new IcebergExternalDatabase(this, 
dbId, dbName);
+                tmpIdToDb.put(dbId, db);
+                initCatalogLog.addCreateDb(dbId, dbName);
+            }
+        }
+        dbNameToId = tmpDbNameToId;
+        idToDb = tmpIdToDb;
+        Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
+    }
+
+    protected Configuration getConfiguration() {
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> catalogProperties = 
catalogProperty.getHadoopProperties();
+        for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        return conf;
+    }
+
+    protected Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
+        if (type.isPrimitiveType()) {
+            return 
icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) 
type);
+        }
+        switch (type.typeId()) {
+            case LIST:
+                Types.ListType list = (Types.ListType) type;
+                return 
ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
+            case MAP:
+            case STRUCT:
+                return Type.UNSUPPORTED;
+            default:
+                throw new IllegalArgumentException("Cannot transform unknown 
type: " + type);
+        }
+    }
+
+    private Type 
icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType 
primitive) {
+        switch (primitive.typeId()) {
+            case BOOLEAN:
+                return Type.BOOLEAN;
+            case INTEGER:
+                return Type.INT;
+            case LONG:
+                return Type.BIGINT;
+            case FLOAT:
+                return Type.FLOAT;
+            case DOUBLE:
+                return Type.DOUBLE;
+            case STRING:
+            case BINARY:
+            case UUID:
+                return Type.STRING;
+            case FIXED:
+                Types.FixedType fixed = (Types.FixedType) primitive;
+                return ScalarType.createCharType(fixed.length());
+            case DECIMAL:
+                Types.DecimalType decimal = (Types.DecimalType) primitive;
+                return ScalarType.createDecimalType(decimal.precision(), 
decimal.scale());
+            case DATE:
+                return ScalarType.createDateV2Type();
+            case TIMESTAMP:
+                return ScalarType.createDatetimeV2Type(0);
+            case TIME:
+                return Type.UNSUPPORTED;
+            default:
+                throw new IllegalArgumentException("Cannot transform unknown 
type: " + primitive);
+        }
+    }
+
+    public String getIcebergCatalogType() {
+        return icebergCatalogType;
+    }
+
+    protected List<String> listDatabaseNames() {
+        return nsCatalog.listNamespaces().stream()
+            .map(e -> {
+                String dbName = e.toString();
+                try {
+                    FeNameFormat.checkDbName(dbName);
+                } catch (AnalysisException ex) {
+                    Util.logAndThrowRuntimeException(LOG,
+                            String.format("Not a supported namespace name 
format: %s", dbName), ex);
+                }
+                return dbName;
+            })
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public List<String> listDatabaseNames(SessionContext ctx) {
+        makeSureInitialized();
+        return new ArrayList<>(dbNameToId.keySet());
+    }
+
+    @Override
+    public List<Column> getSchema(String dbName, String tblName) {
+        makeSureInitialized();
+        List<Types.NestedField> columns = getIcebergTable(dbName, 
tblName).schema().columns();
+        List<Column> tmpSchema = 
Lists.newArrayListWithCapacity(columns.size());
+        for (Types.NestedField field : columns) {
+            tmpSchema.add(new Column(field.name(),
+                    icebergTypeToDorisType(field.type()), true, null,
+                    true, null, field.doc(), true, null, -1));
+        }
+        return tmpSchema;
+    }
+
+    @Override
+    public boolean tableExist(SessionContext ctx, String dbName, String 
tblName) {
+        makeSureInitialized();
+        return catalog.tableExists(TableIdentifier.of(dbName, tblName));
+    }
+
+    @Override
+    public List<String> listTableNames(SessionContext ctx, String dbName) {
+        makeSureInitialized();
+        List<TableIdentifier> tableIdentifiers = 
catalog.listTables(Namespace.of(dbName));
+        return 
tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
+    }
+
+    public org.apache.iceberg.Table getIcebergTable(String dbName, String 
tblName) {
+        makeSureInitialized();
+        return catalog.loadTable(TableIdentifier.of(dbName, tblName));
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
new file mode 100644
index 0000000000..62ad0c8729
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.CatalogIf;
+
+import java.util.Map;
+
+public class IcebergExternalCatalogFactory {
+
+    public static CatalogIf createCatalog(long catalogId, String name, String 
resource, Map<String, String> props)
+            throws DdlException {
+        String catalogType = 
props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE);
+        if (catalogType == null) {
+            throw new DdlException("Missing " + 
IcebergExternalCatalog.ICEBERG_CATALOG_TYPE + " property");
+        }
+        switch (catalogType) {
+            case IcebergExternalCatalog.ICEBERG_REST:
+                return new IcebergRestExternalCatalog(catalogId, name, 
resource, catalogType, props);
+            case IcebergExternalCatalog.ICEBERG_HMS:
+                return new IcebergHMSExternalCatalog(catalogId, name, 
resource, catalogType, props);
+            default:
+                throw new DdlException("Unknown " + 
IcebergExternalCatalog.ICEBERG_CATALOG_TYPE
+                    + " value: " + catalogType);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
new file mode 100644
index 0000000000..f969ae7085
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.catalog.HMSResource;
+import org.apache.doris.datasource.CatalogProperty;
+
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.hive.HiveCatalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class IcebergHMSExternalCatalog extends IcebergExternalCatalog {
+
+    public IcebergHMSExternalCatalog(long catalogId, String name, String 
resource, String catalogType,
+                                     Map<String, String> props) {
+        super(catalogId, name, catalogType);
+        catalogProperty = new CatalogProperty(resource, props);
+    }
+
+    @Override
+    protected void initLocalObjectsImpl() {
+        HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
+        hiveCatalog.setConf(getConfiguration());
+        // initialize hive catalog
+        Map<String, String> catalogProperties = new HashMap<>();
+        String metastoreUris = 
catalogProperty.getOrDefault(HMSResource.HIVE_METASTORE_URIS, "");
+
+        catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, metastoreUris);
+        catalogProperties.put(CatalogProperties.URI, metastoreUris);
+        hiveCatalog.initialize(icebergCatalogType, catalogProperties);
+        catalog = hiveCatalog;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
new file mode 100644
index 0000000000..f3f240b661
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.rest.RESTCatalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
+
+    public  IcebergRestExternalCatalog(long catalogId, String name, String 
resource, String catalogType,
+                                       Map<String, String> props) {
+        super(catalogId, name, catalogType);
+        catalogProperty = new CatalogProperty(resource, props);
+    }
+
+    @Override
+    protected void initLocalObjectsImpl() {
+        Map<String, String> restProperties = new HashMap<>();
+        String restUri = 
catalogProperty.getProperties().getOrDefault(CatalogProperties.URI, "");
+        restProperties.put(CatalogProperties.URI, restUri);
+        RESTCatalog restCatalog = new RESTCatalog();
+        String credentials = catalogProperty.getProperties()
+                .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER, 
DataLakeAWSCredentialsProvider.class.getName());
+        Configuration conf = getConfiguration();
+        conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials);
+        restCatalog.setConf(conf);
+        restCatalog.initialize(icebergCatalogType, restProperties);
+        catalog = restCatalog;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 199e0c623a..ae286c7974 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1940,6 +1940,7 @@ public class SingleNodePlanner {
                 scanNode = ((TableValuedFunctionRef) 
tblRef).getScanNode(ctx.getNextNodeId());
                 break;
             case HMS_EXTERNAL_TABLE:
+            case ICEBERG_EXTERNAL_TABLE:
                 scanNode = new ExternalFileScanNode(ctx.getNextNodeId(), 
tblRef.getDesc());
                 break;
             case ES_EXTERNAL_TABLE:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index de007632c9..7f5cbddb87 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -38,11 +38,17 @@ import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.catalog.external.IcebergExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.iceberg.IcebergApiSource;
+import org.apache.doris.planner.external.iceberg.IcebergHMSSource;
+import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
+import org.apache.doris.planner.external.iceberg.IcebergSource;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
@@ -175,6 +181,9 @@ public class ExternalFileScanNode extends ExternalScanNode {
                 } else if (this.desc.getTable() instanceof FunctionGenTable) {
                     FunctionGenTable table = (FunctionGenTable) 
this.desc.getTable();
                     initFunctionGenTable(table, 
(ExternalFileTableValuedFunction) table.getTvf());
+                } else if (this.desc.getTable() instanceof 
IcebergExternalTable) {
+                    IcebergExternalTable table = (IcebergExternalTable) 
this.desc.getTable();
+                    initIcebergExternalTable(table);
                 }
                 break;
             case LOAD:
@@ -211,6 +220,9 @@ public class ExternalFileScanNode extends ExternalScanNode {
                 } else if (this.desc.getTable() instanceof FunctionGenTable) {
                     FunctionGenTable table = (FunctionGenTable) 
this.desc.getTable();
                     initFunctionGenTable(table, 
(ExternalFileTableValuedFunction) table.getTvf());
+                } else if (this.desc.getTable() instanceof 
IcebergExternalTable) {
+                    IcebergExternalTable table = (IcebergExternalTable) 
this.desc.getTable();
+                    initIcebergExternalTable(table);
                 }
                 break;
             default:
@@ -244,7 +256,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
                 scanProvider = new HudiScanProvider(hmsTable, desc, 
columnNameToRange);
                 break;
             case ICEBERG:
-                scanProvider = new IcebergScanProvider(hmsTable, analyzer, 
desc, columnNameToRange);
+                IcebergSource hmsSource = new IcebergHMSSource(hmsTable, desc, 
columnNameToRange);
+                scanProvider = new IcebergScanProvider(hmsSource, analyzer);
                 break;
             case HIVE:
                 scanProvider = new HiveScanProvider(hmsTable, desc, 
columnNameToRange);
@@ -255,6 +268,29 @@ public class ExternalFileScanNode extends ExternalScanNode 
{
         this.scanProviders.add(scanProvider);
     }
 
+    private void initIcebergExternalTable(IcebergExternalTable icebergTable) 
throws UserException {
+        Preconditions.checkNotNull(icebergTable);
+        if (icebergTable.isView()) {
+            throw new AnalysisException(
+                String.format("Querying external view '%s.%s' is not 
supported", icebergTable.getDbName(),
+                        icebergTable.getName()));
+        }
+
+        FileScanProviderIf scanProvider;
+        String catalogType = icebergTable.getIcebergCatalogType();
+        switch (catalogType) {
+            case IcebergExternalCatalog.ICEBERG_HMS:
+            case IcebergExternalCatalog.ICEBERG_REST:
+                IcebergSource icebergSource = new IcebergApiSource(
+                        icebergTable, desc, columnNameToRange);
+                scanProvider = new IcebergScanProvider(icebergSource, 
analyzer);
+                break;
+            default:
+                throw new UserException("Unknown iceberg catalog type: " + 
catalogType);
+        }
+        this.scanProviders.add(scanProvider);
+    }
+
     private void initFunctionGenTable(FunctionGenTable table, 
ExternalFileTableValuedFunction tvf) {
         Preconditions.checkNotNull(table);
         FileScanProviderIf scanProvider = new TVFScanProvider(table, desc, 
tvf);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index b332819700..d894dd1fff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -25,6 +25,8 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import 
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
+import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
+import org.apache.doris.planner.external.iceberg.IcebergSplit;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TExternalScanRange;
 import org.apache.doris.thrift.TFileAttributes;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
new file mode 100644
index 0000000000..35b45282c5
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.iceberg;
+
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.IcebergExternalTable;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.external.ExternalFileScanNode;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileScanRangeParams;
+import org.apache.doris.thrift.TFileScanSlotInfo;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Get metadata from the Iceberg API (applies to any Iceberg table: hive, rest, glue, ...).
+ */
+public class IcebergApiSource implements IcebergSource {
+
+    private final IcebergExternalTable icebergExtTable;
+    private final Table originTable;
+
+    private final TupleDescriptor desc;
+
+    public IcebergApiSource(IcebergExternalTable table, TupleDescriptor desc,
+                            Map<String, ColumnRange> columnNameToRange) {
+        this.icebergExtTable = table;
+        this.originTable = ((IcebergExternalCatalog) 
icebergExtTable.getCatalog())
+                .getIcebergTable(icebergExtTable.getDbName(), 
icebergExtTable.getName());
+        this.desc = desc;
+    }
+
+    @Override
+    public TupleDescriptor getDesc() {
+        return desc;
+    }
+
+    @Override
+    public String getFileFormat() {
+        return originTable.properties()
+            .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, 
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    }
+
+    @Override
+    public Table getIcebergTable() throws MetaNotFoundException {
+        return originTable;
+    }
+
+    @Override
+    public TableIf getTargetTable() {
+        return icebergExtTable;
+    }
+
+    @Override
+    public ExternalFileScanNode.ParamCreateContext createContext() throws 
UserException {
+        ExternalFileScanNode.ParamCreateContext context = new 
ExternalFileScanNode.ParamCreateContext();
+        context.params = new TFileScanRangeParams();
+        context.destTupleDescriptor = desc;
+        context.params.setDestTupleId(desc.getId().asInt());
+        context.fileGroup = new BrokerFileGroup(icebergExtTable.getId(), 
originTable.location(), getFileFormat());
+
+        // Hive tables must extract partition values from the path, while Hudi/Iceberg
+        // tables keep partition fields in the data files.
+        List<String> partitionKeys =  originTable.spec().fields().stream()
+                .map(PartitionField::name).collect(Collectors.toList());
+        List<Column> columns = icebergExtTable.getBaseSchema(false);
+        context.params.setNumOfColumnsFromFile(columns.size() - 
partitionKeys.size());
+        for (SlotDescriptor slot : desc.getSlots()) {
+            if (!slot.isMaterialized()) {
+                continue;
+            }
+            TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
+            slotInfo.setSlotId(slot.getId().asInt());
+            
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
+            context.params.addToRequiredSlots(slotInfo);
+        }
+        return context;
+    }
+
+    @Override
+    public TFileAttributes getFileAttributes() throws UserException {
+        return new TFileAttributes();
+    }
+
+    @Override
+    public ExternalCatalog getCatalog() {
+        return icebergExtTable.getCatalog();
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergDeleteFileFilter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergDeleteFileFilter.java
similarity index 92%
rename from 
fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergDeleteFileFilter.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergDeleteFileFilter.java
index 9add306c46..e1e94a8329 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergDeleteFileFilter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergDeleteFileFilter.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.planner.external;
+package org.apache.doris.planner.external.iceberg;
 
 import lombok.Data;
 
@@ -64,7 +64,6 @@ public class IcebergDeleteFileFilter {
         }
     }
 
-    @Data
     static class EqualityDelete extends IcebergDeleteFileFilter {
         private List<Integer> fieldIds;
 
@@ -72,5 +71,13 @@ public class IcebergDeleteFileFilter {
             super(deleteFilePath);
             this.fieldIds = fieldIds;
         }
+
+        public List<Integer> getFieldIds() {
+            return fieldIds;
+        }
+
+        public void setFieldIds(List<Integer> fieldIds) {
+            this.fieldIds = fieldIds;
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
new file mode 100644
index 0000000000..747d7fd6f6
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.iceberg;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.external.ExternalFileScanNode;
+import org.apache.doris.planner.external.HiveScanProvider;
+import org.apache.doris.thrift.TFileAttributes;
+
+import org.apache.iceberg.TableProperties;
+
+import java.util.Map;
+
+public class IcebergHMSSource implements IcebergSource {
+
+    private final HMSExternalTable hmsTable;
+    private final HiveScanProvider hiveScanProvider;
+
+    private final TupleDescriptor desc;
+
+    public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
+                            Map<String, ColumnRange> columnNameToRange) {
+        this.hiveScanProvider = new HiveScanProvider(hmsTable, desc, 
columnNameToRange);
+        this.hmsTable = hmsTable;
+        this.desc = desc;
+    }
+
+    @Override
+    public TupleDescriptor getDesc() {
+        return desc;
+    }
+
+    @Override
+    public String getFileFormat() throws DdlException, MetaNotFoundException {
+        return hiveScanProvider.getRemoteHiveTable().getParameters()
+                .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, 
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    }
+
+    public org.apache.iceberg.Table getIcebergTable() throws 
MetaNotFoundException {
+        return HiveMetaStoreClientHelper.getIcebergTable(hmsTable);
+    }
+
+    @Override
+    public ExternalFileScanNode.ParamCreateContext createContext() throws 
UserException {
+        return hiveScanProvider.createContext(null);
+    }
+
+    @Override
+    public TableIf getTargetTable() {
+        return hiveScanProvider.getTargetTable();
+    }
+
+    @Override
+    public TFileAttributes getFileAttributes() throws UserException {
+        return hiveScanProvider.getFileAttributes();
+    }
+
+    @Override
+    public ExternalCatalog getCatalog() {
+        return hmsTable.getCatalog();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
similarity index 75%
rename from 
fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
index 24dc94fc0e..94e744725a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
@@ -15,22 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.planner.external;
+package org.apache.doris.planner.external.iceberg;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.TableSnapshot;
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.external.iceberg.util.IcebergUtils;
-import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.external.ExternalFileScanNode;
+import org.apache.doris.planner.external.QueryScanProvider;
+import org.apache.doris.planner.external.TableFormatType;
+import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TIcebergDeleteFileDesc;
 import org.apache.doris.thrift.TIcebergFileDesc;
 import org.apache.doris.thrift.TTableFormatFileDesc;
@@ -43,7 +46,7 @@ import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.MetadataColumns;
-import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.expressions.Expression;
@@ -52,23 +55,23 @@ import org.apache.iceberg.types.Conversions;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.stream.Collectors;
 
 /**
  * A file scan provider for iceberg.
  */
-public class IcebergScanProvider extends HiveScanProvider {
+public class IcebergScanProvider extends QueryScanProvider {
 
     private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
     private final Analyzer analyzer;
+    private final IcebergSource icebergSource;
 
-    public IcebergScanProvider(HMSExternalTable hmsTable, Analyzer analyzer, TupleDescriptor desc,
-                               Map<String, ColumnRange> columnNameToRange) {
-        super(hmsTable, desc, columnNameToRange);
+    public IcebergScanProvider(IcebergSource icebergSource, Analyzer analyzer) {
+        this.icebergSource = icebergSource;
         this.analyzer = analyzer;
     }
 
@@ -111,34 +114,43 @@ public class IcebergScanProvider extends HiveScanProvider {
     }
 
     @Override
-    public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
-        TFileFormatType type;
-
-        String icebergFormat = getRemoteHiveTable().getParameters()
-                .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
-        if (icebergFormat.equalsIgnoreCase("parquet")) {
-            type = TFileFormatType.FORMAT_PARQUET;
-        } else if (icebergFormat.equalsIgnoreCase("orc")) {
-            type = TFileFormatType.FORMAT_ORC;
-        } else {
-            throw new DdlException(String.format("Unsupported format name: %s for iceberg table.", icebergFormat));
+    public TFileType getLocationType() throws DdlException, MetaNotFoundException {
+        String location = icebergSource.getIcebergTable().location();
+        if (location != null && !location.isEmpty()) {
+            if (location.startsWith(FeConstants.FS_PREFIX_S3)
+                    || location.startsWith(FeConstants.FS_PREFIX_S3A)
+                    || location.startsWith(FeConstants.FS_PREFIX_S3N)
+                    || location.startsWith(FeConstants.FS_PREFIX_BOS)
+                    || location.startsWith(FeConstants.FS_PREFIX_COS)
+                    || location.startsWith(FeConstants.FS_PREFIX_OSS)
+                    || location.startsWith(FeConstants.FS_PREFIX_OBS)) {
+                return TFileType.FILE_S3;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
+                return TFileType.FILE_HDFS;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
+                return TFileType.FILE_LOCAL;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
+                return TFileType.FILE_BROKER;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
+                return TFileType.FILE_BROKER;
+            }
         }
-        return type;
+        throw new DdlException("Unknown file location " + location
+            + " for hms table " + icebergSource.getIcebergTable().name());
     }
 
     @Override
     public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
         List<Expression> expressions = new ArrayList<>();
-        org.apache.iceberg.Table table = HiveMetaStoreClientHelper.getIcebergTable(hmsTable);
+        org.apache.iceberg.Table table = icebergSource.getIcebergTable();
         for (Expr conjunct : exprs) {
             Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
             if (expression != null) {
                 expressions.add(expression);
             }
         }
-
         TableScan scan = table.newScan();
-        TableSnapshot tableSnapshot = desc.getRef().getTableSnapshot();
+        TableSnapshot tableSnapshot = icebergSource.getDesc().getRef().getTableSnapshot();
         if (tableSnapshot != null) {
             TableSnapshot.VersionType type = tableSnapshot.getType();
             try {
@@ -220,6 +232,41 @@ public class IcebergScanProvider extends HiveScanProvider {
 
     @Override
     public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
-        return Collections.emptyList();
+        return icebergSource.getIcebergTable().spec().fields().stream().map(PartitionField::name)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
+        TFileFormatType type;
+        String icebergFormat = icebergSource.getFileFormat();
+        if (icebergFormat.equalsIgnoreCase("parquet")) {
+            type = TFileFormatType.FORMAT_PARQUET;
+        } else if (icebergFormat.equalsIgnoreCase("orc")) {
+            type = TFileFormatType.FORMAT_ORC;
+        } else {
+            throw new DdlException(String.format("Unsupported format name: %s for iceberg table.", icebergFormat));
+        }
+        return type;
+    }
+
+    @Override
+    public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
+        return icebergSource.getCatalog().getProperties();
+    }
+
+    @Override
+    public ExternalFileScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException {
+        return icebergSource.createContext();
+    }
+
+    @Override
+    public TableIf getTargetTable() {
+        return icebergSource.getTargetTable();
+    }
+
+    @Override
+    public TFileAttributes getFileAttributes() throws UserException {
+        return icebergSource.getFileAttributes();
     }
 }
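
The provider's constructor now takes an IcebergSource plus the Analyzer, and every catalog-specific lookup (table, file format, location, properties) goes through that source instead of an HMSExternalTable. A minimal usage sketch, assuming a hypothetical helper class and that InputSplit is the Hadoop mapred type returned by getSplits:

    package org.apache.doris.planner.external.iceberg;

    import org.apache.doris.analysis.Analyzer;
    import org.apache.doris.analysis.Expr;
    import org.apache.doris.common.UserException;

    import org.apache.hadoop.mapred.InputSplit;

    import java.util.List;

    // Hypothetical helper, not part of this commit: drives scan planning purely
    // through the IcebergSource abstraction.
    public final class IcebergScanPlanSketch {
        private IcebergScanPlanSketch() {
        }

        public static List<InputSplit> planScan(IcebergSource source, Analyzer analyzer,
                                                List<Expr> conjuncts) throws UserException {
            IcebergScanProvider provider = new IcebergScanProvider(source, analyzer);
            // File format (parquet/orc) is resolved from the source; the storage type
            // is derived from the Iceberg table's location prefix (s3/hdfs/file/ofs/jfs).
            provider.getFileFormatType();
            provider.getLocationType();
            // Conjuncts are converted to Iceberg expressions and pushed into the
            // Iceberg TableScan before file splits are produced.
            return provider.getSplits(conjuncts);
        }
    }
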
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java
new file mode 100644
index 0000000000..ab17c6a448
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.iceberg;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.planner.external.ExternalFileScanNode;
+import org.apache.doris.thrift.TFileAttributes;
+
+public interface IcebergSource {
+
+    TupleDescriptor getDesc();
+
+    org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException;
+
+    ExternalFileScanNode.ParamCreateContext createContext() throws UserException;
+
+    TableIf getTargetTable();
+
+    TFileAttributes getFileAttributes() throws UserException;
+
+    ExternalCatalog getCatalog();
+
+    String getFileFormat() throws DdlException, MetaNotFoundException;
+}
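
The IcebergSource interface above captures everything the scan provider needs from a concrete catalog binding. A skeletal, field-backed sketch of an implementation (the class name and constructor are assumptions; the commit's actual HMS- and REST/API-backed sources are separate files in this change):

    package org.apache.doris.planner.external.iceberg;

    import org.apache.doris.analysis.TupleDescriptor;
    import org.apache.doris.catalog.TableIf;
    import org.apache.doris.common.UserException;
    import org.apache.doris.datasource.ExternalCatalog;
    import org.apache.doris.planner.external.ExternalFileScanNode;
    import org.apache.doris.thrift.TFileAttributes;

    // Skeletal sketch only: every value is injected up front so the class compiles
    // without depending on catalog internals that are not shown in this diff.
    public class PrebuiltIcebergSource implements IcebergSource {
        private final TupleDescriptor desc;
        private final org.apache.iceberg.Table icebergTable;
        private final TableIf targetTable;
        private final ExternalCatalog catalog;
        private final String fileFormat; // "parquet" or "orc"

        public PrebuiltIcebergSource(TupleDescriptor desc, org.apache.iceberg.Table icebergTable,
                                     TableIf targetTable, ExternalCatalog catalog, String fileFormat) {
            this.desc = desc;
            this.icebergTable = icebergTable;
            this.targetTable = targetTable;
            this.catalog = catalog;
            this.fileFormat = fileFormat;
        }

        @Override
        public TupleDescriptor getDesc() {
            return desc;
        }

        @Override
        public org.apache.iceberg.Table getIcebergTable() {
            return icebergTable;
        }

        @Override
        public ExternalFileScanNode.ParamCreateContext createContext() throws UserException {
            // A real source builds the scan-range parameter context from the tuple
            // descriptor; that wiring is not shown in this diff, so it is stubbed here.
            throw new UserException("not implemented in this sketch");
        }

        @Override
        public TableIf getTargetTable() {
            return targetTable;
        }

        @Override
        public TFileAttributes getFileAttributes() {
            return new TFileAttributes();
        }

        @Override
        public ExternalCatalog getCatalog() {
            return catalog;
        }

        @Override
        public String getFileFormat() {
            return fileFormat;
        }
    }
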
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
similarity index 92%
rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index b9607a7f00..a82c99b04a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -15,9 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.planner.external;
+package org.apache.doris.planner.external.iceberg;
 
 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.planner.external.HiveSplit;
 
 import lombok.Data;
 import org.apache.hadoop.fs.Path;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
