This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 87b96cf  [feature](iceberg) Step3: Support query iceberg external table (#8179)
87b96cf is described below

commit 87b96cfcd6c3cdecb7441f89e208b1f77177182c
Author: qiye <jianliang5...@gmail.com>
AuthorDate: Sat Feb 26 17:04:11 2022 +0800

    [feature](iceberg) Step3: Support query iceberg external table (#8179)
    
    1. Add Iceberg scan node
    2. Add Iceberg/Hive table type in thrift
    3. Support querying Iceberg tables of format types `parquet` and `orc`
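
Once such an Iceberg external table is mapped in Doris, it can be queried over
the MySQL protocol like any other table. A minimal sketch of exercising the
feature from Java (the host, port, credentials, database, and table name
`ice_tbl` are hypothetical, and mysql-connector-java is assumed on the
classpath):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class QueryIcebergExternalTable {
        public static void main(String[] args) throws Exception {
            // Doris FE speaks the MySQL protocol; 9030 is the default query port.
            try (Connection conn = DriverManager.getConnection(
                         "jdbc:mysql://fe_host:9030/example_db", "root", "");
                 Statement stmt = conn.createStatement();
                 // ice_tbl is assumed to be an Iceberg external table in example_db.
                 ResultSet rs = stmt.executeQuery("SELECT * FROM ice_tbl LIMIT 10")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }
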
---
 be/src/runtime/descriptors.cpp                     |  28 ++++
 be/src/runtime/descriptors.h                       |  18 +++
 fe/fe-core/pom.xml                                 |   8 ++
 .../java/org/apache/doris/catalog/HiveTable.java   |   2 +-
 .../org/apache/doris/catalog/IcebergTable.java     | 159 ++++++++++++++++++---
 .../org/apache/doris/load/BrokerFileGroup.java     |  11 ++
 .../java/org/apache/doris/load/LoadChecker.java    |   2 +-
 .../org/apache/doris/planner/BrokerScanNode.java   |  28 ++--
 .../apache/doris/planner/DistributedPlanner.java   |   2 +-
 .../org/apache/doris/planner/HiveScanNode.java     |   2 +-
 .../org/apache/doris/planner/IcebergScanNode.java  | 109 ++++++++++++++
 .../apache/doris/planner/SingleNodePlanner.java    |   4 +
 .../java/org/apache/doris/qe/AuditLogBuilder.java  |   5 +-
 .../apache/doris/catalog/CreateTableLikeTest.java  |   7 +-
 .../org/apache/doris/plugin/PluginMgrTest.java     |   3 +-
 fe/pom.xml                                         |  12 ++
 gensrc/thrift/Types.thrift                         |   4 +-
 17 files changed, 362 insertions(+), 42 deletions(-)

diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index 162545a..4225a75 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -147,6 +147,28 @@ std::string BrokerTableDescriptor::debug_string() const {
     return out.str();
 }
 
+HiveTableDescriptor::HiveTableDescriptor(const TTableDescriptor& tdesc)
+        : TableDescriptor(tdesc) {}
+
+HiveTableDescriptor::~HiveTableDescriptor() {}
+
+std::string HiveTableDescriptor::debug_string() const {
+    std::stringstream out;
+    out << "HiveTable(" << TableDescriptor::debug_string() << ")";
+    return out.str();
+}
+
+IcebergTableDescriptor::IcebergTableDescriptor(const TTableDescriptor& tdesc)
+        : TableDescriptor(tdesc) {}
+
+IcebergTableDescriptor::~IcebergTableDescriptor() {}
+
+std::string IcebergTableDescriptor::debug_string() const {
+    std::stringstream out;
+    out << "IcebergTable(" << TableDescriptor::debug_string() << ")";
+    return out.str();
+}
+
 EsTableDescriptor::EsTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) {}
 
 EsTableDescriptor::~EsTableDescriptor() {}
@@ -523,6 +545,12 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb
         case TTableType::ES_TABLE:
             desc = pool->add(new EsTableDescriptor(tdesc));
             break;
+        case TTableType::HIVE_TABLE:
+            desc = pool->add(new HiveTableDescriptor(tdesc));
+            break;
+        case TTableType::ICEBERG_TABLE:
+            desc = pool->add(new IcebergTableDescriptor(tdesc));
+            break;
         default:
             DCHECK(false) << "invalid table type: " << tdesc.tableType;
         }
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 0bdc22b..cdbc3d7 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -190,6 +190,24 @@ public:
 private:
 };
 
+class HiveTableDescriptor : public TableDescriptor {
+public:
+    HiveTableDescriptor(const TTableDescriptor& tdesc);
+    virtual ~HiveTableDescriptor();
+    virtual std::string debug_string() const;
+
+private:
+};
+
+class IcebergTableDescriptor : public TableDescriptor {
+public:
+    IcebergTableDescriptor(const TTableDescriptor& tdesc);
+    virtual ~IcebergTableDescriptor();
+    virtual std::string debug_string() const;
+
+private:
+};
+
 class EsTableDescriptor : public TableDescriptor {
 public:
     EsTableDescriptor(const TTableDescriptor& tdesc);
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 9b11108..a71545c 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -599,6 +599,14 @@ under the License.
             <artifactId>iceberg-hive-metastore</artifactId>
             <scope>provided</scope>
         </dependency>
+
+        <!-- For Iceberg, must be consistent with Iceberg version -->
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
     </dependencies>
     <build>
         <finalName>palo-fe</finalName>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
index 236d266..ff4adc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
@@ -134,7 +134,7 @@ public class HiveTable extends Table {
     @Override
     public TTableDescriptor toThrift() {
         THiveTable tHiveTable = new THiveTable(getHiveDb(), getHiveTable(), getHiveProperties());
-        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE,
+        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE,
                 fullSchema.size(), 0, getName(), "");
         tTableDescriptor.setHiveTable(tHiveTable);
         return tTableDescriptor;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
index c164936..41a0458 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
@@ -17,22 +17,38 @@
 
 package org.apache.doris.catalog;
 
-
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.external.iceberg.IcebergCatalog;
+import org.apache.doris.external.iceberg.IcebergCatalogMgr;
+import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TIcebergTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
-import com.google.common.collect.Maps;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * External Iceberg table
@@ -44,10 +60,23 @@ public class IcebergTable extends Table {
     private String icebergDb;
     // remote Iceberg table name
     private String icebergTbl;
+    // remote Iceberg table location
+    private String location;
+    // Iceberg table file format
+    private String fileFormat;
+    // Iceberg storage type
+    private StorageBackend.StorageType storageType;
+    // Iceberg remote host uri
+    private String hostUri;
+    // location analyze flag
+    private boolean isAnalyzed = false;
     private Map<String, String> icebergProperties = Maps.newHashMap();
 
     private org.apache.iceberg.Table icebergTable;
 
+    private final byte[] loadLock = new byte[0];
+    private final AtomicBoolean isLoaded = new AtomicBoolean(false);
+
     public IcebergTable() {
         super(TableType.ICEBERG);
     }
@@ -73,28 +102,126 @@ public class IcebergTable extends Table {
         return icebergDb;
     }
 
-    public void setIcebergDb(String icebergDb) {
-        this.icebergDb = icebergDb;
-    }
-
     public String getIcebergTbl() {
         return icebergTbl;
     }
 
-    public void setIcebergTbl(String icebergTbl) {
-        this.icebergTbl = icebergTbl;
-    }
-
     public Map<String, String> getIcebergProperties() {
         return icebergProperties;
     }
 
-    public void setIcebergProperties(Map<String, String> icebergProperties) {
-        this.icebergProperties = icebergProperties;
+    private void getLocation() throws UserException {
+        if (Strings.isNullOrEmpty(location)) {
+            try {
+                getTable();
+            } catch (Exception e) {
+                throw new UserException("Failed to get table: " + name + ",error: " + e.getMessage());
+            }
+            location = icebergTable.location();
+        }
+        analyzeLocation();
+    }
+
+    private void analyzeLocation() throws UserException {
+        if (isAnalyzed) {
+            return;
+        }
+        String[] strings = StringUtils.split(location, "/");
+
+        // analyze storage type
+        String storagePrefix = strings[0].split(":")[0];
+        if (storagePrefix.equalsIgnoreCase("s3")) {
+            this.storageType = StorageBackend.StorageType.S3;
+        } else if (storagePrefix.equalsIgnoreCase("hdfs")) {
+            this.storageType = StorageBackend.StorageType.HDFS;
+        } else {
+            throw new UserException("Not supported storage type: " + storagePrefix);
+        }
+
+        // analyze host uri
+        // eg: hdfs://host:port
+        //     s3://host:port
+        String host = strings[1];
+        this.hostUri = storagePrefix + "://" + host;
+        this.isAnalyzed = true;
+    }
+
+    public String getHostUri() throws UserException {
+        if (!isAnalyzed) {
+            getLocation();
+        }
+        return hostUri;
+    }
+
+    public StorageBackend.StorageType getStorageType() throws UserException {
+        if (!isAnalyzed) {
+            getLocation();
+        }
+        return storageType;
+    }
+
+    public String getFileFormat() throws UserException {
+        if (Strings.isNullOrEmpty(fileFormat)) {
+            try {
+                getTable();
+            } catch (Exception e) {
+                throw new UserException("Failed to get table: " + name + ",error: " + e.getMessage());
+            }
+            fileFormat = icebergTable.properties().get(TableProperties.DEFAULT_FILE_FORMAT);
+        }
+        return fileFormat;
+    }
+
+    // get the iceberg table instance, if table is not loaded, load it.
+    private org.apache.iceberg.Table getTable() throws Exception {
+        if (isLoaded.get()) {
+            Preconditions.checkNotNull(icebergTable);
+            return icebergTable;
+        }
+        synchronized (loadLock) {
+            if (icebergTable != null) {
+                return icebergTable;
+            }
+
+            IcebergProperty icebergProperty = getIcebergProperty();
+            IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty);
+            try {
+                this.icebergTable = icebergCatalog.loadTable(TableIdentifier.of(icebergDb, icebergTbl));
+                LOG.info("finished to load iceberg table: {}", name);
+            } catch (Exception e) {
+                LOG.warn("failed to load iceberg table {} from {}", name, icebergProperty.getHiveMetastoreUris(), e);
+                throw e;
+            }
+
+            isLoaded.set(true);
+            return icebergTable;
+        }
     }
 
-    public org.apache.iceberg.Table getIcebergTable() {
-        return icebergTable;
+    private IcebergProperty getIcebergProperty() {
+        Map<String, String> properties = Maps.newHashMap(icebergProperties);
+        properties.put(IcebergProperty.ICEBERG_DATABASE, icebergDb);
+        properties.put(IcebergProperty.ICEBERG_TABLE, icebergTbl);
+        return new IcebergProperty(properties);
+    }
+
+    /**
+     * Get iceberg data file by file system table location and iceberg predicates
+     * @throws Exception
+     */
+    public List<TBrokerFileStatus> getIcebergDataFiles(List<Expression> predicates) throws Exception {
+        org.apache.iceberg.Table table = getTable();
+        TableScan scan = table.newScan();
+        for (Expression predicate : predicates) {
+            scan = scan.filter(predicate);
+        }
+        List<TBrokerFileStatus> relatedFiles = Lists.newArrayList();
+        for (FileScanTask task : scan.planFiles()) {
+            Path path = Paths.get(task.file().path().toString());
+            String relativePath = "/" + path.subpath(2, path.getNameCount());
+            relatedFiles.add(new TBrokerFileStatus(relativePath, false, task.file().fileSizeInBytes(), false));
+        }
+        return relatedFiles;
     }
 
     @Override
@@ -128,7 +255,7 @@ public class IcebergTable extends Table {
     @Override
     public TTableDescriptor toThrift() {
         TIcebergTable tIcebergTable = new TIcebergTable(getIcebergDb(), getIcebergTbl(), getIcebergProperties());
-        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE,
+        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE,
                 fullSchema.size(), 0, getName(), "");
         tTableDescriptor.setIcebergTable(tIcebergTable);
         return tTableDescriptor;
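
The getTable() helper above lazy-loads the remote Iceberg table with a
double-checked pattern: an AtomicBoolean fast path, plus a dedicated lock
object that serializes the first load. A minimal generic sketch of the same
idiom, not the patch's exact code (class and field names are hypothetical;
`volatile` is added here for safe publication):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Supplier;

    public class LazyHolder<T> {
        private final AtomicBoolean loaded = new AtomicBoolean(false);
        private final Object loadLock = new Object();
        private volatile T value;

        public T get(Supplier<T> loader) {
            if (loaded.get()) {
                return value; // fast path: already loaded, no locking needed
            }
            synchronized (loadLock) {
                if (value == null) {
                    value = loader.get(); // only the first caller performs the load
                    loaded.set(true);
                }
                return value;
            }
        }
    }
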
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 95dbc12..b5312df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.HiveTable;
+import org.apache.doris.catalog.IcebergTable;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.OlapTable.OlapTableState;
@@ -37,6 +38,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.load.loadv2.LoadTask;
@@ -137,6 +139,15 @@ public class BrokerFileGroup implements Writable {
         this.columnExprList = columnExprList;
     }
 
+    // Used for iceberg table, no need to parse
+    public BrokerFileGroup(IcebergTable table) throws UserException {
+        this.tableId = table.getId();
+        this.isNegative = false;
+        this.valueSeparator = "|";
+        this.lineDelimiter = "\n";
+        this.fileFormat = table.getFileFormat();
+    }
+
     public BrokerFileGroup(DataDescription dataDescription) {
         this.fileFieldNames = dataDescription.getFileFieldNames();
         this.columnsFromPath = dataDescription.getColumnsFromPath();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
index f87cf93..0ee4fd2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
@@ -50,6 +50,7 @@ import org.apache.doris.transaction.TabletQuorumFailedException;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -63,7 +64,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import avro.shaded.com.google.common.collect.Lists;
 
 public class LoadChecker extends MasterDaemon {
     private static final Logger LOG = LogManager.getLogger(LoadChecker.class);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 9f75814..bfab512 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -401,7 +401,7 @@ public class BrokerScanNode extends LoadScanNode {
         Collections.shuffle(backends, random);
     }
 
-    private TFileFormatType formatType(String fileFormat, String path) {
+    private TFileFormatType formatType(String fileFormat, String path) throws UserException {
         if (fileFormat != null) {
             if (fileFormat.toLowerCase().equals("parquet")) {
                 return TFileFormatType.FORMAT_PARQUET;
@@ -411,6 +411,8 @@ public class BrokerScanNode extends LoadScanNode {
                 return TFileFormatType.FORMAT_JSON;
             } else if (fileFormat.toLowerCase().equals("csv")) {
                 return TFileFormatType.FORMAT_CSV_PLAIN;
+            } else {
+                throw new UserException("Not supported file format: " + fileFormat);
             }
         }
 
@@ -432,6 +434,10 @@ public class BrokerScanNode extends LoadScanNode {
         }
     }
 
+    public String getHostUri() throws UserException {
+        return "";
+    }
+
     // If fileFormat is not null, we use fileFormat instead of check file's suffix
     private void processFileGroup(
             ParamCreateContext context,
@@ -440,11 +446,11 @@ public class BrokerScanNode extends LoadScanNode {
         if (fileStatuses  == null || fileStatuses.isEmpty()) {
             return;
         }
+        // set hdfs params, used for Hive and Iceberg scans
         THdfsParams tHdfsParams = new THdfsParams();
-        if (this instanceof HiveScanNode) {
-            String fsName = ((HiveScanNode) this).getHdfsUri();
-            tHdfsParams.setFsName(fsName);
-        }
+        String fsName = getHostUri();
+        tHdfsParams.setFsName(fsName);
+
         TScanRangeLocations curLocations = newLocations(context.params, brokerDesc);
         long curInstanceBytes = 0;
         long curFileOffset = 0;
@@ -477,10 +483,8 @@ public class BrokerScanNode extends LoadScanNode {
                 } else {
                     TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType,
                             leftBytes, columnsFromPath, numberOfColumnsFromFile, brokerDesc);
-                    if (this instanceof HiveScanNode) {
-                        rangeDesc.setHdfsParams(tHdfsParams);
-                        rangeDesc.setReadByColumnDef(true);
-                    }
+                    rangeDesc.setHdfsParams(tHdfsParams);
+                    rangeDesc.setReadByColumnDef(true);
                     brokerScanRange(curLocations).addToRanges(rangeDesc);
                     curFileOffset = 0;
                     i++;
@@ -502,10 +506,8 @@ public class BrokerScanNode extends LoadScanNode {
                     rangeDesc.setNumAsString(context.fileGroup.isNumAsString());
                     rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine());
                 }
-                if (this instanceof HiveScanNode) {
-                    rangeDesc.setHdfsParams(tHdfsParams);
-                    rangeDesc.setReadByColumnDef(true);
-                }
+                rangeDesc.setHdfsParams(tHdfsParams);
+                rangeDesc.setReadByColumnDef(true);
                 brokerScanRange(curLocations).addToRanges(rangeDesc);
                 curFileOffset = 0;
                 curInstanceBytes += leftBytes;
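
The BrokerScanNode change above removes the instanceof dispatch: the base
class now exposes a getHostUri() hook with an empty default, Hive and Iceberg
scan nodes override it, and HDFS params are set uniformly for every range. A
minimal sketch of that template-method shape (class names are hypothetical,
and the checked UserException is omitted for brevity):

    abstract class BaseScanNode {
        // Default: plain broker scans have no file system name to report.
        public String getHostUri() {
            return "";
        }

        public void buildRangeParams() {
            // No instanceof checks: the subclass override supplies the value.
            System.out.println("fsName = " + getHostUri());
        }
    }

    class HdfsBackedScanNode extends BaseScanNode {
        @Override
        public String getHostUri() {
            return "hdfs://nn-host:8020"; // e.g. derived from the table location
        }

        public static void main(String[] args) {
            new HdfsBackedScanNode().buildRangeParams(); // fsName = hdfs://nn-host:8020
        }
    }
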
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 8c209a6..eb099c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -40,6 +40,7 @@ import org.apache.doris.thrift.TPartitionType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -50,7 +51,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import avro.shaded.com.google.common.collect.Maps;
 
 /**
  * The distributed planner is responsible for creating an executable, distributed plan
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
index 2619079..a67f539 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
@@ -70,7 +70,7 @@ public class HiveScanNode extends BrokerScanNode {
     private List<String> partitionKeys = new ArrayList<>();
     /* hive table properties */
 
-    public String getHdfsUri() {
+    public String getHostUri() {
         return hdfsUri;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
new file mode 100644
index 0000000..5428c9e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.IcebergProperty;
+import org.apache.doris.catalog.IcebergTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TExplainLevel;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class IcebergScanNode extends BrokerScanNode {
+    private static final Logger LOG = LogManager.getLogger(IcebergScanNode.class);
+
+    private IcebergTable icebergTable;
+    private final List<Expression> icebergPredicates = new ArrayList<>();
+
+    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
+                           List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
+        super(id, desc, planNodeName, fileStatusesList, filesAdded);
+        icebergTable = (IcebergTable) desc.getTable();
+    }
+
+    @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+    }
+
+    @Override
+    protected void initFileGroup() throws UserException {
+        fileGroups = Lists.newArrayList(new BrokerFileGroup(icebergTable));
+        brokerDesc = new BrokerDesc("IcebergTableDesc", icebergTable.getStorageType(),
+                icebergTable.getIcebergProperties());
+        targetTable = icebergTable;
+    }
+
+    @Override
+    public String getHostUri() throws UserException {
+        return icebergTable.getHostUri();
+    }
+
+    @Override
+    protected void getFileStatus() throws UserException {
+        // extract iceberg conjuncts
+        ListIterator<Expr> it = conjuncts.listIterator();
+        while (it.hasNext()) {
+            Expression expression = IcebergUtils.convertToIcebergExpr(it.next());
+            if (expression != null) {
+                icebergPredicates.add(expression);
+            }
+        }
+        // get iceberg file status
+        List<TBrokerFileStatus> fileStatuses;
+        try {
+            fileStatuses = icebergTable.getIcebergDataFiles(icebergPredicates);
+        } catch (Exception e) {
+            LOG.warn("errors while load iceberg table {} data files.", 
icebergTable.getName(), e);
+            throw new UserException("errors while load Iceberg table ["
+                    + icebergTable.getName() + "] data files.");
+        }
+        fileStatusesList.add(fileStatuses);
+        filesAdded += fileStatuses.size();
+        for (TBrokerFileStatus fstatus : fileStatuses) {
+            LOG.debug("Add file status is {}", fstatus);
+        }
+    }
+
+    @Override
+    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
+        StringBuilder output = new StringBuilder();
+        if (!isLoad()) {
+            output.append(prefix).append("TABLE: 
").append(icebergTable.getName()).append("\n");
+            output.append(prefix).append("PATH: ")
+                    
.append(icebergTable.getIcebergProperties().get(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS))
+                    .append("\n");
+        }
+        return output.toString();
+    }
+}
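
getFileStatus() above converts Doris conjuncts into Iceberg Expressions and
pushes them into the table scan, so planFiles() only returns data files that
can match the predicates. A minimal sketch against Iceberg's public scan API
(table loading is elided and the column name `dt` is hypothetical):

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.io.CloseableIterable;

    public class PlanFilesSketch {
        // Lists the data files that may contain rows with dt = '2022-02-26'.
        static void listMatchingFiles(Table table) throws Exception {
            TableScan scan = table.newScan()
                    .filter(Expressions.equal("dt", "2022-02-26"));
            try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
                for (FileScanTask task : tasks) {
                    System.out.println(task.file().path()
                            + " (" + task.file().fileSizeInBytes() + " bytes)");
                }
            }
        }
    }
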
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index b3aaf2a..8414da0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1698,6 +1698,10 @@ public class SingleNodePlanner {
                 scanNode = new HiveScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "HiveScanNode",
                         null, -1);
                 break;
+            case ICEBERG:
+                scanNode = new IcebergScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode",
+                        null, -1);
+                break;
             default:
                 break;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
index c940e0d..818e116 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.qe;
 
-import avro.shaded.com.google.common.collect.Maps;
-import avro.shaded.com.google.common.collect.Sets;
 import org.apache.doris.common.AuditLog;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.DigitalVersion;
@@ -34,6 +32,9 @@ import org.apache.doris.plugin.PluginMgr;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import java.lang.reflect.Field;
 import java.util.Map;
 import java.util.Set;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java
index 13a32d5..bda5ea6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java
@@ -17,14 +17,11 @@
 
 package org.apache.doris.catalog;
 
-import avro.shaded.com.google.common.collect.Lists;
-import org.apache.commons.collections.ListUtils;
 import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableLikeStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ExceptionChecker;
-import org.apache.doris.common.util.ListUtil;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.utframe.UtFrameUtils;
 
@@ -33,12 +30,12 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 import java.io.File;
 import java.util.List;
 import java.util.UUID;
 
-import avro.shaded.com.google.common.collect.Lists;
-
 /**
  * @author wangcong
  * @version 1.0
diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java
index e5a1885..80edee5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java
@@ -30,6 +30,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.Maps;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -38,7 +40,6 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.UUID;
 
-import avro.shaded.com.google.common.collect.Maps;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
diff --git a/fe/pom.xml b/fe/pom.xml
index 80903c6..8e13dcb 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -120,7 +120,12 @@ under the License.
         <log4j2.version>2.17.1</log4j2.version>
         <revision>0.15-SNAPSHOT</revision>
         <project.scm.id>github</project.scm.id>
+        <!-- ATTN: avro version must be consistent with Iceberg version -->
+        <!-- Please modify iceberg.version and avro.version together,
+         you can find avro version info in iceberg mvn repository -->
         <iceberg.version>0.12.0</iceberg.version>
+        <avro.version>1.10.1</avro.version>
+        <!-- ATTN: avro version must be consistent with Iceberg version -->
     </properties>
     <profiles>
         <!-- for custom internal repository -->
@@ -706,6 +711,13 @@ under the License.
                 <version>${iceberg.version}</version>
             </dependency>
 
+            <!-- For Iceberg, must be consistent with Iceberg version -->
+            <dependency>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro</artifactId>
+                <version>${avro.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.apache.parquet</groupId>
                 <artifactId>parquet-column</artifactId>
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index c1c8487..88992a7 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -357,7 +357,9 @@ enum TTableType {
     KUDU_TABLE, // Deprecated
     BROKER_TABLE,
     ES_TABLE,
-    ODBC_TABLE
+    ODBC_TABLE,
+    HIVE_TABLE,
+    ICEBERG_TABLE
 }
 
 enum TOdbcTableType {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
