morningman commented on a change in pull request #8179:
URL: https://github.com/apache/incubator-doris/pull/8179#discussion_r811564325



##########
File path: fe/fe-core/pom.xml
##########
@@ -599,6 +599,14 @@ under the License.
             <artifactId>iceberg-hive-metastore</artifactId>
             <scope>provided</scope>
         </dependency>
+
+        <!-- For Iceberg, must be consistent with Iceberg version -->
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.10.1</version>
+        </dependency>

Review comment:
       Add it to `fe/pom.xml` and reference it here.

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.IcebergProperty;
+import org.apache.doris.catalog.IcebergTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TExplainLevel;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class IcebergScanNode extends BrokerScanNode {
+    private static final Logger LOG = 
LogManager.getLogger(IcebergScanNode.class);
+
+    private IcebergTable icebergTable;
+    private final List<Expression> icebergPredicates = new ArrayList<>();
+    private String hdfsUri;
+
+    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
+                           List<List<TBrokerFileStatus>> fileStatusesList, int 
filesAdded) {
+        super(id, desc, planNodeName, fileStatusesList, filesAdded);
+        icebergTable = (IcebergTable) desc.getTable();
+    }
+
+    @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+    }
+
+    @Override
+    protected void initFileGroup() throws UserException {
+        fileGroups = Lists.newArrayList(new BrokerFileGroup(icebergTable));
+        brokerDesc = new BrokerDesc("IcebergTableDesc", 
StorageBackend.StorageType.HDFS, icebergTable.getIcebergProperties());
+        targetTable = icebergTable;
+    }
+
+    @Override
+    public String getHdfsUri() throws UserException {
+        if (Strings.isNullOrEmpty(hdfsUri)) {
+            String location = icebergTable.getLocation();
+            String[] strings = StringUtils.split(location, "/");
+            String[] strs = StringUtils.split(strings[1], ":");
+            this.hdfsUri = "hdfs://" + strs[0] + ":" + strs[1];
+        }
+        return hdfsUri;
+    }
+
+    @Override
+    protected void getFileStatus() throws UserException {
+        // extract iceberg conjuncts
+        ListIterator<Expr> it = conjuncts.listIterator();
+        while (it.hasNext()) {
+            Expression expression = 
IcebergUtils.convertToIcebergExpr(it.next());
+            if (expression != null) {
+                icebergPredicates.add(expression);
+            }
+        }
+        // get iceberg file status
+        List<TBrokerFileStatus> fileStatuses;
+        try {
+            fileStatuses = icebergTable.getIcebergDataFiles(icebergPredicates);
+        } catch (Exception e) {
+            LOG.warn("errors while load iceberg table {} data files.", 
icebergTable.getName(), e);
+            throw new UserException("errors while load Iceberg table ["
+                    + icebergTable.getName() + "] data files.");
+        }
+        fileStatusesList.add(fileStatuses);
+        filesAdded += fileStatuses.size();
+        for (TBrokerFileStatus fstatus : fileStatuses) {
+            LOG.info("Add file status is {}", fstatus);

Review comment:
       ```suggestion
               LOG.debug("Add file status is {}", fstatus);
   ```

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.IcebergProperty;
+import org.apache.doris.catalog.IcebergTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TExplainLevel;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class IcebergScanNode extends BrokerScanNode {
+    private static final Logger LOG = 
LogManager.getLogger(IcebergScanNode.class);
+
+    private IcebergTable icebergTable;
+    private final List<Expression> icebergPredicates = new ArrayList<>();
+    private String hdfsUri;
+
+    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
+                           List<List<TBrokerFileStatus>> fileStatusesList, int 
filesAdded) {
+        super(id, desc, planNodeName, fileStatusesList, filesAdded);
+        icebergTable = (IcebergTable) desc.getTable();
+    }
+
+    @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+    }
+
+    @Override
+    protected void initFileGroup() throws UserException {
+        fileGroups = Lists.newArrayList(new BrokerFileGroup(icebergTable));
+        brokerDesc = new BrokerDesc("IcebergTableDesc", 
StorageBackend.StorageType.HDFS, icebergTable.getIcebergProperties());
+        targetTable = icebergTable;
+    }
+
+    @Override

Review comment:
       What if the Iceberg table is located on AWS S3 or another object storage 
service instead of HDFS?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
##########
@@ -73,28 +95,88 @@ public String getIcebergDb() {
         return icebergDb;
     }
 
-    public void setIcebergDb(String icebergDb) {
-        this.icebergDb = icebergDb;
-    }
-
     public String getIcebergTbl() {
         return icebergTbl;
     }
 
-    public void setIcebergTbl(String icebergTbl) {
-        this.icebergTbl = icebergTbl;
-    }
-
     public Map<String, String> getIcebergProperties() {
         return icebergProperties;
     }
 
-    public void setIcebergProperties(Map<String, String> icebergProperties) {
-        this.icebergProperties = icebergProperties;
+    public String getLocation() throws UserException {
+        if (Strings.isNullOrEmpty(location)) {
+            try {
+                getTable();
+            } catch (Exception e) {
+                throw new UserException("Failed to get table: " + name + 
",error: " + e.getMessage());
+            }
+            location = icebergTable.location();
+        }
+        return location;
+    }
+
+    public String getFileFormat() throws UserException {
+        if (Strings.isNullOrEmpty(fileFormat)) {
+            try {
+                getTable();
+            } catch (Exception e) {
+                throw new UserException("Failed to get table: " + name + 
",error: " + e.getMessage());
+            }
+            fileFormat = 
icebergTable.properties().get(TableProperties.DEFAULT_FILE_FORMAT);
+        }
+        return fileFormat;
     }
 
-    public org.apache.iceberg.Table getIcebergTable() {
-        return icebergTable;
+    // get the iceberg table instance, if table is not loaded, load it.
+    private org.apache.iceberg.Table getTable() throws Exception {
+        if (isLoaded.get()) {
+            Preconditions.checkNotNull(icebergTable);
+            return icebergTable;
+        }
+        synchronized (loadLock) {
+            if (icebergTable != null) {
+                return icebergTable;
+            }
+
+            IcebergProperty icebergProperty = getIcebergProperty();
+            IcebergCatalog icebergCatalog = 
IcebergCatalogMgr.getCatalog(icebergProperty);
+            try {
+                this.icebergTable = 
icebergCatalog.loadTable(TableIdentifier.of(icebergDb, icebergTbl));
+                LOG.info("finished to load table: {}", name);
+            } catch (Exception e) {
+                LOG.warn("failed to load table {} from {}", name, 
icebergProperty.getHiveMetastoreUris(), e);

Review comment:
       ```suggestion
                   LOG.warn("failed to load iceberg table {} from {}", name, 
icebergProperty.getHiveMetastoreUris(), e);
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
##########
@@ -17,8 +17,8 @@
 
 package org.apache.doris.qe;
 
-import avro.shaded.com.google.common.collect.Maps;
-import avro.shaded.com.google.common.collect.Sets;
+import com.google.common.collect.Maps;

Review comment:
       Pay attention to the import order.

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
##########
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.IcebergProperty;
+import org.apache.doris.catalog.IcebergTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TExplainLevel;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class IcebergScanNode extends BrokerScanNode {
+    private static final Logger LOG = 
LogManager.getLogger(IcebergScanNode.class);
+
+    private IcebergTable icebergTable;
+    private final List<Expression> icebergPredicates = new ArrayList<>();
+    private String hdfsUri;
+
+    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
+                           List<List<TBrokerFileStatus>> fileStatusesList, int 
filesAdded) {
+        super(id, desc, planNodeName, fileStatusesList, filesAdded);
+        icebergTable = (IcebergTable) desc.getTable();
+    }
+
+    @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+    }
+
+    @Override
+    protected void initFileGroup() throws UserException {
+        fileGroups = Lists.newArrayList(new BrokerFileGroup(icebergTable));
+        brokerDesc = new BrokerDesc("IcebergTableDesc", 
StorageBackend.StorageType.HDFS, icebergTable.getIcebergProperties());
+        targetTable = icebergTable;
+    }
+
+    @Override

Review comment:
       Add an example in a comment showing the format of the returned URL.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
##########
@@ -73,28 +95,88 @@ public String getIcebergDb() {
         return icebergDb;
     }
 
-    public void setIcebergDb(String icebergDb) {
-        this.icebergDb = icebergDb;
-    }
-
     public String getIcebergTbl() {
         return icebergTbl;
     }
 
-    public void setIcebergTbl(String icebergTbl) {
-        this.icebergTbl = icebergTbl;
-    }
-
     public Map<String, String> getIcebergProperties() {
         return icebergProperties;
     }
 
-    public void setIcebergProperties(Map<String, String> icebergProperties) {
-        this.icebergProperties = icebergProperties;
+    public String getLocation() throws UserException {
+        if (Strings.isNullOrEmpty(location)) {
+            try {
+                getTable();
+            } catch (Exception e) {
+                throw new UserException("Failed to get table: " + name + 
",error: " + e.getMessage());
+            }
+            location = icebergTable.location();
+        }
+        return location;
+    }
+
+    public String getFileFormat() throws UserException {
+        if (Strings.isNullOrEmpty(fileFormat)) {
+            try {
+                getTable();
+            } catch (Exception e) {
+                throw new UserException("Failed to get table: " + name + 
",error: " + e.getMessage());
+            }
+            fileFormat = 
icebergTable.properties().get(TableProperties.DEFAULT_FILE_FORMAT);
+        }
+        return fileFormat;
     }
 
-    public org.apache.iceberg.Table getIcebergTable() {
-        return icebergTable;
+    // get the iceberg table instance, if table is not loaded, load it.
+    private org.apache.iceberg.Table getTable() throws Exception {
+        if (isLoaded.get()) {
+            Preconditions.checkNotNull(icebergTable);
+            return icebergTable;
+        }
+        synchronized (loadLock) {
+            if (icebergTable != null) {
+                return icebergTable;
+            }
+
+            IcebergProperty icebergProperty = getIcebergProperty();
+            IcebergCatalog icebergCatalog = 
IcebergCatalogMgr.getCatalog(icebergProperty);
+            try {
+                this.icebergTable = 
icebergCatalog.loadTable(TableIdentifier.of(icebergDb, icebergTbl));
+                LOG.info("finished to load table: {}", name);

Review comment:
       ```suggestion
                   LOG.info("finished to load iceberg table: {}", name);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to