This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit a8772f016cb0cc5096d28ac3e2b7ff799c24ec15 Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Sun Feb 5 13:15:28 2023 +0800 [fix](iceberg) fix iceberg catalog (#16372) 1. Fix iceberg catalog access s3 2. Fix iceberg catalog partition table query 3. Fix persistence --- be/src/vec/exec/format/generic_reader.h | 2 +- be/src/vec/exec/format/table/iceberg_reader.cpp | 4 ++ be/src/vec/exec/format/table/iceberg_reader.h | 2 + be/src/vec/exec/scan/scanner_scheduler.cpp | 4 +- .../datasource/iceberg/IcebergExternalCatalog.java | 44 ++++++++++++++-------- .../iceberg/IcebergExternalCatalogFactory.java | 4 +- .../iceberg/IcebergHMSExternalCatalog.java | 6 +-- .../iceberg/IcebergRestExternalCatalog.java | 29 ++++++++++---- .../org/apache/doris/persist/gson/GsonUtils.java | 2 + 9 files changed, 66 insertions(+), 31 deletions(-) diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index 9f4cfd00ee..5abf064813 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -47,7 +47,7 @@ public: /// If the underlying FileReader has filled the partition&missing columns, /// The FileScanner does not need to fill - bool fill_all_columns() const { return _fill_all_columns; } + virtual bool fill_all_columns() const { return _fill_all_columns; } /// Tell the underlying FileReader the partition&missing columns, /// and the FileReader determine to fill columns or not. diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index a3302aeb09..dcdfa06b12 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -112,6 +112,10 @@ Status IcebergTableReader::set_fill_columns( return _file_format_reader->set_fill_columns(partition_columns, missing_columns); } +bool IcebergTableReader::fill_all_columns() const { + return _file_format_reader->fill_all_columns(); +}; + Status IcebergTableReader::get_columns( std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) { diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index c97577ad05..7982869316 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -49,6 +49,8 @@ public: partition_columns, const std::unordered_map<std::string, VExprContext*>& missing_columns) override; + bool fill_all_columns() const override; + Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index c645cbc62d..d62909b5bd 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -242,7 +242,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext auto block = ctx->get_free_block(&get_free_block); status = scanner->get_block(state, block, &eos); - VLOG_ROW << "VOlapScanNode input rows: " << block->rows() << ", eos: " << eos; + VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << eos; // The VFileScanner for external table may try to open not exist files, // Because FE file cache for external table may out of date. // So, NOT_FOUND for VFileScanner is not a fail case. @@ -250,7 +250,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext if (!status.ok() && (typeid(*scanner) != typeid(doris::vectorized::VFileScanner) || (typeid(*scanner) == typeid(doris::vectorized::VFileScanner) && !status.is_not_found()))) { - LOG(WARNING) << "Scan thread read VOlapScanner failed: " << status.to_string(); + LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string(); // Add block ptr in blocks, prevent mem leak in read failed blocks.push_back(block); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 53e5b57459..d0335b8493 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -35,6 +35,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -54,13 +55,12 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type"; public static final String ICEBERG_REST = "rest"; public static final String ICEBERG_HMS = "hms"; - protected final String icebergCatalogType; + protected String icebergCatalogType; protected Catalog catalog; protected SupportsNamespaces nsCatalog; - public IcebergExternalCatalog(long catalogId, String name, String type) { + public IcebergExternalCatalog(long catalogId, String name) { super(catalogId, name); - this.icebergCatalogType = type; } @Override @@ -152,23 +152,34 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { } } + public Catalog getCatalog() { + makeSureInitialized(); + return catalog; + } + + public SupportsNamespaces getNsCatalog() { + makeSureInitialized(); + return nsCatalog; + } + public String getIcebergCatalogType() { + makeSureInitialized(); return icebergCatalogType; } protected List<String> listDatabaseNames() { return nsCatalog.listNamespaces().stream() - .map(e -> { - String dbName = e.toString(); - try { - FeNameFormat.checkDbName(dbName); - } catch (AnalysisException ex) { - Util.logAndThrowRuntimeException(LOG, - String.format("Not a supported namespace name format: %s", dbName), ex); - } - return dbName; - }) - .collect(Collectors.toList()); + .map(e -> { + String dbName = e.toString(); + try { + FeNameFormat.checkDbName(dbName); + } catch (AnalysisException ex) { + Util.logAndThrowRuntimeException(LOG, + String.format("Not a supported namespace name format: %s", dbName), ex); + } + return dbName; + }) + .collect(Collectors.toList()); } @Override @@ -180,12 +191,13 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { @Override public List<Column> getSchema(String dbName, String tblName) { makeSureInitialized(); - List<Types.NestedField> columns = getIcebergTable(dbName, tblName).schema().columns(); + Schema schema = getIcebergTable(dbName, tblName).schema(); + List<Types.NestedField> columns = schema.columns(); List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size()); for (Types.NestedField field : columns) { tmpSchema.add(new Column(field.name(), icebergTypeToDorisType(field.type()), true, null, - true, null, field.doc(), true, null, -1)); + true, field.doc(), true, schema.caseInsensitiveFindField(field.name()).fieldId())); } return tmpSchema; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java index 62ad0c8729..4f97c3cd59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java @@ -32,9 +32,9 @@ public class IcebergExternalCatalogFactory { } switch (catalogType) { case IcebergExternalCatalog.ICEBERG_REST: - return new IcebergRestExternalCatalog(catalogId, name, resource, catalogType, props); + return new IcebergRestExternalCatalog(catalogId, name, resource, props); case IcebergExternalCatalog.ICEBERG_HMS: - return new IcebergHMSExternalCatalog(catalogId, name, resource, catalogType, props); + return new IcebergHMSExternalCatalog(catalogId, name, resource, props); default: throw new DdlException("Unknown " + IcebergExternalCatalog.ICEBERG_CATALOG_TYPE + " value: " + catalogType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java index f969ae7085..97748c608b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java @@ -28,14 +28,14 @@ import java.util.Map; public class IcebergHMSExternalCatalog extends IcebergExternalCatalog { - public IcebergHMSExternalCatalog(long catalogId, String name, String resource, String catalogType, - Map<String, String> props) { - super(catalogId, name, catalogType); + public IcebergHMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) { + super(catalogId, name); catalogProperty = new CatalogProperty(resource, props); } @Override protected void initLocalObjectsImpl() { + icebergCatalogType = ICEBERG_HMS; HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); hiveCatalog.setConf(getConfiguration()); // initialize hive catalog diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java index f3f240b661..c0343f244c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.catalog.S3Resource; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider; @@ -30,24 +31,38 @@ import java.util.Map; public class IcebergRestExternalCatalog extends IcebergExternalCatalog { - public IcebergRestExternalCatalog(long catalogId, String name, String resource, String catalogType, - Map<String, String> props) { - super(catalogId, name, catalogType); + public IcebergRestExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) { + super(catalogId, name); catalogProperty = new CatalogProperty(resource, props); } @Override protected void initLocalObjectsImpl() { + icebergCatalogType = ICEBERG_REST; Map<String, String> restProperties = new HashMap<>(); String restUri = catalogProperty.getProperties().getOrDefault(CatalogProperties.URI, ""); restProperties.put(CatalogProperties.URI, restUri); RESTCatalog restCatalog = new RESTCatalog(); - String credentials = catalogProperty.getProperties() - .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER, DataLakeAWSCredentialsProvider.class.getName()); - Configuration conf = getConfiguration(); - conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials); + Configuration conf = replaceS3Properties(getConfiguration()); restCatalog.setConf(conf); restCatalog.initialize(icebergCatalogType, restProperties); catalog = restCatalog; } + + private Configuration replaceS3Properties(Configuration conf) { + Map<String, String> catalogProperties = catalogProperty.getProperties(); + String credentials = catalogProperties + .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER, DataLakeAWSCredentialsProvider.class.getName()); + conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials); + String usePahStyle = catalogProperties.getOrDefault(S3Resource.USE_PATH_STYLE, "true"); + // Set path style + conf.set(S3Resource.USE_PATH_STYLE, usePahStyle); + conf.set(Constants.PATH_STYLE_ACCESS, usePahStyle); + // Get AWS client retry limit + conf.set(Constants.RETRY_LIMIT, catalogProperties.getOrDefault(Constants.RETRY_LIMIT, "1")); + conf.set(Constants.RETRY_THROTTLE_LIMIT, catalogProperties.getOrDefault(Constants.RETRY_THROTTLE_LIMIT, "1")); + conf.set(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT, + catalogProperties.getOrDefault(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT, "1")); + return conf; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index f613d6f986..fef1ed14dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -52,6 +52,7 @@ import org.apache.doris.datasource.EsExternalCatalog; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.JdbcExternalCatalog; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog; import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; @@ -172,6 +173,7 @@ public class GsonUtils { .registerSubtype(HMSExternalCatalog.class, HMSExternalCatalog.class.getSimpleName()) .registerSubtype(EsExternalCatalog.class, EsExternalCatalog.class.getSimpleName()) .registerSubtype(JdbcExternalCatalog.class, JdbcExternalCatalog.class.getSimpleName()) + .registerSubtype(IcebergExternalCatalog.class, IcebergExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergHMSExternalCatalog.class, IcebergHMSExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org