This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 40d9e19e1d [feature-wip](multi-catalog) support iceberg union catalog, and add h… (#16082) 40d9e19e1d is described below commit 40d9e19e1d1efe12a0853c99177d8e672c9993c4 Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Wed Feb 1 22:59:42 2023 +0800 [feature-wip](multi-catalog) support iceberg union catalog, and add h… (#16082) support iceberg unified catalog framework, and add hms and rest catalog for the framework --- .../java/org/apache/doris/catalog/TableIf.java | 6 +- .../catalog/external/IcebergExternalDatabase.java | 180 ++++++++++++++++++ .../catalog/external/IcebergExternalTable.java | 66 +++++++ .../apache/doris/datasource/CatalogFactory.java | 4 + .../apache/doris/datasource/ExternalCatalog.java | 9 + .../apache/doris/datasource/InitCatalogLog.java | 3 +- .../DataLakeAWSCredentialsProvider.java | 59 ++++++ .../datasource/iceberg/IcebergExternalCatalog.java | 210 +++++++++++++++++++++ .../iceberg/IcebergExternalCatalogFactory.java | 43 +++++ .../iceberg/IcebergHMSExternalCatalog.java | 50 +++++ .../iceberg/IcebergRestExternalCatalog.java | 53 ++++++ .../apache/doris/planner/SingleNodePlanner.java | 1 + .../planner/external/ExternalFileScanNode.java | 38 +++- .../doris/planner/external/QueryScanProvider.java | 2 + .../planner/external/iceberg/IcebergApiSource.java | 119 ++++++++++++ .../{ => iceberg}/IcebergDeleteFileFilter.java | 11 +- .../planner/external/iceberg/IcebergHMSSource.java | 85 +++++++++ .../{ => iceberg}/IcebergScanProvider.java | 101 +++++++--- .../planner/external/iceberg/IcebergSource.java | 44 +++++ .../external/{ => iceberg}/IcebergSplit.java | 3 +- 20 files changed, 1054 insertions(+), 33 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index df013b7cc7..7a71d4f4e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -134,7 +134,8 @@ public interface TableIf { */ enum TableType { MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, JDBC, - TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE; + TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE, + ICEBERG_EXTERNAL_TABLE; public String toEngineName() { switch (this) { @@ -167,6 +168,8 @@ public interface TableIf { return "hms"; case ES_EXTERNAL_TABLE: return "es"; + case ICEBERG_EXTERNAL_TABLE: + return "iceberg"; default: return null; } @@ -193,6 +196,7 @@ public interface TableIf { case TABLE_VALUED_FUNCTION: case HMS_EXTERNAL_TABLE: case ES_EXTERNAL_TABLE: + case ICEBERG_EXTERNAL_TABLE: return "EXTERNAL TABLE"; default: return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java new file mode 100644 index 0000000000..9b110c9dcf --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog.external; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InitDatabaseLog; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.persist.gson.GsonPostProcessable; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +public class IcebergExternalDatabase extends ExternalDatabase<IcebergExternalTable> implements GsonPostProcessable { + + private static final Logger LOG = LogManager.getLogger(IcebergExternalDatabase.class); + // Cache of table name to table id. + private Map<String, Long> tableNameToId = Maps.newConcurrentMap(); + @SerializedName(value = "idToTbl") + private Map<Long, IcebergExternalTable> idToTbl = Maps.newConcurrentMap(); + + public IcebergExternalDatabase(ExternalCatalog extCatalog, Long id, String name) { + super(extCatalog, id, name); + } + + public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { + Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap(); + Map<Long, IcebergExternalTable> tmpIdToTbl = Maps.newConcurrentMap(); + for (int i = 0; i < log.getRefreshCount(); i++) { + IcebergExternalTable table = getTableForReplay(log.getRefreshTableIds().get(i)); + tmpTableNameToId.put(table.getName(), table.getId()); + tmpIdToTbl.put(table.getId(), table); + } + for (int i = 0; i < log.getCreateCount(); i++) { + IcebergExternalTable table = new IcebergExternalTable(log.getCreateTableIds().get(i), + log.getCreateTableNames().get(i), name, (IcebergExternalCatalog) catalog); + tmpTableNameToId.put(table.getName(), table.getId()); + tmpIdToTbl.put(table.getId(), table); + } + tableNameToId = tmpTableNameToId; + idToTbl = tmpIdToTbl; + initialized = true; + } + + public void setTableExtCatalog(ExternalCatalog extCatalog) { + for (IcebergExternalTable table : idToTbl.values()) { + table.setCatalog(extCatalog); + } + } + + @Override + protected void init() { + InitDatabaseLog initDatabaseLog = new InitDatabaseLog(); + initDatabaseLog.setType(InitDatabaseLog.Type.HMS); + initDatabaseLog.setCatalogId(extCatalog.getId()); + initDatabaseLog.setDbId(id); + List<String> tableNames = extCatalog.listTableNames(null, name); + if (tableNames != null) { + Map<String, Long> tmpTableNameToId = Maps.newConcurrentMap(); + Map<Long, IcebergExternalTable> tmpIdToTbl = Maps.newHashMap(); + for (String tableName : tableNames) { + long tblId; + if (tableNameToId != null && tableNameToId.containsKey(tableName)) { + tblId = tableNameToId.get(tableName); + tmpTableNameToId.put(tableName, tblId); + IcebergExternalTable table = idToTbl.get(tblId); + tmpIdToTbl.put(tblId, table); + initDatabaseLog.addRefreshTable(tblId); + } else { + tblId = Env.getCurrentEnv().getNextId(); + tmpTableNameToId.put(tableName, tblId); + IcebergExternalTable table = new IcebergExternalTable(tblId, tableName, name, + (IcebergExternalCatalog) extCatalog); + tmpIdToTbl.put(tblId, table); + initDatabaseLog.addCreateTable(tblId, tableName); + } + } + tableNameToId = tmpTableNameToId; + idToTbl = tmpIdToTbl; + } + initialized = true; + Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog); + } + + @Override + public List<IcebergExternalTable> getTables() { + makeSureInitialized(); + return Lists.newArrayList(idToTbl.values()); + } + + @Override + public List<IcebergExternalTable> getTablesOnIdOrder() { + // Sort the name instead, because the id may change. + return getTables().stream().sorted(Comparator.comparing(TableIf::getName)).collect(Collectors.toList()); + } + + @Override + public Set<String> getTableNamesWithLock() { + makeSureInitialized(); + return Sets.newHashSet(tableNameToId.keySet()); + } + + @Override + public IcebergExternalTable getTableNullable(String tableName) { + makeSureInitialized(); + if (!tableNameToId.containsKey(tableName)) { + return null; + } + return idToTbl.get(tableNameToId.get(tableName)); + } + + @Override + public IcebergExternalTable getTableNullable(long tableId) { + makeSureInitialized(); + return idToTbl.get(tableId); + } + + @Override + public IcebergExternalTable getTableForReplay(long tableId) { + return idToTbl.get(tableId); + } + + @Override + public void gsonPostProcess() throws IOException { + tableNameToId = Maps.newConcurrentMap(); + for (IcebergExternalTable tbl : idToTbl.values()) { + tableNameToId.put(tbl.getName(), tbl.getId()); + } + rwLock = new ReentrantReadWriteLock(true); + } + + @Override + public void dropTable(String tableName) { + LOG.debug("drop table [{}]", tableName); + makeSureInitialized(); + Long tableId = tableNameToId.remove(tableName); + if (tableId == null) { + LOG.warn("drop table [{}] failed", tableName); + } + idToTbl.remove(tableId); + } + + @Override + public void createTable(String tableName, long tableId) { + LOG.debug("create table [{}]", tableName); + makeSureInitialized(); + tableNameToId.put(tableName, tableId); + IcebergExternalTable table = new IcebergExternalTable(tableId, tableName, name, + (IcebergExternalCatalog) extCatalog); + idToTbl.put(tableId, table); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java new file mode 100644 index 0000000000..389ba8dc55 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog.external; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.thrift.THiveTable; +import org.apache.doris.thrift.TIcebergTable; +import org.apache.doris.thrift.TTableDescriptor; +import org.apache.doris.thrift.TTableType; + +import java.util.HashMap; +import java.util.List; + +public class IcebergExternalTable extends ExternalTable { + + IcebergExternalCatalog icebergCatalog; + + public IcebergExternalTable(long id, String name, String dbName, IcebergExternalCatalog catalog) { + super(id, name, catalog, dbName, TableType.ICEBERG_EXTERNAL_TABLE); + icebergCatalog = catalog; + } + + public String getIcebergCatalogType() { + return icebergCatalog.getIcebergCatalogType(); + } + + protected synchronized void makeSureInitialized() { + if (!objectCreated) { + objectCreated = true; + } + } + + @Override + public TTableDescriptor toThrift() { + List<Column> schema = getFullSchema(); + if (icebergCatalog.getIcebergCatalogType().equals("hms")) { + THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>()); + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0, + getName(), dbName); + tTableDescriptor.setHiveTable(tHiveTable); + return tTableDescriptor; + } else { + TIcebergTable icebergTable = new TIcebergTable(dbName, name, new HashMap<>()); + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE, + schema.size(), 0, getName(), dbName); + tTableDescriptor.setIcebergTable(icebergTable); + return tTableDescriptor; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java index 87c1212c1c..3f9be036ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.StatementBase; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Resource; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory; import org.apache.parquet.Strings; @@ -102,6 +103,9 @@ public class CatalogFactory { case "jdbc": catalog = new JdbcExternalCatalog(catalogId, name, resource, props); break; + case "iceberg": + catalog = IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props); + break; default: throw new DdlException("Unknown catalog type: " + catalogType); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 3a4a711e1c..200999c6db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.external.EsExternalDatabase; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.HMSExternalDatabase; +import org.apache.doris.catalog.external.IcebergExternalDatabase; import org.apache.doris.catalog.external.JdbcExternalDatabase; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.io.Text; @@ -280,6 +281,14 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr tmpIdToDb.put(db.getId(), db); } break; + case ICEBERG: + for (int i = 0; i < log.getCreateCount(); i++) { + IcebergExternalDatabase db = new IcebergExternalDatabase( + this, log.getCreateDbIds().get(i), log.getCreateDbNames().get(i)); + tmpDbNameToId.put(db.getFullName(), db.getId()); + tmpIdToDb.put(db.getId(), db); + } + break; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java index c7ed1696b2..524e2d9d3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java @@ -32,10 +32,11 @@ import java.util.List; @Data public class InitCatalogLog implements Writable { - enum Type { + public enum Type { HMS, ES, JDBC, + ICEBERG, UNKNOWN; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/DataLakeAWSCredentialsProvider.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/DataLakeAWSCredentialsProvider.java new file mode 100644 index 0000000000..9901b9c668 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/credentials/DataLakeAWSCredentialsProvider.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.credentials; + +import com.amazonaws.SdkClientException; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.BasicSessionCredentials; +import com.amazonaws.util.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.Constants; + +public class DataLakeAWSCredentialsProvider implements AWSCredentialsProvider { + + private final Configuration conf; + + public DataLakeAWSCredentialsProvider(Configuration conf) { + this.conf = conf; + } + + @Override + public AWSCredentials getCredentials() { + String accessKey = StringUtils.trim(conf.get(Constants.ACCESS_KEY)); + String secretKey = StringUtils.trim(conf.get(Constants.SECRET_KEY)); + String sessionToken = StringUtils.trim(conf.get(Constants.SESSION_TOKEN)); + if (!StringUtils.isNullOrEmpty(accessKey) && !StringUtils.isNullOrEmpty(secretKey)) { + return (StringUtils.isNullOrEmpty(sessionToken) ? new BasicAWSCredentials(accessKey, + secretKey) : new BasicSessionCredentials(accessKey, secretKey, sessionToken)); + } else { + throw new SdkClientException( + "Unable to load AWS credentials from hive conf (fs.s3a.access.key and fs.s3a.secret.key)"); + } + } + + @Override + public void refresh() { + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java new file mode 100644 index 0000000000..53e5b57459 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; +import org.apache.doris.catalog.external.ExternalDatabase; +import org.apache.doris.catalog.external.IcebergExternalDatabase; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeNameFormat; +import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InitCatalogLog; +import org.apache.doris.datasource.SessionContext; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public abstract class IcebergExternalCatalog extends ExternalCatalog { + + private static final Logger LOG = LogManager.getLogger(IcebergExternalCatalog.class); + public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type"; + public static final String ICEBERG_REST = "rest"; + public static final String ICEBERG_HMS = "hms"; + protected final String icebergCatalogType; + protected Catalog catalog; + protected SupportsNamespaces nsCatalog; + + public IcebergExternalCatalog(long catalogId, String name, String type) { + super(catalogId, name); + this.icebergCatalogType = type; + } + + @Override + protected void init() { + nsCatalog = (SupportsNamespaces) catalog; + Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap(); + Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap(); + InitCatalogLog initCatalogLog = new InitCatalogLog(); + initCatalogLog.setCatalogId(id); + initCatalogLog.setType(InitCatalogLog.Type.ICEBERG); + List<String> allDatabaseNames = listDatabaseNames(); + for (String dbName : allDatabaseNames) { + long dbId; + if (dbNameToId != null && dbNameToId.containsKey(dbName)) { + dbId = dbNameToId.get(dbName); + tmpDbNameToId.put(dbName, dbId); + ExternalDatabase db = idToDb.get(dbId); + db.setUnInitialized(invalidCacheInInit); + tmpIdToDb.put(dbId, db); + initCatalogLog.addRefreshDb(dbId); + } else { + dbId = Env.getCurrentEnv().getNextId(); + tmpDbNameToId.put(dbName, dbId); + IcebergExternalDatabase db = new IcebergExternalDatabase(this, dbId, dbName); + tmpIdToDb.put(dbId, db); + initCatalogLog.addCreateDb(dbId, dbName); + } + } + dbNameToId = tmpDbNameToId; + idToDb = tmpIdToDb; + Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog); + } + + protected Configuration getConfiguration() { + Configuration conf = new HdfsConfiguration(); + Map<String, String> catalogProperties = catalogProperty.getHadoopProperties(); + for (Map.Entry<String, String> entry : catalogProperties.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + return conf; + } + + protected Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) { + if (type.isPrimitiveType()) { + return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type); + } + switch (type.typeId()) { + case LIST: + Types.ListType list = (Types.ListType) type; + return ArrayType.create(icebergTypeToDorisType(list.elementType()), true); + case MAP: + case STRUCT: + return Type.UNSUPPORTED; + default: + throw new IllegalArgumentException("Cannot transform unknown type: " + type); + } + } + + private Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) { + switch (primitive.typeId()) { + case BOOLEAN: + return Type.BOOLEAN; + case INTEGER: + return Type.INT; + case LONG: + return Type.BIGINT; + case FLOAT: + return Type.FLOAT; + case DOUBLE: + return Type.DOUBLE; + case STRING: + case BINARY: + case UUID: + return Type.STRING; + case FIXED: + Types.FixedType fixed = (Types.FixedType) primitive; + return ScalarType.createCharType(fixed.length()); + case DECIMAL: + Types.DecimalType decimal = (Types.DecimalType) primitive; + return ScalarType.createDecimalType(decimal.precision(), decimal.scale()); + case DATE: + return ScalarType.createDateV2Type(); + case TIMESTAMP: + return ScalarType.createDatetimeV2Type(0); + case TIME: + return Type.UNSUPPORTED; + default: + throw new IllegalArgumentException("Cannot transform unknown type: " + primitive); + } + } + + public String getIcebergCatalogType() { + return icebergCatalogType; + } + + protected List<String> listDatabaseNames() { + return nsCatalog.listNamespaces().stream() + .map(e -> { + String dbName = e.toString(); + try { + FeNameFormat.checkDbName(dbName); + } catch (AnalysisException ex) { + Util.logAndThrowRuntimeException(LOG, + String.format("Not a supported namespace name format: %s", dbName), ex); + } + return dbName; + }) + .collect(Collectors.toList()); + } + + @Override + public List<String> listDatabaseNames(SessionContext ctx) { + makeSureInitialized(); + return new ArrayList<>(dbNameToId.keySet()); + } + + @Override + public List<Column> getSchema(String dbName, String tblName) { + makeSureInitialized(); + List<Types.NestedField> columns = getIcebergTable(dbName, tblName).schema().columns(); + List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size()); + for (Types.NestedField field : columns) { + tmpSchema.add(new Column(field.name(), + icebergTypeToDorisType(field.type()), true, null, + true, null, field.doc(), true, null, -1)); + } + return tmpSchema; + } + + @Override + public boolean tableExist(SessionContext ctx, String dbName, String tblName) { + makeSureInitialized(); + return catalog.tableExists(TableIdentifier.of(dbName, tblName)); + } + + @Override + public List<String> listTableNames(SessionContext ctx, String dbName) { + makeSureInitialized(); + List<TableIdentifier> tableIdentifiers = catalog.listTables(Namespace.of(dbName)); + return tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList()); + } + + public org.apache.iceberg.Table getIcebergTable(String dbName, String tblName) { + makeSureInitialized(); + return catalog.loadTable(TableIdentifier.of(dbName, tblName)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java new file mode 100644 index 0000000000..62ad0c8729 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.CatalogIf; + +import java.util.Map; + +public class IcebergExternalCatalogFactory { + + public static CatalogIf createCatalog(long catalogId, String name, String resource, Map<String, String> props) + throws DdlException { + String catalogType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE); + if (catalogType == null) { + throw new DdlException("Missing " + IcebergExternalCatalog.ICEBERG_CATALOG_TYPE + " property"); + } + switch (catalogType) { + case IcebergExternalCatalog.ICEBERG_REST: + return new IcebergRestExternalCatalog(catalogId, name, resource, catalogType, props); + case IcebergExternalCatalog.ICEBERG_HMS: + return new IcebergHMSExternalCatalog(catalogId, name, resource, catalogType, props); + default: + throw new DdlException("Unknown " + IcebergExternalCatalog.ICEBERG_CATALOG_TYPE + + " value: " + catalogType); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java new file mode 100644 index 0000000000..f969ae7085 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.catalog.HMSResource; +import org.apache.doris.datasource.CatalogProperty; + +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.hive.HiveCatalog; + +import java.util.HashMap; +import java.util.Map; + +public class IcebergHMSExternalCatalog extends IcebergExternalCatalog { + + public IcebergHMSExternalCatalog(long catalogId, String name, String resource, String catalogType, + Map<String, String> props) { + super(catalogId, name, catalogType); + catalogProperty = new CatalogProperty(resource, props); + } + + @Override + protected void initLocalObjectsImpl() { + HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); + hiveCatalog.setConf(getConfiguration()); + // initialize hive catalog + Map<String, String> catalogProperties = new HashMap<>(); + String metastoreUris = catalogProperty.getOrDefault(HMSResource.HIVE_METASTORE_URIS, ""); + + catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, metastoreUris); + catalogProperties.put(CatalogProperties.URI, metastoreUris); + hiveCatalog.initialize(icebergCatalogType, catalogProperties); + catalog = hiveCatalog; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java new file mode 100644 index 0000000000..f3f240b661 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.datasource.CatalogProperty; +import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.rest.RESTCatalog; + +import java.util.HashMap; +import java.util.Map; + +public class IcebergRestExternalCatalog extends IcebergExternalCatalog { + + public IcebergRestExternalCatalog(long catalogId, String name, String resource, String catalogType, + Map<String, String> props) { + super(catalogId, name, catalogType); + catalogProperty = new CatalogProperty(resource, props); + } + + @Override + protected void initLocalObjectsImpl() { + Map<String, String> restProperties = new HashMap<>(); + String restUri = catalogProperty.getProperties().getOrDefault(CatalogProperties.URI, ""); + restProperties.put(CatalogProperties.URI, restUri); + RESTCatalog restCatalog = new RESTCatalog(); + String credentials = catalogProperty.getProperties() + .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER, DataLakeAWSCredentialsProvider.class.getName()); + Configuration conf = getConfiguration(); + conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials); + restCatalog.setConf(conf); + restCatalog.initialize(icebergCatalogType, restProperties); + catalog = restCatalog; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 199e0c623a..ae286c7974 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1940,6 +1940,7 @@ public class SingleNodePlanner { scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId()); break; case HMS_EXTERNAL_TABLE: + case ICEBERG_EXTERNAL_TABLE: scanNode = new ExternalFileScanNode(ctx.getNextNodeId(), tblRef.getDesc()); break; case ES_EXTERNAL_TABLE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index de007632c9..7f5cbddb87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -38,11 +38,17 @@ import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.catalog.external.IcebergExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.external.iceberg.IcebergApiSource; +import org.apache.doris.planner.external.iceberg.IcebergHMSSource; +import org.apache.doris.planner.external.iceberg.IcebergScanProvider; +import org.apache.doris.planner.external.iceberg.IcebergSource; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; @@ -175,6 +181,9 @@ public class ExternalFileScanNode extends ExternalScanNode { } else if (this.desc.getTable() instanceof FunctionGenTable) { FunctionGenTable table = (FunctionGenTable) this.desc.getTable(); initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf()); + } else if (this.desc.getTable() instanceof IcebergExternalTable) { + IcebergExternalTable table = (IcebergExternalTable) this.desc.getTable(); + initIcebergExternalTable(table); } break; case LOAD: @@ -211,6 +220,9 @@ public class ExternalFileScanNode extends ExternalScanNode { } else if (this.desc.getTable() instanceof FunctionGenTable) { FunctionGenTable table = (FunctionGenTable) this.desc.getTable(); initFunctionGenTable(table, (ExternalFileTableValuedFunction) table.getTvf()); + } else if (this.desc.getTable() instanceof IcebergExternalTable) { + IcebergExternalTable table = (IcebergExternalTable) this.desc.getTable(); + initIcebergExternalTable(table); } break; default: @@ -244,7 +256,8 @@ public class ExternalFileScanNode extends ExternalScanNode { scanProvider = new HudiScanProvider(hmsTable, desc, columnNameToRange); break; case ICEBERG: - scanProvider = new IcebergScanProvider(hmsTable, analyzer, desc, columnNameToRange); + IcebergSource hmsSource = new IcebergHMSSource(hmsTable, desc, columnNameToRange); + scanProvider = new IcebergScanProvider(hmsSource, analyzer); break; case HIVE: scanProvider = new HiveScanProvider(hmsTable, desc, columnNameToRange); @@ -255,6 +268,29 @@ public class ExternalFileScanNode extends ExternalScanNode { this.scanProviders.add(scanProvider); } + private void initIcebergExternalTable(IcebergExternalTable icebergTable) throws UserException { + Preconditions.checkNotNull(icebergTable); + if (icebergTable.isView()) { + throw new AnalysisException( + String.format("Querying external view '%s.%s' is not supported", icebergTable.getDbName(), + icebergTable.getName())); + } + + FileScanProviderIf scanProvider; + String catalogType = icebergTable.getIcebergCatalogType(); + switch (catalogType) { + case IcebergExternalCatalog.ICEBERG_HMS: + case IcebergExternalCatalog.ICEBERG_REST: + IcebergSource icebergSource = new IcebergApiSource( + icebergTable, desc, columnNameToRange); + scanProvider = new IcebergScanProvider(icebergSource, analyzer); + break; + default: + throw new UserException("Unknown iceberg catalog type: " + catalogType); + } + this.scanProviders.add(scanProvider); + } + private void initFunctionGenTable(FunctionGenTable table, ExternalFileTableValuedFunction tvf) { Preconditions.checkNotNull(table); FileScanProviderIf scanProvider = new TVFScanProvider(table, desc, tvf); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java index b332819700..d894dd1fff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java @@ -25,6 +25,8 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext; +import org.apache.doris.planner.external.iceberg.IcebergScanProvider; +import org.apache.doris.planner.external.iceberg.IcebergSplit; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TExternalScanRange; import org.apache.doris.thrift.TFileAttributes; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java new file mode 100644 index 0000000000..35b45282c5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external.iceberg; + +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.IcebergExternalTable; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.planner.ColumnRange; +import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.thrift.TFileAttributes; +import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TFileScanSlotInfo; + +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Get metadata from iceberg api (all iceberg table like hive, rest, glue...) + */ +public class IcebergApiSource implements IcebergSource { + + private final IcebergExternalTable icebergExtTable; + private final Table originTable; + + private final TupleDescriptor desc; + + public IcebergApiSource(IcebergExternalTable table, TupleDescriptor desc, + Map<String, ColumnRange> columnNameToRange) { + this.icebergExtTable = table; + this.originTable = ((IcebergExternalCatalog) icebergExtTable.getCatalog()) + .getIcebergTable(icebergExtTable.getDbName(), icebergExtTable.getName()); + this.desc = desc; + } + + @Override + public TupleDescriptor getDesc() { + return desc; + } + + @Override + public String getFileFormat() { + return originTable.properties() + .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + } + + @Override + public Table getIcebergTable() throws MetaNotFoundException { + return originTable; + } + + @Override + public TableIf getTargetTable() { + return icebergExtTable; + } + + @Override + public ExternalFileScanNode.ParamCreateContext createContext() throws UserException { + ExternalFileScanNode.ParamCreateContext context = new ExternalFileScanNode.ParamCreateContext(); + context.params = new TFileScanRangeParams(); + context.destTupleDescriptor = desc; + context.params.setDestTupleId(desc.getId().asInt()); + context.fileGroup = new BrokerFileGroup(icebergExtTable.getId(), originTable.location(), getFileFormat()); + + // Hive table must extract partition value from path and hudi/iceberg table keep + // partition field in file. + List<String> partitionKeys = originTable.spec().fields().stream() + .map(PartitionField::name).collect(Collectors.toList()); + List<Column> columns = icebergExtTable.getBaseSchema(false); + context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size()); + for (SlotDescriptor slot : desc.getSlots()) { + if (!slot.isMaterialized()) { + continue; + } + TFileScanSlotInfo slotInfo = new TFileScanSlotInfo(); + slotInfo.setSlotId(slot.getId().asInt()); + slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName())); + context.params.addToRequiredSlots(slotInfo); + } + return context; + } + + @Override + public TFileAttributes getFileAttributes() throws UserException { + return new TFileAttributes(); + } + + @Override + public ExternalCatalog getCatalog() { + return icebergExtTable.getCatalog(); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergDeleteFileFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergDeleteFileFilter.java similarity index 92% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergDeleteFileFilter.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergDeleteFileFilter.java index 9add306c46..e1e94a8329 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergDeleteFileFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergDeleteFileFilter.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.planner.external; +package org.apache.doris.planner.external.iceberg; import lombok.Data; @@ -64,7 +64,6 @@ public class IcebergDeleteFileFilter { } } - @Data static class EqualityDelete extends IcebergDeleteFileFilter { private List<Integer> fieldIds; @@ -72,5 +71,13 @@ public class IcebergDeleteFileFilter { super(deleteFilePath); this.fieldIds = fieldIds; } + + public List<Integer> getFieldIds() { + return fieldIds; + } + + public void setFieldIds(List<Integer> fieldIds) { + this.fieldIds = fieldIds; + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java new file mode 100644 index 0000000000..747d7fd6f6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external.iceberg; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.HiveMetaStoreClientHelper; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.planner.ColumnRange; +import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.planner.external.HiveScanProvider; +import org.apache.doris.thrift.TFileAttributes; + +import org.apache.iceberg.TableProperties; + +import java.util.Map; + +public class IcebergHMSSource implements IcebergSource { + + private final HMSExternalTable hmsTable; + private final HiveScanProvider hiveScanProvider; + + private final TupleDescriptor desc; + + public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc, + Map<String, ColumnRange> columnNameToRange) { + this.hiveScanProvider = new HiveScanProvider(hmsTable, desc, columnNameToRange); + this.hmsTable = hmsTable; + this.desc = desc; + } + + @Override + public TupleDescriptor getDesc() { + return desc; + } + + @Override + public String getFileFormat() throws DdlException, MetaNotFoundException { + return hiveScanProvider.getRemoteHiveTable().getParameters() + .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + } + + public org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException { + return HiveMetaStoreClientHelper.getIcebergTable(hmsTable); + } + + @Override + public ExternalFileScanNode.ParamCreateContext createContext() throws UserException { + return hiveScanProvider.createContext(null); + } + + @Override + public TableIf getTargetTable() { + return hiveScanProvider.getTargetTable(); + } + + @Override + public TFileAttributes getFileAttributes() throws UserException { + return hiveScanProvider.getFileAttributes(); + } + + @Override + public ExternalCatalog getCatalog() { + return hmsTable.getCatalog(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java similarity index 75% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java index 24dc94fc0e..94e744725a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java @@ -15,22 +15,25 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.planner.external; +package org.apache.doris.planner.external.iceberg; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.TableSnapshot; -import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.HiveMetaStoreClientHelper; -import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.external.iceberg.util.IcebergUtils; -import org.apache.doris.planner.ColumnRange; +import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.planner.external.QueryScanProvider; +import org.apache.doris.planner.external.TableFormatType; +import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TIcebergDeleteFileDesc; import org.apache.doris.thrift.TIcebergFileDesc; import org.apache.doris.thrift.TTableFormatFileDesc; @@ -43,7 +46,7 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.HistoryEntry; import org.apache.iceberg.MetadataColumns; -import org.apache.iceberg.TableProperties; +import org.apache.iceberg.PartitionField; import org.apache.iceberg.TableScan; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.expressions.Expression; @@ -52,23 +55,23 @@ import org.apache.iceberg.types.Conversions; import java.nio.ByteBuffer; import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.stream.Collectors; /** * A file scan provider for iceberg. */ -public class IcebergScanProvider extends HiveScanProvider { +public class IcebergScanProvider extends QueryScanProvider { private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2; private final Analyzer analyzer; + private final IcebergSource icebergSource; - public IcebergScanProvider(HMSExternalTable hmsTable, Analyzer analyzer, TupleDescriptor desc, - Map<String, ColumnRange> columnNameToRange) { - super(hmsTable, desc, columnNameToRange); + public IcebergScanProvider(IcebergSource icebergSource, Analyzer analyzer) { + this.icebergSource = icebergSource; this.analyzer = analyzer; } @@ -111,34 +114,43 @@ public class IcebergScanProvider extends HiveScanProvider { } @Override - public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { - TFileFormatType type; - - String icebergFormat = getRemoteHiveTable().getParameters() - .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); - if (icebergFormat.equalsIgnoreCase("parquet")) { - type = TFileFormatType.FORMAT_PARQUET; - } else if (icebergFormat.equalsIgnoreCase("orc")) { - type = TFileFormatType.FORMAT_ORC; - } else { - throw new DdlException(String.format("Unsupported format name: %s for iceberg table.", icebergFormat)); + public TFileType getLocationType() throws DdlException, MetaNotFoundException { + String location = icebergSource.getIcebergTable().location(); + if (location != null && !location.isEmpty()) { + if (location.startsWith(FeConstants.FS_PREFIX_S3) + || location.startsWith(FeConstants.FS_PREFIX_S3A) + || location.startsWith(FeConstants.FS_PREFIX_S3N) + || location.startsWith(FeConstants.FS_PREFIX_BOS) + || location.startsWith(FeConstants.FS_PREFIX_COS) + || location.startsWith(FeConstants.FS_PREFIX_OSS) + || location.startsWith(FeConstants.FS_PREFIX_OBS)) { + return TFileType.FILE_S3; + } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) { + return TFileType.FILE_HDFS; + } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) { + return TFileType.FILE_LOCAL; + } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) { + return TFileType.FILE_BROKER; + } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) { + return TFileType.FILE_BROKER; + } } - return type; + throw new DdlException("Unknown file location " + location + + " for hms table " + icebergSource.getIcebergTable().name()); } @Override public List<InputSplit> getSplits(List<Expr> exprs) throws UserException { List<Expression> expressions = new ArrayList<>(); - org.apache.iceberg.Table table = HiveMetaStoreClientHelper.getIcebergTable(hmsTable); + org.apache.iceberg.Table table = icebergSource.getIcebergTable(); for (Expr conjunct : exprs) { Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, table.schema()); if (expression != null) { expressions.add(expression); } } - TableScan scan = table.newScan(); - TableSnapshot tableSnapshot = desc.getRef().getTableSnapshot(); + TableSnapshot tableSnapshot = icebergSource.getDesc().getRef().getTableSnapshot(); if (tableSnapshot != null) { TableSnapshot.VersionType type = tableSnapshot.getType(); try { @@ -220,6 +232,41 @@ public class IcebergScanProvider extends HiveScanProvider { @Override public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException { - return Collections.emptyList(); + return icebergSource.getIcebergTable().spec().fields().stream().map(PartitionField::name) + .collect(Collectors.toList()); + } + + @Override + public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException { + TFileFormatType type; + String icebergFormat = icebergSource.getFileFormat(); + if (icebergFormat.equalsIgnoreCase("parquet")) { + type = TFileFormatType.FORMAT_PARQUET; + } else if (icebergFormat.equalsIgnoreCase("orc")) { + type = TFileFormatType.FORMAT_ORC; + } else { + throw new DdlException(String.format("Unsupported format name: %s for iceberg table.", icebergFormat)); + } + return type; + } + + @Override + public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException { + return icebergSource.getCatalog().getProperties(); + } + + @Override + public ExternalFileScanNode.ParamCreateContext createContext(Analyzer analyzer) throws UserException { + return icebergSource.createContext(); + } + + @Override + public TableIf getTargetTable() { + return icebergSource.getTargetTable(); + } + + @Override + public TFileAttributes getFileAttributes() throws UserException { + return icebergSource.getFileAttributes(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java new file mode 100644 index 0000000000..ab17c6a448 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner.external.iceberg; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.thrift.TFileAttributes; + +public interface IcebergSource { + + TupleDescriptor getDesc(); + + org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException; + + ExternalFileScanNode.ParamCreateContext createContext() throws UserException; + + TableIf getTargetTable(); + + TFileAttributes getFileAttributes() throws UserException; + + ExternalCatalog getCatalog(); + + String getFileFormat() throws DdlException, MetaNotFoundException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java similarity index 92% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java index b9607a7f00..a82c99b04a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.planner.external; +package org.apache.doris.planner.external.iceberg; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.planner.external.HiveSplit; import lombok.Data; import org.apache.hadoop.fs.Path; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org