This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a8772f016cb0cc5096d28ac3e2b7ff799c24ec15
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Sun Feb 5 13:15:28 2023 +0800

    [fix](iceberg) fix iceberg catalog (#16372)
    
    1. Fix Iceberg catalog access to S3
    2. Fix queries on partitioned Iceberg catalog tables
    3. Fix persistence of Iceberg catalogs
---
 be/src/vec/exec/format/generic_reader.h            |  2 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    |  4 ++
 be/src/vec/exec/format/table/iceberg_reader.h      |  2 +
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  4 +-
 .../datasource/iceberg/IcebergExternalCatalog.java | 44 ++++++++++++++--------
 .../iceberg/IcebergExternalCatalogFactory.java     |  4 +-
 .../iceberg/IcebergHMSExternalCatalog.java         |  6 +--
 .../iceberg/IcebergRestExternalCatalog.java        | 29 ++++++++++----
 .../org/apache/doris/persist/gson/GsonUtils.java   |  2 +
 9 files changed, 66 insertions(+), 31 deletions(-)

diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index 9f4cfd00ee..5abf064813 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -47,7 +47,7 @@ public:
 
     /// If the underlying FileReader has filled the partition&missing columns,
     /// The FileScanner does not need to fill
-    bool fill_all_columns() const { return _fill_all_columns; }
+    virtual bool fill_all_columns() const { return _fill_all_columns; }
 
     /// Tell the underlying FileReader the partition&missing columns,
     /// and the FileReader determine to fill columns or not.
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index a3302aeb09..dcdfa06b12 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -112,6 +112,10 @@ Status IcebergTableReader::set_fill_columns(
     return _file_format_reader->set_fill_columns(partition_columns, missing_columns);
 }
 
+bool IcebergTableReader::fill_all_columns() const {
+    return _file_format_reader->fill_all_columns();
+}
+
 Status IcebergTableReader::get_columns(
         std::unordered_map<std::string, TypeDescriptor>* name_to_type,
         std::unordered_set<std::string>* missing_cols) {
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index c97577ad05..7982869316 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -49,6 +49,8 @@ public:
                     partition_columns,
             const std::unordered_map<std::string, VExprContext*>& missing_columns) override;
 
+    bool fill_all_columns() const override;
+
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index c645cbc62d..d62909b5bd 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -242,7 +242,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
 
         auto block = ctx->get_free_block(&get_free_block);
         status = scanner->get_block(state, block, &eos);
-        VLOG_ROW << "VOlapScanNode input rows: " << block->rows() << ", eos: " << eos;
+        VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << eos;
         // The VFileScanner for external table may try to open not exist files,
         // Because FE file cache for external table may out of date.
         // So, NOT_FOUND for VFileScanner is not a fail case.
@@ -250,7 +250,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
         if (!status.ok() && (typeid(*scanner) != typeid(doris::vectorized::VFileScanner) ||
                              (typeid(*scanner) == typeid(doris::vectorized::VFileScanner) &&
                               !status.is_not_found()))) {
-            LOG(WARNING) << "Scan thread read VOlapScanner failed: " << status.to_string();
+            LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string();
             // Add block ptr in blocks, prevent mem leak in read failed
             blocks.push_back(block);
             break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 53e5b57459..d0335b8493 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -54,13 +55,12 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
     public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type";
     public static final String ICEBERG_REST = "rest";
     public static final String ICEBERG_HMS = "hms";
-    protected final String icebergCatalogType;
+    protected String icebergCatalogType;
     protected Catalog catalog;
     protected SupportsNamespaces nsCatalog;
 
-    public IcebergExternalCatalog(long catalogId, String name, String type) {
+    public IcebergExternalCatalog(long catalogId, String name) {
         super(catalogId, name);
-        this.icebergCatalogType = type;
     }
 
     @Override
@@ -152,23 +152,34 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
         }
     }
 
+    public Catalog getCatalog() {
+        makeSureInitialized();
+        return catalog;
+    }
+
+    public SupportsNamespaces getNsCatalog() {
+        makeSureInitialized();
+        return nsCatalog;
+    }
+
     public String getIcebergCatalogType() {
+        makeSureInitialized();
         return icebergCatalogType;
     }
 
     protected List<String> listDatabaseNames() {
         return nsCatalog.listNamespaces().stream()
-            .map(e -> {
-                String dbName = e.toString();
-                try {
-                    FeNameFormat.checkDbName(dbName);
-                } catch (AnalysisException ex) {
-                    Util.logAndThrowRuntimeException(LOG,
-                            String.format("Not a supported namespace name format: %s", dbName), ex);
-                }
-                return dbName;
-            })
-            .collect(Collectors.toList());
+                .map(e -> {
+                    String dbName = e.toString();
+                    try {
+                        FeNameFormat.checkDbName(dbName);
+                    } catch (AnalysisException ex) {
+                        Util.logAndThrowRuntimeException(LOG,
+                                String.format("Not a supported namespace name format: %s", dbName), ex);
+                    }
+                    return dbName;
+                })
+                .collect(Collectors.toList());
     }
 
     @Override
@@ -180,12 +191,13 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
     @Override
     public List<Column> getSchema(String dbName, String tblName) {
         makeSureInitialized();
-        List<Types.NestedField> columns = getIcebergTable(dbName, tblName).schema().columns();
+        Schema schema = getIcebergTable(dbName, tblName).schema();
+        List<Types.NestedField> columns = schema.columns();
         List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
         for (Types.NestedField field : columns) {
             tmpSchema.add(new Column(field.name(),
                     icebergTypeToDorisType(field.type()), true, null,
-                    true, null, field.doc(), true, null, -1));
+                    true, field.doc(), true, field.fieldId()));
         }
         return tmpSchema;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
