This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit d5a61115ad02ef88aea2eec62d630363326327ec Author: Mingyu Chen (Rayner) <[email protected]> AuthorDate: Sun Mar 15 20:39:44 2026 -0700 branch-4.1: [feature](paimon) implement create/drop db, create/drop table for paimon (#58894) (#61338) Cherry-pick from #58894 --------- Co-authored-by: yaoxiao <[email protected]> Co-authored-by: yaoxiao <[email protected]> --- .../java/org/apache/doris/catalog/TableIf.java | 2 + .../operations/ExternalMetadataOperations.java | 6 + .../paimon/DorisToPaimonTypeVisitor.java | 109 ++++++ .../datasource/paimon/PaimonExternalCatalog.java | 40 +- .../doris/datasource/paimon/PaimonMetadataOps.java | 405 +++++++++++++++++++++ .../plans/commands/info/ColumnDefinition.java | 12 + .../trees/plans/commands/info/CreateTableInfo.java | 21 +- .../datasource/paimon/PaimonMetadataOpsTest.java | 256 +++++++++++++ .../paimon/test_paimon_table.groovy | 122 +++++++ 9 files changed, 935 insertions(+), 38 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 2a17198a288..8b6f62ae76a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -485,6 +485,8 @@ public interface TableIf { case ICEBERG: case ICEBERG_EXTERNAL_TABLE: return "iceberg"; + case PAIMON_EXTERNAL_TABLE: + return "paimon"; case DICTIONARY: return "dictionary"; case DORIS_EXTERNAL_TABLE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java index 7d63b18cd13..513b3379177 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java @@ -21,6 +21,7 @@ import 
org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HiveMetadataOps; import org.apache.doris.datasource.iceberg.IcebergMetadataOps; +import org.apache.doris.datasource.paimon.PaimonMetadataOps; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.catalog.Catalog; @@ -35,4 +36,9 @@ public class ExternalMetadataOperations { public static IcebergMetadataOps newIcebergMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) { return new IcebergMetadataOps(dorisCatalog, catalog); } + + public static PaimonMetadataOps newPaimonMetaOps(ExternalCatalog dorisCatalog, + org.apache.paimon.catalog.Catalog catalog) { + return new PaimonMetadataOps(dorisCatalog, catalog); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java new file mode 100644 index 00000000000..aad8106563b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.StructField; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.DorisTypeVisitor; + +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class DorisToPaimonTypeVisitor extends DorisTypeVisitor<DataType> { + + @Override + public DataType struct(StructType struct, List<DataType> fieldResults) { + List<StructField> fields = struct.getFields(); + List<DataField> newFields = new ArrayList<>(fields.size()); + AtomicInteger atomicInteger = new AtomicInteger(-1); + for (int i = 0; i < fields.size(); i++) { + StructField field = fields.get(i); + DataType fieldType = fieldResults.get(i).copy(field.getContainsNull()); + String comment = field.getComment(); + DataField dataField = new DataField(atomicInteger.incrementAndGet(), field.getName(), fieldType, comment); + newFields.add(dataField); + } + return new RowType(newFields); + } + + @Override + public DataType field(StructField field, DataType typeResult) { + return typeResult; + } + + @Override + public DataType array(ArrayType array, 
DataType elementResult) { + return new org.apache.paimon.types.ArrayType(elementResult.copy(array.getContainsNull())); + } + + @Override + public DataType map(MapType map, DataType keyResult, DataType valueResult) { + return new org.apache.paimon.types.MapType(keyResult.copy(false), + valueResult.copy(map.getIsValueContainsNull())); + } + + @Override + public DataType atomic(Type atomic) { + PrimitiveType primitiveType = atomic.getPrimitiveType(); + if (primitiveType.equals(PrimitiveType.BOOLEAN)) { + return new BooleanType(); + } else if (primitiveType.equals(PrimitiveType.INT)) { + return new IntType(); + } else if (primitiveType.equals(PrimitiveType.BIGINT)) { + return new BigIntType(); + } else if (primitiveType.equals(PrimitiveType.FLOAT)) { + return new FloatType(); + } else if (primitiveType.equals(PrimitiveType.DOUBLE)) { + return new DoubleType(); + } else if (primitiveType.isCharFamily()) { + return new VarCharType(VarCharType.MAX_LENGTH); + } else if (primitiveType.equals(PrimitiveType.DATE) || primitiveType.equals(PrimitiveType.DATEV2)) { + return new DateType(); + } else if (primitiveType.equals(PrimitiveType.DECIMALV2) || primitiveType.isDecimalV3Type()) { + return new DecimalType(((ScalarType) atomic).getScalarPrecision(), ((ScalarType) atomic).getScalarScale()); + } else if (primitiveType.equals(PrimitiveType.DATETIME) || primitiveType.equals(PrimitiveType.DATETIMEV2)) { + return new TimestampType(); + } else if (primitiveType.isVarbinaryType()) { + return new VarBinaryType(VarBinaryType.MAX_LENGTH); + } else if (primitiveType.isVariantType()) { + return new VariantType(); + } + throw new UnsupportedOperationException("Not a supported type: " + primitiveType); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java index b6a06fd4670..f15309ea0e9 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java @@ -24,13 +24,13 @@ import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.NameMapping; import org.apache.doris.datasource.SessionContext; import org.apache.doris.datasource.metacache.CacheSpec; +import org.apache.doris.datasource.operations.ExternalMetadataOperations; import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Catalog.TableNotExistException; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.partition.Partition; @@ -67,6 +67,7 @@ public class PaimonExternalCatalog extends ExternalCatalog { catalogType = paimonProperties.getPaimonCatalogType(); catalog = createCatalog(); initPreExecutionAuthenticator(); + metadataOps = ExternalMetadataOperations.newPaimonMetaOps(this, catalog); } @Override @@ -81,49 +82,16 @@ public class PaimonExternalCatalog extends ExternalCatalog { return catalogType; } - protected List<String> listDatabaseNames() { - try { - return executionAuthenticator.execute(() -> new ArrayList<>(catalog.listDatabases())); - } catch (Exception e) { - throw new RuntimeException("Failed to list databases names, catalog name: " + getName(), e); - } - } - @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { makeSureInitialized(); - try { - return executionAuthenticator.execute(() -> { - try { - catalog.getTable(Identifier.create(dbName, tblName)); - return true; - } catch (TableNotExistException e) { - return false; - } - }); - - } catch (Exception e) { - throw new RuntimeException("Failed to check table existence, catalog name: " + getName() - + 
"error message is:" + ExceptionUtils.getRootCauseMessage(e), e); - } + return metadataOps.tableExist(dbName, tblName); } @Override public List<String> listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); - try { - return executionAuthenticator.execute(() -> { - List<String> tableNames = null; - try { - tableNames = catalog.listTables(dbName); - } catch (Catalog.DatabaseNotExistException e) { - LOG.warn("DatabaseNotExistException", e); - } - return tableNames; - }); - } catch (Exception e) { - throw new RuntimeException("Failed to list table names, catalog name: " + getName(), e); - } + return metadataOps.listTableNames(dbName); } public List<Partition> getPaimonPartitions(NameMapping nameMapping) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java new file mode 100644 index 00000000000..e6c8177edca --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java @@ -0,0 +1,405 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.analysis.PartitionDesc; +import org.apache.doris.catalog.StructField; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.common.security.authentication.ExecutionAuthenticator; +import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.DorisTypeVisitor; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.ExternalDatabase; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.operations.ExternalMetadataOps; +import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; +import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Catalog.DatabaseNotEmptyException; +import org.apache.paimon.catalog.Catalog.DatabaseNotExistException; +import org.apache.paimon.catalog.Catalog.TableAlreadyExistException; +import org.apache.paimon.catalog.Catalog.TableNotExistException; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataType; + +import java.util.ArrayList; +import java.util.Arrays; +import 
java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +public class PaimonMetadataOps implements ExternalMetadataOps { + + private static final Logger LOG = LogManager.getLogger(PaimonMetadataOps.class); + protected Catalog catalog; + protected ExternalCatalog dorisCatalog; + private ExecutionAuthenticator executionAuthenticator; + private static final String PRIMARY_KEY_IDENTIFIER = "primary-key"; + private static final String PROP_COMMENT = "comment"; + private static final String PROP_LOCATION = "location"; + + public PaimonMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) { + this.dorisCatalog = dorisCatalog; + this.catalog = catalog; + this.executionAuthenticator = dorisCatalog.getExecutionAuthenticator(); + } + + + @Override + public boolean createDbImpl(String dbName, boolean ifNotExists, Map<String, String> properties) + throws DdlException { + try { + return executionAuthenticator.execute(() -> performCreateDb(dbName, ifNotExists, properties)); + } catch (Exception e) { + throw new DdlException("Failed to create database: " + + dbName + ": " + Util.getRootCauseMessage(e), e); + } + } + + private boolean performCreateDb(String dbName, boolean ifNotExists, Map<String, String> properties) + throws DdlException, Catalog.DatabaseAlreadyExistException { + if (databaseExist(dbName)) { + if (ifNotExists) { + LOG.info("create database[{}] which already exists", dbName); + return true; + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, dbName); + } + } + + if (!properties.isEmpty() && dorisCatalog instanceof PaimonExternalCatalog) { + String catalogType = ((PaimonExternalCatalog) dorisCatalog).getCatalogType(); + if (!PaimonExternalCatalog.PAIMON_HMS.equals(catalogType)) { + throw new DdlException( + "Not supported: create database with properties for paimon catalog type: " + catalogType); + } + } + + 
catalog.createDatabase(dbName, ifNotExists, properties); + return false; + } + + @Override + public void afterCreateDb() { + dorisCatalog.resetMetaCacheNames(); + } + + @Override + public void dropDbImpl(String dbName, boolean ifExists, boolean force) throws DdlException { + try { + executionAuthenticator.execute(() -> { + performDropDb(dbName, ifExists, force); + return null; + }); + } catch (Exception e) { + throw new DdlException( + "Failed to drop database: " + dbName + ", error message is:" + e.getMessage(), e); + } + } + + private void performDropDb(String dbName, boolean ifExists, boolean force) throws DdlException { + ExternalDatabase dorisDb = dorisCatalog.getDbNullable(dbName); + if (dorisDb == null) { + if (ifExists) { + LOG.info("drop database[{}] which does not exist", dbName); + // Database does not exist and IF EXISTS is specified; treat as no-op. + return; + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName); + // ErrorReport.reportDdlException is expected to throw DdlException. + return; + } + } + + if (force) { + List<String> tableNames = listTableNames(dbName); + if (!tableNames.isEmpty()) { + LOG.info("drop database[{}] with force, drop all tables, num: {}", dbName, tableNames.size()); + } + for (String tableName : tableNames) { + performDropTable(dbName, tableName, true); + } + } + + try { + catalog.dropDatabase(dbName, ifExists, force); + } catch (DatabaseNotExistException e) { + throw new RuntimeException("database " + dbName + " does not exist!"); + } catch (DatabaseNotEmptyException e) { + throw new RuntimeException("database " + dbName + " is not empty! 
please check!"); + } + } + + @Override + public void afterDropDb(String dbName) { + dorisCatalog.unregisterDatabase(dbName); + } + + @Override + public boolean createTableImpl(CreateTableInfo createTableInfo) throws UserException { + try { + return executionAuthenticator.execute(() -> performCreateTable(createTableInfo)); + } catch (Exception e) { + throw new DdlException( + "Failed to create table: " + createTableInfo.getTableName() + ", error message is:" + e.getMessage(), + e); + } + } + + public boolean performCreateTable(CreateTableInfo createTableInfo) throws UserException { + String dbName = createTableInfo.getDbName(); + ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName); + if (db == null) { + throw new UserException("Failed to get database: '" + dbName + "' in catalog: " + dorisCatalog.getName()); + } + String tableName = createTableInfo.getTableName(); + // 1. first, check if table exist in remote + if (tableExist(db.getRemoteName(), tableName)) { + if (createTableInfo.isIfNotExists()) { + LOG.info("create table[{}] which already exists", tableName); + return true; + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); + } + } + + // 2. second, check if table exist in local. + // This is because case sensibility issue, eg: + // 1. lower_case_table_name = 1 + // 2. create table tbl1; + // 3. create table TBL1; TBL1 does not exist in remote because the remote system is case-sensitive. 
+ // but because lower_case_table_name = 1, the table can not be created in Doris because it is conflict with + // tbl1 + ExternalTable dorisTable = db.getTableNullable(tableName); + if (dorisTable != null) { + if (createTableInfo.isIfNotExists()) { + LOG.info("create table[{}] which already exists", tableName); + return true; + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); + } + } + List<ColumnDefinition> columns = createTableInfo.getColumnDefinitions(); + List<StructField> collect = columns.stream() + .map(col -> new StructField(col.getName(), col.getType().toCatalogDataType(), + col.getComment(), col.isNullable())) + .collect(Collectors.toList()); + StructType structType = new StructType(new ArrayList<>(collect)); + Schema schema = toPaimonSchema(structType, createTableInfo.getPartitionDesc(), createTableInfo.getProperties()); + try { + catalog.createTable(new Identifier(createTableInfo.getDbName(), createTableInfo.getTableName()), + schema, createTableInfo.isIfNotExists()); + } catch (TableAlreadyExistException | DatabaseNotExistException e) { + throw new RuntimeException(e); + } + return false; + } + + private Schema toPaimonSchema(StructType structType, PartitionDesc partitionDesc, Map<String, String> properties) { + Map<String, String> normalizedProperties = new HashMap<>(properties); + normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER); + normalizedProperties.remove(PROP_COMMENT); + if (normalizedProperties.containsKey(PROP_LOCATION)) { + String path = normalizedProperties.remove(PROP_LOCATION); + normalizedProperties.put(CoreOptions.PATH.key(), path); + } + + String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER); + List<String> primaryKeys = pkAsString == null ? Collections.emptyList() : Arrays.stream(pkAsString.split(",")) + .map(String::trim) + .collect(Collectors.toList()); + List<String> partitionKeys = partitionDesc == null ? 
new ArrayList<>() : partitionDesc.getPartitionColNames(); + Schema.Builder schemaBuilder = Schema.newBuilder() + .options(normalizedProperties) + .primaryKey(primaryKeys) + .partitionKeys(partitionKeys) + .comment(properties.getOrDefault(PROP_COMMENT, null)); + for (StructField field : structType.getFields()) { + schemaBuilder.column(field.getName(), + toPaimontype(field.getType()).copy(field.getContainsNull()), + field.getComment()); + } + return schemaBuilder.build(); + } + + private DataType toPaimontype(Type type) { + return DorisTypeVisitor.visit(type, new DorisToPaimonTypeVisitor()); + } + + @Override + public void afterCreateTable(String dbName, String tblName) { + Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName); + if (db.isPresent()) { + db.get().resetMetaCacheNames(); + } + LOG.info("after create table {}.{}.{}, is db exists: {}", + dorisCatalog.getName(), dbName, tblName, db.isPresent()); + } + + @Override + public void dropTableImpl(ExternalTable dorisTable, boolean ifExists) throws DdlException { + try { + executionAuthenticator.execute(() -> { + performDropTable(dorisTable.getRemoteDbName(), dorisTable.getRemoteName(), ifExists); + return null; + }); + } catch (Exception e) { + throw new DdlException( + "Failed to drop table: " + dorisTable.getName() + ", error message is:" + e.getMessage(), e); + } + } + + private void performDropTable(String dBName, String tableName, boolean ifExists) throws DdlException { + if (!tableExist(dBName, tableName)) { + if (ifExists) { + LOG.info("drop table[{}] which does not exist", tableName); + return; + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dBName); + } + } + try { + catalog.dropTable(Identifier.create(dBName, tableName), ifExists); + } catch (TableNotExistException e) { + throw new RuntimeException("table " + tableName + " does not exist"); + } + } + + @Override + public void afterDropTable(String dbName, String tblName) { + 
Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName); + db.ifPresent(externalDatabase -> externalDatabase.unregisterTable(tblName)); + LOG.info("after drop table {}.{}.{}. is db exists: {}", + dorisCatalog.getName(), dbName, tblName, db.isPresent()); + } + + @Override + public void truncateTableImpl(ExternalTable dorisTable, List<String> partitions) throws DdlException { + throw new UnsupportedOperationException("truncate table is not a supported operation!"); + } + + @Override + public void createOrReplaceBranchImpl(ExternalTable dorisTable, CreateOrReplaceBranchInfo branchInfo) + throws UserException { + throw new UnsupportedOperationException("create or replace branch is not a supported operation!"); + } + + @Override + public void createOrReplaceTagImpl(ExternalTable dorisTable, CreateOrReplaceTagInfo tagInfo) throws UserException { + throw new UnsupportedOperationException("create or replace tag is not a supported operation!"); + } + + @Override + public void dropTagImpl(ExternalTable dorisTable, DropTagInfo tagInfo) throws UserException { + throw new UnsupportedOperationException("drop tag is not a supported operation!"); + } + + @Override + public void dropBranchImpl(ExternalTable dorisTable, DropBranchInfo branchInfo) throws UserException { + throw new UnsupportedOperationException("drop branch is not a supported operation!"); + } + + @Override + public List<String> listDatabaseNames() { + try { + return executionAuthenticator.execute(() -> new ArrayList<>(catalog.listDatabases())); + } catch (Exception e) { + throw new RuntimeException("Failed to list databases names, catalog name: " + dorisCatalog.getName(), e); + } + } + + @Override + public List<String> listTableNames(String db) { + try { + return executionAuthenticator.execute(() -> { + List<String> tableNames = new ArrayList<>(); + try { + tableNames.addAll(catalog.listTables(db)); + } catch (DatabaseNotExistException e) { + LOG.warn("DatabaseNotExistException", e); + } + return 
tableNames; + }); + } catch (Exception e) { + throw new RuntimeException("Failed to list table names, catalog name: " + dorisCatalog.getName(), e); + } + } + + @Override + public boolean tableExist(String dbName, String tblName) { + try { + return executionAuthenticator.execute(() -> { + try { + catalog.getTable(Identifier.create(dbName, tblName)); + return true; + } catch (TableNotExistException e) { + return false; + } + }); + + } catch (Exception e) { + throw new RuntimeException("Failed to check table existence, catalog name: " + dorisCatalog.getName() + + "error message is:" + ExceptionUtils.getRootCauseMessage(e), e); + } + } + + @Override + public boolean databaseExist(String dbName) { + try { + return executionAuthenticator.execute(() -> { + try { + catalog.getDatabase(dbName); + return true; + } catch (DatabaseNotExistException e) { + return false; + } + }); + } catch (Exception e) { + throw new RuntimeException("Failed to check database exist, error message is:" + e.getMessage(), e); + } + } + + public Catalog getCatalog() { + return catalog; + } + + @Override + public void close() { + if (catalog != null) { + catalog = null; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java index a4fdd7329e2..ee2976f29e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java @@ -190,6 +190,18 @@ public class ColumnDefinition { this.generatedColumnsThatReferToThis = generatedColumnsThatReferToThis; } + public String getComment() { + return getComment(false); + } + + public String getComment(boolean escapeQuota) { + String comment = this.comment == null ? 
"" : this.comment; + if (!escapeQuota) { + return comment; + } + return SqlUtils.escapeQuota(comment); + } + /** * toSql */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java index c897a216268..7216262abcb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java @@ -49,6 +49,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.es.EsUtil; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.analyzer.Scope; @@ -116,6 +117,7 @@ public class CreateTableInfo { public static final String ENGINE_BROKER = "broker"; public static final String ENGINE_HIVE = "hive"; public static final String ENGINE_ICEBERG = "iceberg"; + public static final String ENGINE_PAIMON = "paimon"; private static final ImmutableSet<AggregateType> GENERATED_COLUMN_ALLOW_AGG_TYPE = ImmutableSet.of(AggregateType.REPLACE, AggregateType.REPLACE_IF_NOT_NULL); @@ -375,6 +377,8 @@ public class CreateTableInfo { throw new AnalysisException("Hms type catalog can only use `hive` engine."); } else if (catalog instanceof IcebergExternalCatalog && !engineName.equals(ENGINE_ICEBERG)) { throw new AnalysisException("Iceberg type catalog can only use `iceberg` engine."); + } else if (catalog instanceof PaimonExternalCatalog && !engineName.equals(ENGINE_PAIMON)) { + throw new AnalysisException("Paimon type catalog can only use `paimon` engine."); } } @@ -769,7 +773,12 @@ public class CreateTableInfo { 
throw new AnalysisException( "Iceberg doesn't support 'DISTRIBUTE BY', " + "and you can use 'bucket(num, column)' in 'PARTITIONED BY'."); + } else if (engineName.equalsIgnoreCase(ENGINE_PAIMON) && distribution != null) { + throw new AnalysisException( + "Paimon doesn't support 'DISTRIBUTE BY', " + + "and you can use 'bucket(num, column)' in 'PARTITIONED BY'."); } + for (ColumnDefinition columnDef : columns) { if (!columnDef.isNullable() && engineName.equalsIgnoreCase(ENGINE_HIVE)) { @@ -865,6 +874,8 @@ public class CreateTableInfo { engineName = ENGINE_HIVE; } else if (catalog instanceof IcebergExternalCatalog) { engineName = ENGINE_ICEBERG; + } else if (catalog instanceof PaimonExternalCatalog) { + engineName = ENGINE_PAIMON; } else { throw new AnalysisException("Current catalog does not support create table: " + ctlName); } @@ -894,7 +905,8 @@ public class CreateTableInfo { private void checkEngineName() { if (engineName.equals(ENGINE_MYSQL) || engineName.equals(ENGINE_ODBC) || engineName.equals(ENGINE_BROKER) || engineName.equals(ENGINE_ELASTICSEARCH) || engineName.equals(ENGINE_HIVE) - || engineName.equals(ENGINE_ICEBERG) || engineName.equals(ENGINE_JDBC)) { + || engineName.equals(ENGINE_ICEBERG) || engineName.equals(ENGINE_JDBC) + || engineName.equals(ENGINE_PAIMON)) { if (!isExternal) { // this is for compatibility isExternal = true; @@ -1071,7 +1083,8 @@ public class CreateTableInfo { throw new AnalysisException("Create " + engineName + " table should not contain distribution desc"); } - if (!engineName.equals(ENGINE_HIVE) && !engineName.equals(ENGINE_ICEBERG) && partitionDesc != null) { + if (!engineName.equals(ENGINE_HIVE) && !engineName.equals(ENGINE_ICEBERG) + && !engineName.equals(ENGINE_PAIMON) && partitionDesc != null) { throw new AnalysisException("Create " + engineName + " table should not contain partition desc"); } @@ -1301,6 +1314,10 @@ public class CreateTableInfo { return partitionDesc; } + public List<ColumnDefinition> getColumnDefinitions() { 
+ return columns; + } + public List<Column> getColumns() { return columns.stream() .map(ColumnDefinition::translateToCatalogStyle).collect(Collectors.toList()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java new file mode 100644 index 00000000000..e4146faa690 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.CatalogFactory; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand; +import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; +import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Maps; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.FileSystemCatalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.hive.HiveCatalog; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.VarCharType; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + +public class PaimonMetadataOpsTest { + public static String warehouse; + public static PaimonExternalCatalog paimonCatalog; + public static PaimonMetadataOps ops; + public static String dbName = "test_db"; + public static ConnectContext connectContext; + + @BeforeClass + public static void beforeClass() throws Throwable { + Path warehousePath = Files.createTempDirectory("test_warehouse_"); + warehouse = "file://" + warehousePath.toAbsolutePath() + "/"; + 
HashMap<String, String> param = new HashMap<>(); + param.put("type", "paimon"); + param.put("paimon.catalog.type", "filesystem"); + param.put("warehouse", warehouse); + // create catalog + CreateCatalogCommand createCatalogCommand = new CreateCatalogCommand("paimon", true, "", "comment", param); + paimonCatalog = (PaimonExternalCatalog) CatalogFactory.createFromCommand(1, createCatalogCommand); + paimonCatalog.makeSureInitialized(); + // create db + ops = new PaimonMetadataOps(paimonCatalog, paimonCatalog.catalog); + ops.createDb(dbName, true, Maps.newHashMap()); + paimonCatalog.makeSureInitialized(); + + // context + connectContext = new ConnectContext(); + connectContext.setThreadLocalInfo(); + } + + @Test + public void testSimpleTable() throws Exception { + String tableName = getTableName(); + Identifier identifier = new Identifier(dbName, tableName); + String sql = "create table " + dbName + "." + tableName + " (id int) engine = paimon"; + createTable(sql); + Catalog catalog = ops.getCatalog(); + Table table = catalog.getTable(identifier); + List<String> columnNames = new ArrayList<>(); + if (catalog instanceof HiveCatalog) { + columnNames.addAll(((HiveCatalog) catalog).loadTableSchema(identifier).fieldNames()); + } else if (catalog instanceof FileSystemCatalog) { + columnNames.addAll(((FileSystemCatalog) catalog).loadTableSchema(identifier).fieldNames()); + } + + if (!columnNames.isEmpty()) { + Assert.assertEquals(1, columnNames.size()); + } + Assert.assertEquals(0, table.partitionKeys().size()); + } + + @Test + public void testProperties() throws Exception { + String tableName = getTableName(); + Identifier identifier = new Identifier(dbName, tableName); + String sql = "create table " + dbName + "." 
+ tableName + " (id int) engine = paimon properties(\"primary-key\"=id)"; + createTable(sql); + Catalog catalog = ops.getCatalog(); + Table table = catalog.getTable(identifier); + + List<String> columnNames = new ArrayList<>(); + if (catalog instanceof HiveCatalog) { + columnNames.addAll(((HiveCatalog) catalog).loadTableSchema(identifier).fieldNames()); + } else if (catalog instanceof FileSystemCatalog) { + columnNames.addAll(((FileSystemCatalog) catalog).loadTableSchema(identifier).fieldNames()); + } + + if (!columnNames.isEmpty()) { + Assert.assertEquals(1, columnNames.size()); + } + Assert.assertEquals(0, table.partitionKeys().size()); + Assert.assertTrue(table.primaryKeys().contains("id")); + Assert.assertEquals(1, table.primaryKeys().size()); + } + + @Test + public void testType() throws Exception { + String tableName = getTableName(); + Identifier identifier = new Identifier(dbName, tableName); + String sql = "create table " + dbName + "." + tableName + " (" + + "c0 int, " + + "c1 bigint, " + + "c2 float, " + + "c3 double, " + + "c4 string, " + + "c5 date, " + + "c6 decimal(20, 10), " + + "c7 datetime" + + ") engine = paimon " + + "properties(\"primary-key\"=c0)"; + createTable(sql); + Catalog catalog = ops.getCatalog(); + Table table = catalog.getTable(identifier); + + List<DataField> columns = new ArrayList<>(); + if (catalog instanceof HiveCatalog) { + columns.addAll(((HiveCatalog) catalog).loadTableSchema(identifier).fields()); + } else if (catalog instanceof FileSystemCatalog) { + columns.addAll(((FileSystemCatalog) catalog).loadTableSchema(identifier).fields()); + } + + if (!columns.isEmpty()) { + Assert.assertEquals(8, columns.size()); + Assert.assertEquals(new IntType().asSQLString(), columns.get(0).type().toString()); + Assert.assertEquals(new BigIntType().asSQLString(), columns.get(1).type().toString()); + Assert.assertEquals(new FloatType().asSQLString(), columns.get(2).type().toString()); + Assert.assertEquals(new DoubleType().asSQLString(), 
columns.get(3).type().toString()); + Assert.assertEquals(new VarCharType(VarCharType.MAX_LENGTH).asSQLString(), columns.get(4).type().toString()); + Assert.assertEquals(new DateType().asSQLString(), columns.get(5).type().toString()); + Assert.assertEquals(new DecimalType(20, 10).asSQLString(), columns.get(6).type().toString()); + Assert.assertEquals(new TimestampType().asSQLString(), columns.get(7).type().toString()); + } + + Assert.assertEquals(0, table.partitionKeys().size()); + Assert.assertTrue(table.primaryKeys().contains("c0")); + Assert.assertEquals(1, table.primaryKeys().size()); + } + + @Test + public void testPartition() throws Exception { + String tableName = "test04"; + Identifier identifier = new Identifier(dbName, tableName); + String sql = "create table " + dbName + "." + tableName + " (" + + "c0 int, " + + "c1 bigint, " + + "c2 float, " + + "c3 double, " + + "c4 string, " + + "c5 date, " + + "c6 decimal(20, 10), " + + "c7 datetime" + + ") engine = paimon " + + "partition by (" + + "c1 ) ()" + + "properties(\"primary-key\"=c0)"; + createTable(sql); + Catalog catalog = ops.getCatalog(); + Table table = catalog.getTable(identifier); + Assert.assertEquals(1, table.partitionKeys().size()); + Assert.assertTrue(table.primaryKeys().contains("c0")); + Assert.assertEquals(1, table.primaryKeys().size()); + } + + @Test + public void testBucket() throws Exception { + String tableName = getTableName(); + Identifier identifier = new Identifier(dbName, tableName); + String sql = "create table " + dbName + "." 
+ tableName + " (" + + "c0 int, " + + "c1 bigint, " + + "c2 float, " + + "c3 double, " + + "c4 string, " + + "c5 date, " + + "c6 decimal(20, 10), " + + "c7 datetime" + + ") engine = paimon " + + "properties(\"primary-key\"=c0," + + "\"bucket\" = 4," + + "\"bucket-key\" = c0)"; + createTable(sql); + Catalog catalog = ops.getCatalog(); + Table table = catalog.getTable(identifier); + Assert.assertEquals("4", table.options().get("bucket")); + Assert.assertEquals("c0", table.options().get("bucket-key")); + } + + public void createTable(String sql) throws UserException { + LogicalPlan plan = new NereidsParser().parseSingle(sql); + Assertions.assertTrue(plan instanceof CreateTableCommand); + CreateTableInfo createTableInfo = ((CreateTableCommand) plan).getCreateTableInfo(); + createTableInfo.setIsExternal(true); + createTableInfo.analyzeEngine(); + ops.createTable(createTableInfo); + } + + public String getTableName() { + String s = "test_tb_" + UUID.randomUUID(); + return s.replaceAll("-", ""); + } + + @Test + public void testDropDB() { + try { + // create db success + ops.createDb("t_paimon", false, Maps.newHashMap()); + // drop db success + ops.dropDb("t_paimon", false, false); + } catch (Throwable t) { + Assert.fail("create/drop database should succeed: " + t); + } + + try { + ops.dropDb("t_paimon", false, false); + Assert.fail("dropping a missing database should throw DdlException"); + } catch (Throwable t) { + Assert.assertTrue(String.valueOf(t), t instanceof DdlException); + Assert.assertTrue(t.getMessage(), t.getMessage().contains("database doesn't exist")); + } + } +} diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy new file mode 100644 index 00000000000..032176bd103 --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_create_paimon_table", "p0,external,doris,external_docker,external_docker_doris") { + String catalog_name = "paimon_hms_catalog_test01" + + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + for (String hivePrefix : ["hive2"]) { + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") + String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}" + String warehouse = "${default_fs}/warehouse" + + // 1. test create catalog + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog ${catalog_name} properties ( + 'type'='paimon', + 'paimon.catalog.type'='hms', + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}', + 'warehouse' = '${warehouse}' + ); + """ + + // 2. test create database + sql """switch ${catalog_name}""" + String db_name = "test_db" + sql """create database if not exists ${db_name}""" + + // 3. 
test create table + sql """use ${db_name}""" + sql """drop table if exists ${db_name}.test01""" + sql """ + CREATE TABLE ${db_name}.test01 ( + id int + ) engine=paimon; + """ + + sql """drop table if exists ${db_name}.test02""" + sql """ + CREATE TABLE ${db_name}.test02 ( + id int + ) engine=paimon + properties("primary-key"=id); + """ + + sql """drop table if exists ${db_name}.test03""" + sql """ + CREATE TABLE ${db_name}.test03 ( + c0 int, + c1 bigint, + c2 float, + c3 double, + c4 string, + c5 date, + c6 decimal(10,5), + c7 datetime + ) engine=paimon + properties("primary-key"=c0); + """ + + sql """drop table if exists ${db_name}.test04""" + sql """ + CREATE TABLE ${db_name}.test04 ( + c0 int, + c1 bigint, + c2 float, + c3 double, + c4 string, + c5 date, + c6 decimal(10,5), + c7 datetime + ) engine=paimon + partition by (c1) () + properties("primary-key"=c0); + """ + + sql """drop table if exists ${db_name}.test05""" + sql """ + CREATE TABLE ${db_name}.test05 ( + c0 int, + c1 bigint, + c2 float, + c3 double, + c4 string, + c5 date, + c6 decimal(10,5), + c7 datetime + ) engine=paimon + properties( + 'primary-key' = 'c0,c1', + 'bucket' = '4', + 'bucket-key' = 'c0,c1'); + """ + + sql """ drop table if exists ${db_name}.test01""" + sql """ drop table if exists ${db_name}.test02""" + sql """ drop table if exists ${db_name}.test03""" + sql """ drop table if exists ${db_name}.test04""" + sql """ drop table if exists ${db_name}.test05""" + sql """ drop database if exists ${db_name}""" + sql """DROP CATALOG IF EXISTS ${catalog_name}""" + } + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
