This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 94d103b1b52b00a88247e67d0a1efdaa79f29bf5 Author: Mingyu Chen (Rayner) <[email protected]> AuthorDate: Sun Mar 15 20:47:37 2026 -0700 branch-4.1: [feat](maxcompute) support create/drop table operations (#61339) Cherry-pick of #60702 to branch-4.1. Support DDL operations (CREATE TABLE and DROP TABLE) for MaxCompute external catalog in Doris. --------- Co-authored-by: Claude Haiku 4.5 <[email protected]> --- .../maxcompute/MaxComputeExternalCatalog.java | 8 + .../maxcompute/MaxComputeMetadataOps.java | 522 +++++++++++++++++++++ .../datasource/maxcompute/McStructureHelper.java | 35 ++ .../operations/ExternalMetadataOperations.java | 9 + .../trees/plans/commands/info/CreateTableInfo.java | 14 +- .../maxcompute/test_max_compute_create_table.out | 34 ++ .../test_max_compute_create_table.groovy | 426 +++++++++++++++++ 7 files changed, 1045 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java index edf339e6702..ecdb66b1c1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java @@ -25,6 +25,7 @@ import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; +import org.apache.doris.datasource.operations.ExternalMetadataOperations; import com.aliyun.odps.Odps; import com.aliyun.odps.Partition; @@ -223,6 +224,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { boolean enableNamespaceSchema = Boolean.parseBoolean( props.getOrDefault(MCProperties.ENABLE_NAMESPACE_SCHEMA, MCProperties.DEFAULT_ENABLE_NAMESPACE_SCHEMA)); mcStructureHelper = 
McStructureHelper.getHelper(enableNamespaceSchema, defaultProject); + + metadataOps = ExternalMetadataOperations.newMaxComputeMetadataOps(this, odps); } public Odps getClient() { @@ -230,6 +233,11 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { return odps; } + public McStructureHelper getMcStructureHelper() { + makeSureInitialized(); + return mcStructureHelper; + } + protected List<String> listDatabaseNames() { makeSureInitialized(); return mcStructureHelper.listDatabaseNames(getClient(), getDefaultProject()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataOps.java new file mode 100644 index 00000000000..f7ad191444b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeMetadataOps.java @@ -0,0 +1,522 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.maxcompute; + +import org.apache.doris.analysis.DistributionDesc; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.HashDistributionDesc; +import org.apache.doris.analysis.PartitionDesc; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.StructField; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalDatabase; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.operations.ExternalMetadataOps; +import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo; + +import com.aliyun.odps.Odps; +import com.aliyun.odps.OdpsException; +import com.aliyun.odps.TableSchema; +import com.aliyun.odps.Tables; +import com.aliyun.odps.type.TypeInfo; +import com.aliyun.odps.type.TypeInfoFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * MaxCompute metadata operations for DDL support 
(CREATE TABLE, etc.) + */ +public class MaxComputeMetadataOps implements ExternalMetadataOps { + private static final Logger LOG = LogManager.getLogger(MaxComputeMetadataOps.class); + + private static final long MAX_LIFECYCLE_DAYS = 37231; + private static final int MAX_BUCKET_NUM = 1024; + + private final MaxComputeExternalCatalog dorisCatalog; + private final Odps odps; + + public MaxComputeMetadataOps(MaxComputeExternalCatalog dorisCatalog, Odps odps) { + this.dorisCatalog = dorisCatalog; + this.odps = odps; + } + + @Override + public void close() { + } + + @Override + public boolean tableExist(String dbName, String tblName) { + return dorisCatalog.tableExist(null, dbName, tblName); + } + + @Override + public boolean databaseExist(String dbName) { + return dorisCatalog.getMcStructureHelper().databaseExist(dorisCatalog.getClient(), dbName); + } + + @Override + public List<String> listDatabaseNames() { + return dorisCatalog.listDatabaseNames(); + } + + @Override + public List<String> listTableNames(String dbName) { + return dorisCatalog.listTableNames(null, dbName); + } + + // ==================== Create Database (not supported yet) ==================== + + @Override + public boolean createDbImpl(String dbName, boolean ifNotExists, Map<String, String> properties) + throws DdlException { + throw new DdlException("Create database is not supported for MaxCompute catalog."); + } + + @Override + public void dropDbImpl(String dbName, boolean ifExists, boolean force) throws DdlException { + throw new DdlException("Drop database is not supported for MaxCompute catalog."); + } + + @Override + public void afterDropDb(String dbName) { + } + + // ==================== Create Table ==================== + + @Override + public boolean createTableImpl(CreateTableInfo createTableInfo) throws UserException { + String dbName = createTableInfo.getDbName(); + String tableName = createTableInfo.getTableName(); + + // 1. 
Validate database existence + ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName); + if (db == null) { + throw new UserException( + "Failed to get database: '" + dbName + "' in catalog: " + dorisCatalog.getName()); + } + + // 2. Check if table exists in remote + if (tableExist(db.getRemoteName(), tableName)) { + if (createTableInfo.isIfNotExists()) { + LOG.info("create table[{}] which already exists", tableName); + return true; + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); + } + } + + // 3. Check if table exists in local (case sensitivity issue) + ExternalTable dorisTable = db.getTableNullable(tableName); + if (dorisTable != null) { + if (createTableInfo.isIfNotExists()) { + LOG.info("create table[{}] which already exists", tableName); + return true; + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); + } + } + + // 4. Validate columns + List<Column> columns = createTableInfo.getColumns(); + validateColumns(columns); + + // 5. Validate partition description + PartitionDesc partitionDesc = createTableInfo.getPartitionDesc(); + validatePartitionDesc(partitionDesc); + + // 6. Build MaxCompute TableSchema + TableSchema schema = buildMaxComputeTableSchema(columns, partitionDesc); + + // 7. Extract properties + Map<String, String> properties = createTableInfo.getProperties(); + Long lifecycle = extractLifecycle(properties); + Map<String, String> mcProperties = extractMaxComputeProperties(properties); + Integer bucketNum = extractBucketNum(createTableInfo); + + // 8. 
Create table via MaxCompute SDK + McStructureHelper structureHelper = dorisCatalog.getMcStructureHelper(); + Tables.TableCreator creator = structureHelper.createTableCreator( + odps, db.getRemoteName(), tableName, schema); + + if (createTableInfo.isIfNotExists()) { + creator.ifNotExists(); + } + + String comment = createTableInfo.getComment(); + if (comment != null && !comment.isEmpty()) { + creator.withComment(comment); + } + + if (lifecycle != null) { + creator.withLifeCycle(lifecycle); + } + + if (!mcProperties.isEmpty()) { + creator.withTblProperties(mcProperties); + } + + if (bucketNum != null) { + creator.withBucketNum(bucketNum); + } + + try { + creator.create(); + } catch (OdpsException e) { + throw new DdlException("Failed to create MaxCompute table '" + tableName + "': " + e.getMessage(), e); + } + + return false; + } + + @Override + public void afterCreateTable(String dbName, String tblName) { + Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName); + if (db.isPresent()) { + db.get().resetMetaCacheNames(); + } + LOG.info("after create table {}.{}.{}, is db exists: {}", + dorisCatalog.getName(), dbName, tblName, db.isPresent()); + } + + // ==================== Drop Table (not supported yet) ==================== + + @Override + public void dropTableImpl(ExternalTable dorisTable, boolean ifExists) throws DdlException { + // Get remote names (handles case-sensitivity) + String remoteDbName = dorisTable.getRemoteDbName(); + String remoteTblName = dorisTable.getRemoteName(); + + // Check table existence + if (!tableExist(remoteDbName, remoteTblName)) { + if (ifExists) { + LOG.info("drop table[{}.{}] which does not exist", remoteDbName, remoteTblName); + return; + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, + remoteTblName, remoteDbName); + } + } + + // Drop table via McStructureHelper + try { + McStructureHelper structureHelper = dorisCatalog.getMcStructureHelper(); + structureHelper.dropTable(odps, remoteDbName, 
remoteTblName, ifExists); + LOG.info("Successfully dropped MaxCompute table: {}.{}", remoteDbName, remoteTblName); + } catch (OdpsException e) { + throw new DdlException("Failed to drop MaxCompute table '" + + remoteTblName + "': " + e.getMessage(), e); + } + } + + @Override + public void afterDropTable(String dbName, String tblName) { + Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName); + if (db.isPresent()) { + db.get().unregisterTable(tblName); + } + LOG.info("after drop table {}.{}.{}, is db exists: {}", + dorisCatalog.getName(), dbName, tblName, db.isPresent()); + } + + @Override + public void truncateTableImpl(ExternalTable dorisTable, List<String> partitions) throws DdlException { + throw new DdlException("Truncate table is not supported for MaxCompute catalog."); + } + + // ==================== Branch/Tag (not supported) ==================== + + @Override + public void createOrReplaceBranchImpl(ExternalTable dorisTable, CreateOrReplaceBranchInfo branchInfo) + throws UserException { + throw new UserException("Branch operations are not supported for MaxCompute catalog."); + } + + @Override + public void createOrReplaceTagImpl(ExternalTable dorisTable, CreateOrReplaceTagInfo tagInfo) + throws UserException { + throw new UserException("Tag operations are not supported for MaxCompute catalog."); + } + + @Override + public void dropTagImpl(ExternalTable dorisTable, DropTagInfo tagInfo) throws UserException { + throw new UserException("Tag operations are not supported for MaxCompute catalog."); + } + + @Override + public void dropBranchImpl(ExternalTable dorisTable, DropBranchInfo branchInfo) throws UserException { + throw new UserException("Branch operations are not supported for MaxCompute catalog."); + } + + // ==================== Type Conversion ==================== + + /** + * Convert Doris type to MaxCompute TypeInfo. 
+ */ + public static TypeInfo dorisTypeToMcType(Type dorisType) throws UserException { + if (dorisType.isScalarType()) { + return dorisScalarTypeToMcType(dorisType); + } else if (dorisType.isArrayType()) { + ArrayType arrayType = (ArrayType) dorisType; + TypeInfo elementType = dorisTypeToMcType(arrayType.getItemType()); + return TypeInfoFactory.getArrayTypeInfo(elementType); + } else if (dorisType.isMapType()) { + MapType mapType = (MapType) dorisType; + TypeInfo keyType = dorisTypeToMcType(mapType.getKeyType()); + TypeInfo valueType = dorisTypeToMcType(mapType.getValueType()); + return TypeInfoFactory.getMapTypeInfo(keyType, valueType); + } else if (dorisType.isStructType()) { + StructType structType = (StructType) dorisType; + List<StructField> fields = structType.getFields(); + List<String> fieldNames = new ArrayList<>(fields.size()); + List<TypeInfo> fieldTypes = new ArrayList<>(fields.size()); + for (StructField field : fields) { + fieldNames.add(field.getName()); + fieldTypes.add(dorisTypeToMcType(field.getType())); + } + return TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypes); + } else { + throw new UserException("Unsupported Doris type for MaxCompute: " + dorisType); + } + } + + private static TypeInfo dorisScalarTypeToMcType(Type dorisType) throws UserException { + PrimitiveType primitiveType = dorisType.getPrimitiveType(); + switch (primitiveType) { + case BOOLEAN: + return TypeInfoFactory.BOOLEAN; + case TINYINT: + return TypeInfoFactory.TINYINT; + case SMALLINT: + return TypeInfoFactory.SMALLINT; + case INT: + return TypeInfoFactory.INT; + case BIGINT: + return TypeInfoFactory.BIGINT; + case FLOAT: + return TypeInfoFactory.FLOAT; + case DOUBLE: + return TypeInfoFactory.DOUBLE; + case CHAR: + return TypeInfoFactory.getCharTypeInfo(((ScalarType) dorisType).getLength()); + case VARCHAR: + return TypeInfoFactory.getVarcharTypeInfo(((ScalarType) dorisType).getLength()); + case STRING: + return TypeInfoFactory.STRING; + case DECIMALV2: + case 
DECIMAL32: + case DECIMAL64: + case DECIMAL128: + case DECIMAL256: + return TypeInfoFactory.getDecimalTypeInfo( + ((ScalarType) dorisType).getScalarPrecision(), + ((ScalarType) dorisType).getScalarScale()); + case DATE: + case DATEV2: + return TypeInfoFactory.DATE; + case DATETIME: + case DATETIMEV2: + return TypeInfoFactory.DATETIME; + case LARGEINT: + case HLL: + case BITMAP: + case QUANTILE_STATE: + case AGG_STATE: + case JSONB: + case VARIANT: + case IPV4: + case IPV6: + default: + throw new UserException( + "Unsupported Doris type for MaxCompute: " + primitiveType); + } + } + + // ==================== Validation ==================== + + private void validateColumns(List<Column> columns) throws UserException { + if (columns == null || columns.isEmpty()) { + throw new UserException("Table must have at least one column."); + } + Set<String> columnNames = new HashSet<>(); + for (Column col : columns) { + if (col.isAutoInc()) { + throw new UserException( + "Auto-increment columns are not supported for MaxCompute tables: " + col.getName()); + } + if (col.isAggregated()) { + throw new UserException( + "Aggregation columns are not supported for MaxCompute tables: " + col.getName()); + } + String lowerName = col.getName().toLowerCase(); + if (!columnNames.add(lowerName)) { + throw new UserException("Duplicate column name: " + col.getName()); + } + // Validate that the type is convertible + dorisTypeToMcType(col.getType()); + } + } + + private void validatePartitionDesc(PartitionDesc partitionDesc) throws UserException { + if (partitionDesc == null) { + return; + } + ArrayList<Expr> exprs = partitionDesc.getPartitionExprs(); + if (exprs == null || exprs.isEmpty()) { + return; + } + for (Expr expr : exprs) { + if (expr instanceof SlotRef) { + // Identity partition - OK + } else if (expr instanceof FunctionCallExpr) { + String funcName = ((FunctionCallExpr) expr).getFnName().getFunction(); + throw new UserException( + "MaxCompute does not support partition transform '" + 
funcName + + "'. Only identity partitions are supported."); + } else { + throw new UserException("Invalid partition expression: " + expr.toSql()); + } + } + } + + // ==================== Schema Building ==================== + + private TableSchema buildMaxComputeTableSchema(List<Column> columns, PartitionDesc partitionDesc) + throws UserException { + Set<String> partitionColNames = new HashSet<>(); + if (partitionDesc != null && partitionDesc.getPartitionColNames() != null) { + for (String name : partitionDesc.getPartitionColNames()) { + partitionColNames.add(name.toLowerCase()); + } + } + + TableSchema schema = new TableSchema(); + + // Add regular columns (non-partition) + for (Column col : columns) { + if (!partitionColNames.contains(col.getName().toLowerCase())) { + TypeInfo mcType = dorisTypeToMcType(col.getType()); + com.aliyun.odps.Column mcCol = new com.aliyun.odps.Column( + col.getName(), mcType, col.getComment()); + schema.addColumn(mcCol); + } + } + + // Add partition columns in the order specified by partitionDesc + if (partitionDesc != null && partitionDesc.getPartitionColNames() != null) { + for (String partColName : partitionDesc.getPartitionColNames()) { + Column col = findColumnByName(columns, partColName); + if (col == null) { + throw new UserException("Partition column '" + partColName + "' not found in column definitions."); + } + TypeInfo mcType = dorisTypeToMcType(col.getType()); + com.aliyun.odps.Column mcCol = new com.aliyun.odps.Column( + col.getName(), mcType, col.getComment()); + schema.addPartitionColumn(mcCol); + } + } + + return schema; + } + + private Column findColumnByName(List<Column> columns, String name) { + for (Column col : columns) { + if (col.getName().equalsIgnoreCase(name)) { + return col; + } + } + return null; + } + + // ==================== Property Extraction ==================== + + private Long extractLifecycle(Map<String, String> properties) throws UserException { + String lifecycleStr = 
properties.get("mc.lifecycle"); + if (lifecycleStr == null) { + lifecycleStr = properties.get("lifecycle"); + } + if (lifecycleStr != null) { + try { + long lifecycle = Long.parseLong(lifecycleStr); + if (lifecycle <= 0 || lifecycle > MAX_LIFECYCLE_DAYS) { + throw new UserException( + "Invalid lifecycle value: " + lifecycle + + ". Must be between 1 and " + MAX_LIFECYCLE_DAYS + "."); + } + return lifecycle; + } catch (NumberFormatException e) { + throw new UserException("Invalid lifecycle value: '" + lifecycleStr + "'. Must be a positive integer."); + } + } + return null; + } + + private Map<String, String> extractMaxComputeProperties(Map<String, String> properties) { + Map<String, String> mcProperties = new HashMap<>(); + for (Map.Entry<String, String> entry : properties.entrySet()) { + if (entry.getKey().startsWith("mc.tblproperty.")) { + String mcKey = entry.getKey().substring("mc.tblproperty.".length()); + mcProperties.put(mcKey, entry.getValue()); + } + } + return mcProperties; + } + + private Integer extractBucketNum(CreateTableInfo createTableInfo) throws UserException { + DistributionDesc distributionDesc = createTableInfo.getDistributionDesc(); + if (distributionDesc == null) { + return null; + } + if (!(distributionDesc instanceof HashDistributionDesc)) { + throw new UserException( + "MaxCompute only supports hash distribution. Got: " + distributionDesc.getClass().getSimpleName()); + } + + HashDistributionDesc hashDist = (HashDistributionDesc) distributionDesc; + int bucketNum = hashDist.getBuckets(); + + if (bucketNum <= 0 || bucketNum > MAX_BUCKET_NUM) { + throw new UserException( + "Invalid bucket number: " + bucketNum + ". 
Must be between 1 and " + MAX_BUCKET_NUM + "."); + } + + return bucketNum; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/McStructureHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/McStructureHelper.java index 7232b716426..e1e1090143c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/McStructureHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/McStructureHelper.java @@ -24,6 +24,8 @@ import com.aliyun.odps.Partition; import com.aliyun.odps.Project; import com.aliyun.odps.Schema; import com.aliyun.odps.Table; +import com.aliyun.odps.TableSchema; +import com.aliyun.odps.Tables; import com.aliyun.odps.security.SecurityManager; import com.aliyun.odps.table.TableIdentifier; import com.aliyun.odps.utils.StringUtils; @@ -59,6 +61,10 @@ public interface McStructureHelper { Table getOdpsTable(Odps mcClient, String dbName, String tableName); + Tables.TableCreator createTableCreator(Odps mcClient, String dbName, String tableName, TableSchema schema); + + void dropTable(Odps mcClient, String dbName, String tableName, boolean ifExists) throws OdpsException; + /** * `mc.enable.namespace.schema` = true. 
* mapping structure between Doris and MaxCompute: @@ -128,6 +134,21 @@ public interface McStructureHelper { public Table getOdpsTable(Odps mcClient, String dbName, String tableName) { return mcClient.tables().get(defaultProjectName, dbName, tableName); } + + @Override + public Tables.TableCreator createTableCreator(Odps mcClient, String dbName, String tableName, + TableSchema schema) { + // dbName is the schema name, defaultProjectName is the project + return mcClient.tables().newTableCreator(defaultProjectName, tableName, schema) + .withSchemaName(dbName); + } + + @Override + public void dropTable(Odps mcClient, String dbName, String tableName, boolean ifExists) + throws OdpsException { + // dbName is the schema name, defaultProjectName is the project + mcClient.tables().delete(defaultProjectName, dbName, tableName, ifExists); + } } /** @@ -211,6 +232,20 @@ public interface McStructureHelper { public Table getOdpsTable(Odps mcClient, String dbName, String tableName) { return mcClient.tables().get(dbName, tableName); } + + @Override + public Tables.TableCreator createTableCreator(Odps mcClient, String dbName, String tableName, + TableSchema schema) { + // dbName is the project name + return mcClient.tables().newTableCreator(dbName, tableName, schema); + } + + @Override + public void dropTable(Odps mcClient, String dbName, String tableName, boolean ifExists) + throws OdpsException { + // dbName is the project name + mcClient.tables().delete(dbName, tableName, ifExists); + } } static McStructureHelper getHelper(boolean isEnableNamespaceSchema, String defaultProjectName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java index 513b3379177..cf0205574e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java @@ -21,8 +21,11 @@ import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HiveMetadataOps; import org.apache.doris.datasource.iceberg.IcebergMetadataOps; +import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; +import org.apache.doris.datasource.maxcompute.MaxComputeMetadataOps; import org.apache.doris.datasource.paimon.PaimonMetadataOps; +import com.aliyun.odps.Odps; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.catalog.Catalog; @@ -41,4 +44,10 @@ public class ExternalMetadataOperations { org.apache.paimon.catalog.Catalog catalog) { return new PaimonMetadataOps(dorisCatalog, catalog); } + + public static MaxComputeMetadataOps newMaxComputeMetadataOps( + MaxComputeExternalCatalog catalog, Odps odps) { + return new MaxComputeMetadataOps(catalog, odps); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java index 7216262abcb..6943de00bb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java @@ -49,6 +49,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.es.EsUtil; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.CascadesContext; @@ -118,6 +119,7 @@ public class CreateTableInfo { 
public static final String ENGINE_HIVE = "hive"; public static final String ENGINE_ICEBERG = "iceberg"; public static final String ENGINE_PAIMON = "paimon"; + public static final String ENGINE_MAXCOMPUTE = "maxcompute"; private static final ImmutableSet<AggregateType> GENERATED_COLUMN_ALLOW_AGG_TYPE = ImmutableSet.of(AggregateType.REPLACE, AggregateType.REPLACE_IF_NOT_NULL); @@ -379,6 +381,8 @@ public class CreateTableInfo { throw new AnalysisException("Iceberg type catalog can only use `iceberg` engine."); } else if (catalog instanceof PaimonExternalCatalog && !engineName.equals(ENGINE_PAIMON)) { throw new AnalysisException("Paimon type catalog can only use `paimon` engine."); + } else if (catalog instanceof MaxComputeExternalCatalog && !engineName.equals(ENGINE_MAXCOMPUTE)) { + throw new AnalysisException("MaxCompute type catalog can only use `maxcompute` engine."); } } @@ -876,6 +880,8 @@ public class CreateTableInfo { engineName = ENGINE_ICEBERG; } else if (catalog instanceof PaimonExternalCatalog) { engineName = ENGINE_PAIMON; + } else if (catalog instanceof MaxComputeExternalCatalog) { + engineName = ENGINE_MAXCOMPUTE; } else { throw new AnalysisException("Current catalog does not support create table: " + ctlName); } @@ -906,7 +912,7 @@ public class CreateTableInfo { if (engineName.equals(ENGINE_MYSQL) || engineName.equals(ENGINE_ODBC) || engineName.equals(ENGINE_BROKER) || engineName.equals(ENGINE_ELASTICSEARCH) || engineName.equals(ENGINE_HIVE) || engineName.equals(ENGINE_ICEBERG) || engineName.equals(ENGINE_JDBC) - || engineName.equals(ENGINE_PAIMON)) { + || engineName.equals(ENGINE_PAIMON) || engineName.equals(ENGINE_MAXCOMPUTE)) { if (!isExternal) { // this is for compatibility isExternal = true; @@ -1079,12 +1085,14 @@ public class CreateTableInfo { throw new AnalysisException(e.getMessage(), e.getCause()); } } else if (!engineName.equals(ENGINE_OLAP)) { - if (!engineName.equals(ENGINE_HIVE) && distributionDesc != null) { + if 
(!engineName.equals(ENGINE_HIVE) && !engineName.equals(ENGINE_MAXCOMPUTE) + && distributionDesc != null) { throw new AnalysisException("Create " + engineName + " table should not contain distribution desc"); } if (!engineName.equals(ENGINE_HIVE) && !engineName.equals(ENGINE_ICEBERG) - && !engineName.equals(ENGINE_PAIMON) && partitionDesc != null) { + && !engineName.equals(ENGINE_PAIMON) && !engineName.equals(ENGINE_MAXCOMPUTE) + && partitionDesc != null) { throw new AnalysisException("Create " + engineName + " table should not contain partition desc"); } diff --git a/regression-test/data/external_table_p2/maxcompute/test_max_compute_create_table.out b/regression-test/data/external_table_p2/maxcompute/test_max_compute_create_table.out new file mode 100644 index 00000000000..9fca96546e8 --- /dev/null +++ b/regression-test/data/external_table_p2/maxcompute/test_max_compute_create_table.out @@ -0,0 +1,34 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !test1_show_create_table -- +test_mc_basic_table CREATE TABLE `test_mc_basic_table` (\n `id` int NULL,\n `name` text NULL,\n `age` int NULL\n) ENGINE=MAX_COMPUTE_EXTERNAL_TABLE; + +-- !test2_show_create_table -- +test_mc_all_types_comprehensive CREATE TABLE `test_mc_all_types_comprehensive` (\n `id` int NULL,\n `bool_col` boolean NULL,\n `tinyint_col` tinyint NULL,\n `smallint_col` smallint NULL,\n `int_col` int NULL,\n `bigint_col` bigint NULL,\n `float_col` float NULL,\n `double_col` double NULL,\n `decimal_col1` decimalv3(9,0) NULL,\n `decimal_col2` decimalv3(8,4) NULL,\n `decimal_col3` decimalv3(18,6) NULL,\n `decimal_col4` decimalv3(38,12) NULL,\n `string_col` text NULL,\n `var [...] 
+ +-- !test3_show_create_table -- +test_mc_partition_table CREATE TABLE `test_mc_partition_table` (\n `id` int NULL,\n `name` text NULL,\n `amount` double NULL,\n `ds` text NULL\n) ENGINE=MAX_COMPUTE_EXTERNAL_TABLE; + +-- !test4_show_create_table -- +test_mc_distributed_table CREATE TABLE `test_mc_distributed_table` (\n `id` int NULL,\n `name` text NULL,\n `value` int NULL\n) ENGINE=MAX_COMPUTE_EXTERNAL_TABLE; + +-- !test5_show_create_table -- +test_mc_partition_distributed_table CREATE TABLE `test_mc_partition_distributed_table` (\n `id` int NULL,\n `name` text NULL,\n `value` int NULL,\n `ds` text NULL\n) ENGINE=MAX_COMPUTE_EXTERNAL_TABLE; + +-- !test6_show_create_table -- +test_mc_table_with_comment CREATE TABLE `test_mc_table_with_comment` (\n `id` int NULL COMMENT "User ID",\n `name` text NULL COMMENT "User Name"\n) ENGINE=MAX_COMPUTE_EXTERNAL_TABLE; + +-- !test8_show_create_table -- +test_mc_table_with_lifecycle CREATE TABLE `test_mc_table_with_lifecycle` (\n `id` int NULL,\n `name` text NULL\n) ENGINE=MAX_COMPUTE_EXTERNAL_TABLE; + +-- !test9_show_create_table -- +test_mc_if_not_exists_table CREATE TABLE `test_mc_if_not_exists_table` (\n `id` int NULL,\n `name` text NULL\n) ENGINE=MAX_COMPUTE_EXTERNAL_TABLE; + +-- !test10_show_create_table -- +test_mc_array_type_table CREATE TABLE `test_mc_array_type_table` (\n `id` int NULL,\n `tags` array<text> NULL,\n `scores` array<int> NULL,\n `values` array<double> NULL\n) ENGINE=MAX_COMPUTE_EXTERNAL_TABLE; + +-- !test11_show_create_table -- +test_mc_map_type_table CREATE TABLE `test_mc_map_type_table` (\n `id` int NULL,\n `properties` map<text,text> NULL,\n `metrics` map<text,double> NULL,\n `config` map<int,boolean> NULL\n) ENGINE=MAX_COMPUTE_EXTERNAL_TABLE; + +-- !test12_show_create_table -- +test_mc_struct_type_table CREATE TABLE `test_mc_struct_type_table` (\n `id` int NULL,\n `person` struct<name:text,age:int,email:text> NULL,\n `address` struct<city:text,country:text,zipcode:int> NULL\n) 
ENGINE=MAX_COMPUTE_EXTERNAL_TABLE; + diff --git a/regression-test/suites/external_table_p2/maxcompute/test_max_compute_create_table.groovy b/regression-test/suites/external_table_p2/maxcompute/test_max_compute_create_table.groovy new file mode 100644 index 00000000000..df8c37b2e62 --- /dev/null +++ b/regression-test/suites/external_table_p2/maxcompute/test_max_compute_create_table.groovy @@ -0,0 +1,426 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/** + * Regression tests for MaxCompute CREATE TABLE statements + * This test covers: + * 1. Basic create table statements + * 2. Create table with partitions + * 3. Create table with distribution (bucketing) + * 4. Create table with comments and properties + * 5. Create table with various data types + * 6. 
Error cases (unsupported features, invalid configurations)
 *
 * Note: Tables are NOT deleted after creation to allow inspection after tests run
 */

suite("test_max_compute_create_table", "p2,external,maxcompute,external_remote,external_remote_maxcompute") {
    String enabled = context.config.otherConfigs.get("enableMaxComputeTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        String ak = context.config.otherConfigs.get("ak")
        String sk = context.config.otherConfigs.get("sk")
        String mc_catalog_name = "test_max_compute_create_table"
        String defaultProject = "mc_datalake"

        sql """drop catalog if exists ${mc_catalog_name} """

        sql """
            CREATE CATALOG IF NOT EXISTS ${mc_catalog_name} PROPERTIES (
                "type" = "max_compute",
                "mc.default.project" = "${defaultProject}",
                "mc.access_key" = "${ak}",
                "mc.secret_key" = "${sk}",
                "mc.endpoint" = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api",
                "mc.quota" = "pay-as-you-go"
            );
        """

        logger.info("catalog " + mc_catalog_name + " created")
        sql """switch ${mc_catalog_name};"""
        logger.info("switched to catalog " + mc_catalog_name)
        sql """ show databases; """
        sql """ use ${defaultProject} """

        // ============================================================================
        // Test 1: Basic CREATE TABLE (Simple table without partition or distribution)
        // ============================================================================
        String test1_table = "test_mc_basic_table"
        sql """DROP TABLE IF EXISTS ${test1_table}"""
        sql """
            CREATE TABLE ${test1_table} (
                id INT,
                name STRING,
                age INT
            )
        """
        sql """show tables like '${test1_table}' """
        qt_test1_show_create_table """SHOW CREATE TABLE ${test1_table} """

        // ============================================================================
        // Test 2: CREATE TABLE with all supported data types
        // Comprehensive type conversion test
        // ============================================================================
        String test2_table = "test_mc_all_types_comprehensive"
        sql """DROP TABLE IF EXISTS ${test2_table}"""
        sql """
            CREATE TABLE ${test2_table} (
                id INT,
                bool_col BOOLEAN,
                tinyint_col TINYINT,
                smallint_col SMALLINT,
                int_col INT,
                bigint_col BIGINT,
                float_col FLOAT,
                double_col DOUBLE,
                decimal_col1 DECIMAL(9,0),
                decimal_col2 DECIMAL(8,4),
                decimal_col3 DECIMAL(18,6),
                decimal_col4 DECIMAL(38,12),
                string_col STRING,
                varchar_col1 VARCHAR(100),
                varchar_col2 VARCHAR(65533),
                char_col1 CHAR(50),
                char_col2 CHAR(255),
                date_col DATE,
                datetime_col DATETIME,
                t_array_string ARRAY<STRING>,
                t_array_int ARRAY<INT>,
                t_array_bigint ARRAY<BIGINT>,
                t_array_float ARRAY<FLOAT>,
                t_array_double ARRAY<DOUBLE>,
                t_array_boolean ARRAY<BOOLEAN>,
                t_map_string MAP<STRING,STRING>,
                t_map_int MAP<INT,INT>,
                t_map_bigint MAP<BIGINT,BIGINT>,
                t_map_float MAP<FLOAT,FLOAT>,
                t_map_double MAP<DOUBLE,DOUBLE>,
                t_struct_simple STRUCT<field1:STRING,field2:INT>
            )
        """
        sql """show tables like '${test2_table}' """
        qt_test2_show_create_table """SHOW CREATE TABLE ${test2_table} """

        // ============================================================================
        // Test 3: CREATE TABLE with PARTITION clause
        // ============================================================================
        String test3_table = "test_mc_partition_table"
        sql """DROP TABLE IF EXISTS ${test3_table}"""
        sql """
            CREATE TABLE ${test3_table} (
                id INT,
                name STRING,
                amount DOUBLE,
                ds STRING
            )
            PARTITION BY (ds)();
        """
        sql """show tables like '${test3_table}' """
        qt_test3_show_create_table """SHOW CREATE TABLE ${test3_table} """

        // ============================================================================
        // Test 4: CREATE TABLE with DISTRIBUTION (BUCKETING)
        // ============================================================================
        String test4_table = "test_mc_distributed_table"
        sql """DROP TABLE IF EXISTS ${test4_table}"""
        sql """
            CREATE TABLE ${test4_table} (
                id INT,
                name STRING,
                value INT
            )
            DISTRIBUTED BY HASH(id) BUCKETS 4
        """
        sql """show tables like '${test4_table}' """
        qt_test4_show_create_table """SHOW CREATE TABLE ${test4_table} """

        // ============================================================================
        // Test 5: CREATE TABLE with PARTITION and DISTRIBUTION
        // ============================================================================
        String test5_table = "test_mc_partition_distributed_table"
        sql """DROP TABLE IF EXISTS ${test5_table}"""
        sql """
            CREATE TABLE ${test5_table} (
                id INT,
                name STRING,
                value INT,
                ds STRING
            )
            PARTITION BY (ds)()
            DISTRIBUTED BY HASH(id) BUCKETS 8
        """
        sql """show tables like '${test5_table}' """
        qt_test5_show_create_table """SHOW CREATE TABLE ${test5_table} """

        // ============================================================================
        // Test 6: CREATE TABLE with COMMENT
        // ============================================================================
        String test6_table = "test_mc_table_with_comment"
        sql """DROP TABLE IF EXISTS ${test6_table}"""
        sql """
            CREATE TABLE ${test6_table} (
                id INT COMMENT 'User ID',
                name STRING COMMENT 'User Name'
            )
            COMMENT 'This is a test table with comments'
        """
        sql """show tables like '${test6_table}' """
        qt_test6_show_create_table """SHOW CREATE TABLE ${test6_table} """

        // ============================================================================
        // Test 8: CREATE TABLE with LIFECYCLE property
        // ============================================================================
        String test8_table = "test_mc_table_with_lifecycle"
        sql """DROP TABLE IF EXISTS ${test8_table}"""
        sql """
            CREATE TABLE ${test8_table} (
                id INT,
                name STRING
            )
            PROPERTIES (
                "lifecycle" = "30"
            )
        """
        sql """show tables like '${test8_table}' """
        qt_test8_show_create_table """SHOW CREATE TABLE ${test8_table} """

        // ============================================================================
        // Test 9: CREATE TABLE IF NOT EXISTS
        // ============================================================================
        String test9_table = "test_mc_if_not_exists_table"
        sql """DROP TABLE IF EXISTS ${test9_table}"""
        sql """
            CREATE TABLE IF NOT EXISTS ${test9_table} (
                id INT,
                name STRING
            )
        """
        // Try creating the same table again with IF NOT EXISTS - should not fail
        sql """
            CREATE TABLE IF NOT EXISTS ${test9_table} (
                id INT,
                name STRING
            )
        """
        sql """show tables like '${test9_table}' """
        qt_test9_show_create_table """SHOW CREATE TABLE ${test9_table} """

        // ============================================================================
        // Test 10: CREATE TABLE with ARRAY type (supported by MaxCompute)
        // ============================================================================
        String test10_table = "test_mc_array_type_table"
        sql """DROP TABLE IF EXISTS ${test10_table}"""
        sql """
            CREATE TABLE ${test10_table} (
                id INT,
                tags ARRAY<STRING>,
                scores ARRAY<INT>,
                `values` ARRAY<DOUBLE>
            )
        """
        sql """show tables like '${test10_table}' """
        qt_test10_show_create_table """SHOW CREATE TABLE ${test10_table} """

        // ============================================================================
        // Test 11: CREATE TABLE with MAP type (supported by MaxCompute)
        // ============================================================================
        String test11_table = "test_mc_map_type_table"
        sql """DROP TABLE IF EXISTS ${test11_table}"""
        sql """
            CREATE TABLE ${test11_table} (
                id INT,
                properties MAP<STRING, STRING>,
                metrics MAP<STRING, DOUBLE>,
                config MAP<INT, BOOLEAN>
            )
        """
        sql """show tables like '${test11_table}' """
        qt_test11_show_create_table """SHOW CREATE TABLE ${test11_table} """

        // ============================================================================
        // Test 12: CREATE TABLE with STRUCT type (supported by MaxCompute)
        // ============================================================================
        String test12_table = "test_mc_struct_type_table"
        sql """DROP TABLE IF EXISTS ${test12_table}"""
        sql """
            CREATE TABLE ${test12_table} (
                id INT,
                person STRUCT<name:STRING,age:INT,email:STRING>,
                address STRUCT<city:STRING,country:STRING,zipcode:INT>
            )
        """
        sql """show tables like '${test12_table}' """
        qt_test12_show_create_table """SHOW CREATE TABLE ${test12_table} """

        // ============================================================================
        // Test 13: ERROR CASE - CREATE TABLE with unsupported type (IPV4)
        // ============================================================================
        String test13_table = "test_mc_unsupported_type_table"
        sql """DROP TABLE IF EXISTS ${test13_table}"""
        test {
            sql """
                CREATE TABLE ${test13_table} (
                    id INT,
                    ip IPV4
                )
            """
            exception "Unsupported"
        }

        // ============================================================================
        // Test 14: ERROR CASE - CREATE TABLE with AUTO_INCREMENT column
        // ============================================================================
        String test14_table = "test_mc_auto_increment_table"
        sql """DROP TABLE IF EXISTS ${test14_table}"""
        test {
            sql """
                CREATE TABLE ${test14_table} (
                    id INT AUTO_INCREMENT,
                    name STRING
                )
            """
            exception "Auto-increment"
        }

        // ============================================================================
        // Test 15: ERROR CASE - CREATE TABLE with duplicate column names
        // ============================================================================
        String test15_table = "test_mc_duplicate_column_table"
        sql """DROP TABLE IF EXISTS ${test15_table}"""
        test {
            sql """
                CREATE TABLE ${test15_table} (
                    id INT,
                    name STRING,
                    name STRING
                )
            """
            exception "Duplicate"
        }

        // ============================================================================
        // Test 18: ERROR CASE - LIFECYCLE with invalid value (too large)
        // ============================================================================
        String test18_table = "test_mc_invalid_lifecycle_table"
        sql """DROP TABLE IF EXISTS ${test18_table}"""
        test {
            sql """
                CREATE TABLE ${test18_table} (
                    id INT,
                    name STRING
                )
                PROPERTIES (
                    "lifecycle" = "99999999"
                )
            """
            exception "lifecycle"
        }

        // ============================================================================
        // Test 19: Basic DROP TABLE
        // ============================================================================
        String test19_table = "test_mc_basic_table"
        sql """DROP TABLE IF EXISTS ${test19_table}"""

        // ============================================================================
        // Test 20: DROP TABLE with partitions
        // ============================================================================
        String test20_table = "test_mc_partition_table"
        sql """DROP TABLE IF EXISTS ${test20_table}"""

        // ============================================================================
        // Test 21: DROP TABLE with distribution
        // ============================================================================
        String test21_table = "test_mc_distributed_table"
        sql """DROP TABLE IF EXISTS ${test21_table}"""

        // ============================================================================
        // Test 22: DROP TABLE IF EXISTS (existing table)
        // ============================================================================
        String test22_table = "test_mc_table_with_comment"
        sql """DROP TABLE IF EXISTS ${test22_table}"""

        // ============================================================================
        // Test 23: DROP TABLE IF EXISTS (non-existent table)
        // ============================================================================
        String test23_table = "test_mc_nonexistent_table_for_drop"
        sql """DROP TABLE IF EXISTS ${test23_table}"""

        // ============================================================================
        // Test 24: DROP complex types table
        // ============================================================================
        String test24_table = "test_mc_array_type_table"
        sql """DROP TABLE IF EXISTS ${test24_table}"""

        // ============================================================================
        // Test 25: DROP then recreate with same name - Verify cache invalidation
        // ============================================================================
        String test25_table = "test_mc_cache_invalidation"
        sql """DROP TABLE IF EXISTS ${test25_table}"""
        // Create table
        sql """
            CREATE TABLE ${test25_table} (
                id INT,
                value INT
            )
        """
        // Drop table
        sql """DROP TABLE ${test25_table}"""
        // Recreate with different schema
        sql """
            CREATE TABLE ${test25_table} (
                id INT,
                name STRING,
                value DOUBLE
            )
        """
        def schema = sql """SHOW CREATE TABLE ${test25_table}"""
        // The recreated table must expose the NEW schema (it now has a `name`
        // column that the first version lacked). If the stale cached metadata
        // survived the DROP, this column would be missing.
        assertTrue(schema[0][1].contains("`name`"))

        // ============================================================================
        // Test 29: DROP TABLE without IF EXISTS (non-existent)
        // ============================================================================
        String test29_table = "test_mc_definitely_nonexistent_table_12345"
        sql """DROP TABLE IF EXISTS ${test29_table}"""
        test {
            sql """DROP TABLE ${test29_table}"""
            exception "Failed to get table"
        }

        // ============================================================================
        // Test 30: Double DROP without IF EXISTS
        // ============================================================================
        String test30_table = "test_mc_double_drop"
        sql """DROP TABLE IF EXISTS ${test30_table}"""
        // Create and drop first time
        sql """
            CREATE TABLE ${test30_table} (
                id INT
            )
        """
        sql """DROP TABLE ${test30_table}"""
        // Try to drop again without IF EXISTS - should fail
        test {
            sql """DROP TABLE ${test30_table}"""
            exception "Failed to get table"
        }

        // ============================================================================
        // Test 31: Verify dropped table not accessible
        // ============================================================================
        String test31_table = "test_mc_struct_type_table"
        sql """DROP TABLE IF EXISTS ${test31_table}"""
        // Actually verify inaccessibility: the table must no longer be listed.
        def remaining = sql """show tables like '${test31_table}' """
        assertTrue(remaining.isEmpty())
    }
}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