index 62ad0c8729..4f97c3cd59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
@@ -32,9 +32,9 @@ public class IcebergExternalCatalogFactory {
         }
         switch (catalogType) {
             case IcebergExternalCatalog.ICEBERG_REST:
-                return new IcebergRestExternalCatalog(catalogId, name, resource, catalogType, props);
+                return new IcebergRestExternalCatalog(catalogId, name, resource, props);
             case IcebergExternalCatalog.ICEBERG_HMS:
-                return new IcebergHMSExternalCatalog(catalogId, name, resource, catalogType, props);
+                return new IcebergHMSExternalCatalog(catalogId, name, resource, props);
             default:
                 throw new DdlException("Unknown " + IcebergExternalCatalog.ICEBERG_CATALOG_TYPE
                     + " value: " + catalogType);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
index f969ae7085..97748c608b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
@@ -28,14 +28,14 @@ import java.util.Map;
 
 public class IcebergHMSExternalCatalog extends IcebergExternalCatalog {
 
-    public IcebergHMSExternalCatalog(long catalogId, String name, String resource, String catalogType,
-                                     Map<String, String> props) {
-        super(catalogId, name, catalogType);
+    public IcebergHMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
+        super(catalogId, name);
         catalogProperty = new CatalogProperty(resource, props);
     }
 
     @Override
     protected void initLocalObjectsImpl() {
+        icebergCatalogType = ICEBERG_HMS;
         HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
         hiveCatalog.setConf(getConfiguration());
         // initialize hive catalog
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
index f3f240b661..c0343f244c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.catalog.S3Resource;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider;
 
@@ -30,24 +31,38 @@ import java.util.Map;
 
 public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
 
-    public  IcebergRestExternalCatalog(long catalogId, String name, String resource, String catalogType,
-                                       Map<String, String> props) {
-        super(catalogId, name, catalogType);
+    public  IcebergRestExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
+        super(catalogId, name);
         catalogProperty = new CatalogProperty(resource, props);
     }
 
     @Override
     protected void initLocalObjectsImpl() {
+        icebergCatalogType = ICEBERG_REST;
         Map<String, String> restProperties = new HashMap<>();
         String restUri = catalogProperty.getProperties().getOrDefault(CatalogProperties.URI, "");
         restProperties.put(CatalogProperties.URI, restUri);
         RESTCatalog restCatalog = new RESTCatalog();
-        String credentials = catalogProperty.getProperties()
-                .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER, DataLakeAWSCredentialsProvider.class.getName());
-        Configuration conf = getConfiguration();
-        conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials);
+        Configuration conf = replaceS3Properties(getConfiguration());
         restCatalog.setConf(conf);
         restCatalog.initialize(icebergCatalogType, restProperties);
         catalog = restCatalog;
     }
+
+    private Configuration replaceS3Properties(Configuration conf) {
+        Map<String, String> catalogProperties = catalogProperty.getProperties();
+        String credentials = catalogProperties
+                .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER, DataLakeAWSCredentialsProvider.class.getName());
+        conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials);
+        String usePathStyle = catalogProperties.getOrDefault(S3Resource.USE_PATH_STYLE, "true");
+        // Set path-style access (default "true") under both the S3Resource and the Constants key
+        conf.set(S3Resource.USE_PATH_STYLE, usePathStyle);
+        conf.set(Constants.PATH_STYLE_ACCESS, usePathStyle);
+        // Set AWS client retry limits; each defaults to "1" unless overridden in the catalog properties
+        conf.set(Constants.RETRY_LIMIT, catalogProperties.getOrDefault(Constants.RETRY_LIMIT, "1"));
+        conf.set(Constants.RETRY_THROTTLE_LIMIT, catalogProperties.getOrDefault(Constants.RETRY_THROTTLE_LIMIT, "1"));
+        conf.set(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT,
+                catalogProperties.getOrDefault(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT, "1"));
+        return conf;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index f613d6f986..fef1ed14dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -52,6 +52,7 @@ import org.apache.doris.datasource.EsExternalCatalog;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.JdbcExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
 import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
@@ -172,6 +173,7 @@ public class GsonUtils {
             .registerSubtype(HMSExternalCatalog.class, HMSExternalCatalog.class.getSimpleName())
             .registerSubtype(EsExternalCatalog.class, EsExternalCatalog.class.getSimpleName())
             .registerSubtype(JdbcExternalCatalog.class, JdbcExternalCatalog.class.getSimpleName())
+            .registerSubtype(IcebergExternalCatalog.class, IcebergExternalCatalog.class.getSimpleName())
             .registerSubtype(IcebergHMSExternalCatalog.class, IcebergHMSExternalCatalog.class.getSimpleName())
             .registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to