This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d5a61115ad02ef88aea2eec62d630363326327ec
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sun Mar 15 20:39:44 2026 -0700

    branch-4.1: [feature](paimon) implement create/drop db, create/drop table 
for paimon (#58894) (#61338)
    
    Cherry-pick from #58894
    
    ---------
    
    Co-authored-by: yaoxiao <[email protected]>
    Co-authored-by: yaoxiao <[email protected]>
---
 .../java/org/apache/doris/catalog/TableIf.java     |   2 +
 .../operations/ExternalMetadataOperations.java     |   6 +
 .../paimon/DorisToPaimonTypeVisitor.java           | 109 ++++++
 .../datasource/paimon/PaimonExternalCatalog.java   |  40 +-
 .../doris/datasource/paimon/PaimonMetadataOps.java | 405 +++++++++++++++++++++
 .../plans/commands/info/ColumnDefinition.java      |  12 +
 .../trees/plans/commands/info/CreateTableInfo.java |  21 +-
 .../datasource/paimon/PaimonMetadataOpsTest.java   | 256 +++++++++++++
 .../paimon/test_paimon_table.groovy                | 122 +++++++
 9 files changed, 935 insertions(+), 38 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 2a17198a288..8b6f62ae76a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -485,6 +485,8 @@ public interface TableIf {
                 case ICEBERG:
                 case ICEBERG_EXTERNAL_TABLE:
                     return "iceberg";
+                case PAIMON_EXTERNAL_TABLE:
+                    return "paimon";
                 case DICTIONARY:
                     return "dictionary";
                 case DORIS_EXTERNAL_TABLE:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
index 7d63b18cd13..513b3379177 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
@@ -21,6 +21,7 @@ import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HiveMetadataOps;
 import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
+import org.apache.doris.datasource.paimon.PaimonMetadataOps;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.catalog.Catalog;
@@ -35,4 +36,9 @@ public class ExternalMetadataOperations {
     public static IcebergMetadataOps newIcebergMetadataOps(ExternalCatalog 
dorisCatalog, Catalog catalog) {
         return new IcebergMetadataOps(dorisCatalog, catalog);
     }
+
+    public static PaimonMetadataOps newPaimonMetaOps(ExternalCatalog 
dorisCatalog,
+                                                     
org.apache.paimon.catalog.Catalog catalog) {
+        return new PaimonMetadataOps(dorisCatalog, catalog);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java
new file mode 100644
index 00000000000..aad8106563b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.StructField;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.datasource.DorisTypeVisitor;
+
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.types.VariantType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DorisToPaimonTypeVisitor extends DorisTypeVisitor<DataType> {
+
+    @Override
+    public DataType struct(StructType struct, List<DataType> fieldResults) {
+        List<StructField> fields = struct.getFields();
+        List<DataField> newFields = new ArrayList<>(fields.size());
+        AtomicInteger atomicInteger = new AtomicInteger(-1);
+        for (int i = 0; i < fields.size(); i++) {
+            StructField field = fields.get(i);
+            DataType fieldType = 
fieldResults.get(i).copy(field.getContainsNull());
+            String comment = field.getComment();
+            DataField dataField = new 
DataField(atomicInteger.incrementAndGet(), field.getName(), fieldType, comment);
+            newFields.add(dataField);
+        }
+        return new RowType(newFields);
+    }
+
+    @Override
+    public DataType field(StructField field, DataType typeResult) {
+        return typeResult;
+    }
+
+    @Override
+    public DataType array(ArrayType array, DataType elementResult) {
+        return new 
org.apache.paimon.types.ArrayType(elementResult.copy(array.getContainsNull()));
+    }
+
+    @Override
+    public DataType map(MapType map, DataType keyResult, DataType valueResult) 
{
+        return new org.apache.paimon.types.MapType(keyResult.copy(false),
+            valueResult.copy(map.getIsValueContainsNull()));
+    }
+
+    @Override
+    public DataType atomic(Type atomic) {
+        PrimitiveType primitiveType = atomic.getPrimitiveType();
+        if (primitiveType.equals(PrimitiveType.BOOLEAN)) {
+            return new BooleanType();
+        } else if (primitiveType.equals(PrimitiveType.INT)) {
+            return new IntType();
+        } else if (primitiveType.equals(PrimitiveType.BIGINT)) {
+            return new BigIntType();
+        } else if (primitiveType.equals(PrimitiveType.FLOAT)) {
+            return new FloatType();
+        } else if (primitiveType.equals(PrimitiveType.DOUBLE)) {
+            return new DoubleType();
+        } else if (primitiveType.isCharFamily()) {
+            return new VarCharType(VarCharType.MAX_LENGTH);
+        } else if (primitiveType.equals(PrimitiveType.DATE) || 
primitiveType.equals(PrimitiveType.DATEV2)) {
+            return new DateType();
+        } else if (primitiveType.equals(PrimitiveType.DECIMALV2) || 
primitiveType.isDecimalV3Type()) {
+            return new DecimalType(((ScalarType) atomic).getScalarPrecision(), 
((ScalarType) atomic).getScalarScale());
+        } else if (primitiveType.equals(PrimitiveType.DATETIME) || 
primitiveType.equals(PrimitiveType.DATETIMEV2)) {
+            return new TimestampType();
+        } else if (primitiveType.isVarbinaryType()) {
+            return new VarBinaryType(VarBinaryType.MAX_LENGTH);
+        } else if (primitiveType.isVariantType()) {
+            return new VariantType();
+        }
+        throw new UnsupportedOperationException("Not a supported type: " + 
primitiveType);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index b6a06fd4670..f15309ea0e9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -24,13 +24,13 @@ import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.NameMapping;
 import org.apache.doris.datasource.SessionContext;
 import org.apache.doris.datasource.metacache.CacheSpec;
+import org.apache.doris.datasource.operations.ExternalMetadataOperations;
 import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Catalog.TableNotExistException;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.partition.Partition;
 
@@ -67,6 +67,7 @@ public class PaimonExternalCatalog extends ExternalCatalog {
         catalogType = paimonProperties.getPaimonCatalogType();
         catalog = createCatalog();
         initPreExecutionAuthenticator();
+        metadataOps = ExternalMetadataOperations.newPaimonMetaOps(this, 
catalog);
     }
 
     @Override
@@ -81,49 +82,16 @@ public class PaimonExternalCatalog extends ExternalCatalog {
         return catalogType;
     }
 
-    protected List<String> listDatabaseNames() {
-        try {
-            return executionAuthenticator.execute(() -> new 
ArrayList<>(catalog.listDatabases()));
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to list databases names, 
catalog name: " + getName(), e);
-        }
-    }
-
     @Override
     public boolean tableExist(SessionContext ctx, String dbName, String 
tblName) {
         makeSureInitialized();
-        try {
-            return executionAuthenticator.execute(() -> {
-                try {
-                    catalog.getTable(Identifier.create(dbName, tblName));
-                    return true;
-                } catch (TableNotExistException e) {
-                    return false;
-                }
-            });
-
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to check table existence, 
catalog name: " + getName()
-                    + "error message is:" + 
ExceptionUtils.getRootCauseMessage(e), e);
-        }
+        return metadataOps.tableExist(dbName, tblName);
     }
 
     @Override
     public List<String> listTableNames(SessionContext ctx, String dbName) {
         makeSureInitialized();
-        try {
-            return executionAuthenticator.execute(() -> {
-                List<String> tableNames = null;
-                try {
-                    tableNames = catalog.listTables(dbName);
-                } catch (Catalog.DatabaseNotExistException e) {
-                    LOG.warn("DatabaseNotExistException", e);
-                }
-                return tableNames;
-            });
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to list table names, catalog 
name: " + getName(), e);
-        }
+        return metadataOps.listTableNames(dbName);
     }
 
     public List<Partition> getPaimonPartitions(NameMapping nameMapping) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java
new file mode 100644
index 00000000000..e6c8177edca
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java
@@ -0,0 +1,405 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.catalog.StructField;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.DorisTypeVisitor;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalDatabase;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.operations.ExternalMetadataOps;
+import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo;
+import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Catalog.DatabaseNotEmptyException;
+import org.apache.paimon.catalog.Catalog.DatabaseNotExistException;
+import org.apache.paimon.catalog.Catalog.TableAlreadyExistException;
+import org.apache.paimon.catalog.Catalog.TableNotExistException;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class PaimonMetadataOps implements ExternalMetadataOps {
+
+    private static final Logger LOG = 
LogManager.getLogger(PaimonMetadataOps.class);
+    protected Catalog catalog;
+    protected ExternalCatalog dorisCatalog;
+    private ExecutionAuthenticator executionAuthenticator;
+    private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
+    private static final String PROP_COMMENT = "comment";
+    private static final String PROP_LOCATION = "location";
+
+    public PaimonMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) {
+        this.dorisCatalog = dorisCatalog;
+        this.catalog = catalog;
+        this.executionAuthenticator = dorisCatalog.getExecutionAuthenticator();
+    }
+
+
+    @Override
+    public boolean createDbImpl(String dbName, boolean ifNotExists, 
Map<String, String> properties)
+            throws DdlException {
+        try {
+            return executionAuthenticator.execute(() -> 
performCreateDb(dbName, ifNotExists, properties));
+        } catch (Exception e) {
+            throw new DdlException("Failed to create database: "
+                + dbName + ": " + Util.getRootCauseMessage(e), e);
+        }
+    }
+
+    private boolean performCreateDb(String dbName, boolean ifNotExists, 
Map<String, String> properties)
+            throws DdlException, Catalog.DatabaseAlreadyExistException {
+        if (databaseExist(dbName)) {
+            if (ifNotExists) {
+                LOG.info("create database[{}] which already exists", dbName);
+                return true;
+            } else {
+                ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, 
dbName);
+            }
+        }
+
+        if (!properties.isEmpty() && dorisCatalog instanceof 
PaimonExternalCatalog) {
+            String catalogType = ((PaimonExternalCatalog) 
dorisCatalog).getCatalogType();
+            if (!PaimonExternalCatalog.PAIMON_HMS.equals(catalogType)) {
+                throw new DdlException(
+                    "Not supported: create database with properties for paimon 
catalog type: " + catalogType);
+            }
+        }
+
+        catalog.createDatabase(dbName, ifNotExists, properties);
+        return false;
+    }
+
+    @Override
+    public void afterCreateDb() {
+        dorisCatalog.resetMetaCacheNames();
+    }
+
+    @Override
+    public void dropDbImpl(String dbName, boolean ifExists, boolean force) 
throws DdlException {
+        try {
+            executionAuthenticator.execute(() -> {
+                performDropDb(dbName, ifExists, force);
+                return null;
+            });
+        } catch (Exception e) {
+            throw new DdlException(
+                "Failed to drop database: " + dbName + ", error message is:" + 
e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Drops a database from the Paimon catalog. Called by {@code dropDbImpl} from inside
+     * {@code executionAuthenticator.execute}.
+     *
+     * <p>When Doris does not know the database, this is a no-op if {@code ifExists} is set and a
+     * DDL error otherwise. With {@code force}, every table returned by {@code listTableNames} is
+     * dropped first; {@code force} is then also passed as the third argument of
+     * {@code catalog.dropDatabase}.
+     *
+     * @param dbName   name of the database to drop
+     * @param ifExists whether a missing database is tolerated
+     * @param force    whether contained tables are dropped before the database itself
+     * @throws DdlException if the database is unknown and {@code ifExists} is false
+     */
+    private void performDropDb(String dbName, boolean ifExists, boolean force) throws DdlException {
+        ExternalDatabase dorisDb = dorisCatalog.getDbNullable(dbName);
+        if (dorisDb == null) {
+            if (ifExists) {
+                LOG.info("drop database[{}] which does not exist", dbName);
+                // Database does not exist and IF EXISTS is specified; treat as no-op.
+                return;
+            } else {
+                ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName);
+                // ErrorReport.reportDdlException is expected to throw DdlException.
+                return;
+            }
+        }
+
+        if (force) {
+            List<String> tableNames = listTableNames(dbName);
+            if (!tableNames.isEmpty()) {
+                LOG.info("drop database[{}] with force, drop all tables, num: {}", dbName, tableNames.size());
+            }
+            for (String tableName : tableNames) {
+                performDropTable(dbName, tableName, true);
+            }
+        }
+
+        try {
+            catalog.dropDatabase(dbName, ifExists, force);
+        } catch (DatabaseNotExistException e) {
+            // Chain the Paimon exception so its stack trace survives the rethrow.
+            throw new RuntimeException("database " + dbName + " does not exist!", e);
+        } catch (DatabaseNotEmptyException e) {
+            // Chain the Paimon exception so its stack trace survives the rethrow.
+            throw new RuntimeException("database " + dbName + " is not empty! please check!", e);
+        }
+    }
+
+    @Override
+    public void afterDropDb(String dbName) {
+        dorisCatalog.unregisterDatabase(dbName);
+    }
+
+    @Override
+    public boolean createTableImpl(CreateTableInfo createTableInfo) throws 
UserException {
+        try {
+            return executionAuthenticator.execute(() -> 
performCreateTable(createTableInfo));
+        } catch (Exception e) {
+            throw new DdlException(
+                "Failed to create table: " + createTableInfo.getTableName() + 
", error message is:" + e.getMessage(),
+                e);
+        }
+    }
+
+    public boolean performCreateTable(CreateTableInfo createTableInfo) throws 
UserException {
+        String dbName = createTableInfo.getDbName();
+        ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
+        if (db == null) {
+            throw new UserException("Failed to get database: '" + dbName + "' 
in catalog: " + dorisCatalog.getName());
+        }
+        String tableName = createTableInfo.getTableName();
+        // 1. first, check if table exist in remote
+        if (tableExist(db.getRemoteName(), tableName)) {
+            if (createTableInfo.isIfNotExists()) {
+                LOG.info("create table[{}] which already exists", tableName);
+                return true;
+            } else {
+                
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+            }
+        }
+
+        // 2. second, check if table exist in local.
+        // This is because case sensibility issue, eg:
+        // 1. lower_case_table_name = 1
+        // 2. create table tbl1;
+        // 3. create table TBL1;  TBL1 does not exist in remote because the 
remote system is case-sensitive.
+        //    but because lower_case_table_name = 1, the table can not be 
created in Doris because it is conflict with
+        //    tbl1
+        ExternalTable dorisTable = db.getTableNullable(tableName);
+        if (dorisTable != null) {
+            if (createTableInfo.isIfNotExists()) {
+                LOG.info("create table[{}] which already exists", tableName);
+                return true;
+            } else {
+                
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
+            }
+        }
+        List<ColumnDefinition> columns = 
createTableInfo.getColumnDefinitions();
+        List<StructField> collect = columns.stream()
+                .map(col -> new StructField(col.getName(), 
col.getType().toCatalogDataType(),
+                    col.getComment(), col.isNullable()))
+                .collect(Collectors.toList());
+        StructType structType = new StructType(new ArrayList<>(collect));
+        Schema schema = toPaimonSchema(structType, 
createTableInfo.getPartitionDesc(), createTableInfo.getProperties());
+        try {
+            catalog.createTable(new Identifier(createTableInfo.getDbName(), 
createTableInfo.getTableName()),
+                    schema, createTableInfo.isIfNotExists());
+        } catch (TableAlreadyExistException | DatabaseNotExistException e) {
+            throw new RuntimeException(e);
+        }
+        return false;
+    }
+
+    private Schema toPaimonSchema(StructType structType, PartitionDesc 
partitionDesc, Map<String, String> properties) {
+        Map<String, String> normalizedProperties = new HashMap<>(properties);
+        normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
+        normalizedProperties.remove(PROP_COMMENT);
+        if (normalizedProperties.containsKey(PROP_LOCATION)) {
+            String path = normalizedProperties.remove(PROP_LOCATION);
+            normalizedProperties.put(CoreOptions.PATH.key(), path);
+        }
+
+        String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
+        List<String> primaryKeys = pkAsString == null ? 
Collections.emptyList() : Arrays.stream(pkAsString.split(","))
+                .map(String::trim)
+                .collect(Collectors.toList());
+        List<String> partitionKeys = partitionDesc == null ? new ArrayList<>() 
: partitionDesc.getPartitionColNames();
+        Schema.Builder schemaBuilder = Schema.newBuilder()
+                .options(normalizedProperties)
+                .primaryKey(primaryKeys)
+                .partitionKeys(partitionKeys)
+                .comment(properties.getOrDefault(PROP_COMMENT, null));
+        for (StructField field : structType.getFields()) {
+            schemaBuilder.column(field.getName(),
+                    
toPaimontype(field.getType()).copy(field.getContainsNull()),
+                    field.getComment());
+        }
+        return schemaBuilder.build();
+    }
+
+    private DataType toPaimontype(Type type) {
+        return DorisTypeVisitor.visit(type, new DorisToPaimonTypeVisitor());
+    }
+
+    @Override
+    public void afterCreateTable(String dbName, String tblName) {
+        Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
+        if (db.isPresent()) {
+            db.get().resetMetaCacheNames();
+        }
+        LOG.info("after create table {}.{}.{}, is db exists: {}",
+                dorisCatalog.getName(), dbName, tblName, db.isPresent());
+    }
+
+    @Override
+    public void dropTableImpl(ExternalTable dorisTable, boolean ifExists) 
throws DdlException {
+        try {
+            executionAuthenticator.execute(() -> {
+                performDropTable(dorisTable.getRemoteDbName(), 
dorisTable.getRemoteName(), ifExists);
+                return null;
+            });
+        } catch (Exception e) {
+            throw new DdlException(
+                "Failed to drop table: " + dorisTable.getName() + ", error 
message is:" + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Drops one table from the Paimon catalog.
+     *
+     * <p>Existence is checked first through {@code tableExist}: a missing table is a no-op when
+     * {@code ifExists} is set and a DDL error otherwise.
+     *
+     * @param dBName    database name used to build the Paimon {@code Identifier}
+     * @param tableName table name used to build the Paimon {@code Identifier}
+     * @param ifExists  whether a missing table is tolerated; also forwarded to {@code catalog.dropTable}
+     * @throws DdlException if the table is missing and {@code ifExists} is false
+     */
+    private void performDropTable(String dBName, String tableName, boolean ifExists) throws DdlException {
+        if (!tableExist(dBName, tableName)) {
+            if (ifExists) {
+                LOG.info("drop table[{}] which does not exist", tableName);
+                return;
+            } else {
+                ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dBName);
+            }
+        }
+        try {
+            catalog.dropTable(Identifier.create(dBName, tableName), ifExists);
+        } catch (TableNotExistException e) {
+            // Chain the Paimon exception so its stack trace survives the rethrow.
+            throw new RuntimeException("table " + tableName + " does not exist", e);
+        }
+    }
+
+    @Override
+    public void afterDropTable(String dbName, String tblName) {
+        Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
+        db.ifPresent(externalDatabase -> 
externalDatabase.unregisterTable(tblName));
+        LOG.info("after drop table {}.{}.{}. is db exists: {}",
+                dorisCatalog.getName(), dbName, tblName, db.isPresent());
+    }
+
+    @Override
+    public void truncateTableImpl(ExternalTable dorisTable, List<String> 
partitions) throws DdlException {
+        throw new UnsupportedOperationException("truncate table is not a 
supported operation!");
+    }
+
+    @Override
+    public void createOrReplaceBranchImpl(ExternalTable dorisTable, 
CreateOrReplaceBranchInfo branchInfo)
+            throws UserException {
+        throw new UnsupportedOperationException("create or replace branch is 
not a supported operation!");
+    }
+
+    @Override
+    public void createOrReplaceTagImpl(ExternalTable dorisTable, 
CreateOrReplaceTagInfo tagInfo) throws UserException {
+        throw new UnsupportedOperationException("create or replace tag is not 
a supported operation!");
+    }
+
+    @Override
+    public void dropTagImpl(ExternalTable dorisTable, DropTagInfo tagInfo) 
throws UserException {
+        throw new UnsupportedOperationException("drop tag is not a supported 
operation!");
+    }
+
+    @Override
+    public void dropBranchImpl(ExternalTable dorisTable, DropBranchInfo 
branchInfo) throws UserException {
+        throw new UnsupportedOperationException("drop branch is not a 
supported operation!");
+    }
+
+    @Override
+    public List<String> listDatabaseNames() {
+        try {
+            return executionAuthenticator.execute(() -> new 
ArrayList<>(catalog.listDatabases()));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list databases names, 
catalog name: " + dorisCatalog.getName(), e);
+        }
+    }
+
+    @Override
+    public List<String> listTableNames(String db) {
+        try {
+            return executionAuthenticator.execute(() -> {
+                List<String> tableNames = new ArrayList<>();
+                try {
+                    tableNames.addAll(catalog.listTables(db));
+                } catch (DatabaseNotExistException e) {
+                    LOG.warn("DatabaseNotExistException", e);
+                }
+                return tableNames;
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list table names, catalog 
name: " + dorisCatalog.getName(), e);
+        }
+    }
+
+    @Override
+    public boolean tableExist(String dbName, String tblName) {
+        try {
+            return executionAuthenticator.execute(() -> {
+                try {
+                    catalog.getTable(Identifier.create(dbName, tblName));
+                    return true;
+                } catch (TableNotExistException e) {
+                    return false;
+                }
+            });
+
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to check table existence, 
catalog name: " + dorisCatalog.getName()
+                + "error message is:" + ExceptionUtils.getRootCauseMessage(e), 
e);
+        }
+    }
+
+    @Override
+    public boolean databaseExist(String dbName) {
+        try {
+            return executionAuthenticator.execute(() -> {
+                try {
+                    catalog.getDatabase(dbName);
+                    return true;
+                } catch (DatabaseNotExistException e) {
+                    return false;
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to check database exist, error 
message is:" + e.getMessage(), e);
+        }
+    }
+
+    public Catalog getCatalog() {
+        return catalog;
+    }
+
+    @Override
+    public void close() {
+        if (catalog != null) {
+            catalog = null;
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java
index a4fdd7329e2..ee2976f29e9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java
@@ -190,6 +190,18 @@ public class ColumnDefinition {
         this.generatedColumnsThatReferToThis = generatedColumnsThatReferToThis;
     }
 
+    public String getComment() {
+        return getComment(false);
+    }
+
+    public String getComment(boolean escapeQuota) {
+        String comment = this.comment == null ? "" : this.comment;
+        if (!escapeQuota) {
+            return comment;
+        }
+        return SqlUtils.escapeQuota(comment);
+    }
+
     /**
      * toSql
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index c897a216268..7216262abcb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -49,6 +49,7 @@ import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.es.EsUtil;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.analyzer.Scope;
@@ -116,6 +117,7 @@ public class CreateTableInfo {
     public static final String ENGINE_BROKER = "broker";
     public static final String ENGINE_HIVE = "hive";
     public static final String ENGINE_ICEBERG = "iceberg";
+    public static final String ENGINE_PAIMON = "paimon";
     private static final ImmutableSet<AggregateType> 
GENERATED_COLUMN_ALLOW_AGG_TYPE =
             ImmutableSet.of(AggregateType.REPLACE, 
AggregateType.REPLACE_IF_NOT_NULL);
 
@@ -375,6 +377,8 @@ public class CreateTableInfo {
             throw new AnalysisException("Hms type catalog can only use `hive` 
engine.");
         } else if (catalog instanceof IcebergExternalCatalog && 
!engineName.equals(ENGINE_ICEBERG)) {
             throw new AnalysisException("Iceberg type catalog can only use 
`iceberg` engine.");
+        } else if (catalog instanceof PaimonExternalCatalog && 
!engineName.equals(ENGINE_PAIMON)) {
+            throw new AnalysisException("Paimon type catalog can only use 
`paimon` engine.");
         }
     }
 
@@ -769,7 +773,12 @@ public class CreateTableInfo {
                 throw new AnalysisException(
                     "Iceberg doesn't support 'DISTRIBUTE BY', "
                         + "and you can use 'bucket(num, column)' in 
'PARTITIONED BY'.");
+            } else if (engineName.equalsIgnoreCase(ENGINE_PAIMON) && 
distribution != null) {
+                throw new AnalysisException(
+                    "Paimon doesn't support 'DISTRIBUTE BY', "
+                        + "and you can use 'bucket(num, column)' in 
'PARTITIONED BY'.");
             }
+
             for (ColumnDefinition columnDef : columns) {
                 if (!columnDef.isNullable()
                         && engineName.equalsIgnoreCase(ENGINE_HIVE)) {
@@ -865,6 +874,8 @@ public class CreateTableInfo {
                 engineName = ENGINE_HIVE;
             } else if (catalog instanceof IcebergExternalCatalog) {
                 engineName = ENGINE_ICEBERG;
+            } else if (catalog instanceof PaimonExternalCatalog) {
+                engineName = ENGINE_PAIMON;
             } else {
                 throw new AnalysisException("Current catalog does not support 
create table: " + ctlName);
             }
@@ -894,7 +905,8 @@ public class CreateTableInfo {
     private void checkEngineName() {
         if (engineName.equals(ENGINE_MYSQL) || engineName.equals(ENGINE_ODBC) 
|| engineName.equals(ENGINE_BROKER)
                 || engineName.equals(ENGINE_ELASTICSEARCH) || 
engineName.equals(ENGINE_HIVE)
-                || engineName.equals(ENGINE_ICEBERG) || 
engineName.equals(ENGINE_JDBC)) {
+                || engineName.equals(ENGINE_ICEBERG) || 
engineName.equals(ENGINE_JDBC)
+                || engineName.equals(ENGINE_PAIMON)) {
             if (!isExternal) {
                 // this is for compatibility
                 isExternal = true;
@@ -1071,7 +1083,8 @@ public class CreateTableInfo {
                 throw new AnalysisException("Create " + engineName
                     + " table should not contain distribution desc");
             }
-            if (!engineName.equals(ENGINE_HIVE) && 
!engineName.equals(ENGINE_ICEBERG) && partitionDesc != null) {
+            if (!engineName.equals(ENGINE_HIVE) && 
!engineName.equals(ENGINE_ICEBERG)
+                    && !engineName.equals(ENGINE_PAIMON) && partitionDesc != 
null) {
                 throw new AnalysisException("Create " + engineName
                     + " table should not contain partition desc");
             }
@@ -1301,6 +1314,10 @@ public class CreateTableInfo {
         return partitionDesc;
     }
 
+    /**
+     * Returns the column definitions exactly as held by this object (the same list, not a copy),
+     * unlike {@code getColumns()}, which translates each definition to a catalog-style column.
+     */
+    public List<ColumnDefinition> getColumnDefinitions() {
+        return this.columns;
+    }
+
     public List<Column> getColumns() {
         return columns.stream()
             
.map(ColumnDefinition::translateToCatalogStyle).collect(Collectors.toList());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java
new file mode 100644
index 00000000000..e4146faa690
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.CatalogFactory;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand;
+import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Maps;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarCharType;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Unit tests for {@link PaimonMetadataOps}, run against a Paimon catalog of type
+ * {@code filesystem} whose warehouse is a freshly created temporary directory.
+ */
+public class PaimonMetadataOpsTest {
+    public static String warehouse;
+    public static PaimonExternalCatalog paimonCatalog;
+    public static PaimonMetadataOps ops;
+    public static String dbName = "test_db";
+    public static ConnectContext connectContext;
+
+    /**
+     * Creates the catalog, the shared test database and a thread-local connect context.
+     */
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        Path warehousePath = Files.createTempDirectory("test_warehouse_");
+        warehouse = "file://" + warehousePath.toAbsolutePath() + "/";
+        HashMap<String, String> param = new HashMap<>();
+        param.put("type", "paimon");
+        param.put("paimon.catalog.type", "filesystem");
+        param.put("warehouse", warehouse);
+        // create catalog
+        CreateCatalogCommand createCatalogCommand = new CreateCatalogCommand("paimon", true, "", "comment", param);
+        paimonCatalog = (PaimonExternalCatalog) CatalogFactory.createFromCommand(1, createCatalogCommand);
+        paimonCatalog.makeSureInitialized();
+        // create db
+        ops = new PaimonMetadataOps(paimonCatalog, paimonCatalog.catalog);
+        ops.createDb(dbName, true, Maps.newHashMap());
+        paimonCatalog.makeSureInitialized();
+
+        // context
+        connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+    }
+
+    /** A single-column table without properties has one field and no partition keys. */
+    @Test
+    public void testSimpleTable() throws Exception {
+        String tableName = getTableName();
+        Identifier identifier = new Identifier(dbName, tableName);
+        String sql = "create table " + dbName + "." + tableName + " (id int) engine = paimon";
+        createTable(sql);
+        Catalog catalog = ops.getCatalog();
+        Table table = catalog.getTable(identifier);
+
+        List<DataField> columns = loadFields(catalog, identifier);
+        if (!columns.isEmpty()) {
+            Assert.assertEquals(1, columns.size());
+        }
+        Assert.assertEquals(0, table.partitionKeys().size());
+    }
+
+    /** The {@code primary-key} table property becomes the table's primary key. */
+    @Test
+    public void testProperties() throws Exception {
+        String tableName = getTableName();
+        Identifier identifier = new Identifier(dbName, tableName);
+        String sql = "create table " + dbName + "." + tableName + " (id int) engine = paimon "
+                + "properties(\"primary-key\"=id)";
+        createTable(sql);
+        Catalog catalog = ops.getCatalog();
+        Table table = catalog.getTable(identifier);
+
+        List<DataField> columns = loadFields(catalog, identifier);
+        if (!columns.isEmpty()) {
+            Assert.assertEquals(1, columns.size());
+        }
+        Assert.assertEquals(0, table.partitionKeys().size());
+        Assert.assertTrue(table.primaryKeys().contains("id"));
+        Assert.assertEquals(1, table.primaryKeys().size());
+    }
+
+    /** Each declared column type is compared with the SQL string of the expected Paimon type. */
+    @Test
+    public void testType() throws Exception {
+        String tableName = getTableName();
+        Identifier identifier = new Identifier(dbName, tableName);
+        String sql = "create table " + dbName + "." + tableName + " ("
+                + "c0 int, "
+                + "c1 bigint, "
+                + "c2 float, "
+                + "c3 double, "
+                + "c4 string, "
+                + "c5 date, "
+                + "c6 decimal(20, 10), "
+                + "c7 datetime"
+                + ") engine = paimon "
+                + "properties(\"primary-key\"=c0)";
+        createTable(sql);
+        Catalog catalog = ops.getCatalog();
+        Table table = catalog.getTable(identifier);
+
+        List<DataField> columns = loadFields(catalog, identifier);
+        if (!columns.isEmpty()) {
+            Assert.assertEquals(8, columns.size());
+            Assert.assertEquals(new IntType().asSQLString(), columns.get(0).type().toString());
+            Assert.assertEquals(new BigIntType().asSQLString(), columns.get(1).type().toString());
+            Assert.assertEquals(new FloatType().asSQLString(), columns.get(2).type().toString());
+            Assert.assertEquals(new DoubleType().asSQLString(), columns.get(3).type().toString());
+            Assert.assertEquals(new VarCharType(VarCharType.MAX_LENGTH).asSQLString(),
+                    columns.get(4).type().toString());
+            Assert.assertEquals(new DateType().asSQLString(), columns.get(5).type().toString());
+            Assert.assertEquals(new DecimalType(20, 10).asSQLString(), columns.get(6).type().toString());
+            Assert.assertEquals(new TimestampType().asSQLString(), columns.get(7).type().toString());
+        }
+
+        Assert.assertEquals(0, table.partitionKeys().size());
+        Assert.assertTrue(table.primaryKeys().contains("c0"));
+        Assert.assertEquals(1, table.primaryKeys().size());
+    }
+
+    /** A {@code partition by} clause on one column yields exactly one partition key. */
+    @Test
+    public void testPartition() throws Exception {
+        // Generated name, like every other test here, instead of a fixed one.
+        String tableName = getTableName();
+        Identifier identifier = new Identifier(dbName, tableName);
+        String sql = "create table " + dbName + "." + tableName + " ("
+                + "c0 int, "
+                + "c1 bigint, "
+                + "c2 float, "
+                + "c3 double, "
+                + "c4 string, "
+                + "c5 date, "
+                + "c6 decimal(20, 10), "
+                + "c7 datetime"
+                + ") engine = paimon "
+                + "partition by ("
+                + "c1 ) ()"
+                + "properties(\"primary-key\"=c0)";
+        createTable(sql);
+        Catalog catalog = ops.getCatalog();
+        Table table = catalog.getTable(identifier);
+        Assert.assertEquals(1, table.partitionKeys().size());
+        Assert.assertTrue(table.primaryKeys().contains("c0"));
+        Assert.assertEquals(1, table.primaryKeys().size());
+    }
+
+    /** The {@code bucket} and {@code bucket-key} properties are stored as table options. */
+    @Test
+    public void testBucket() throws Exception {
+        String tableName = getTableName();
+        Identifier identifier = new Identifier(dbName, tableName);
+        String sql = "create table " + dbName + "." + tableName + " ("
+                + "c0 int, "
+                + "c1 bigint, "
+                + "c2 float, "
+                + "c3 double, "
+                + "c4 string, "
+                + "c5 date, "
+                + "c6 decimal(20, 10), "
+                + "c7 datetime"
+                + ") engine = paimon "
+                + "properties(\"primary-key\"=c0,"
+                + "\"bucket\" = 4,"
+                + "\"bucket-key\" = c0)";
+        createTable(sql);
+        Catalog catalog = ops.getCatalog();
+        Table table = catalog.getTable(identifier);
+        Assert.assertEquals("4", table.options().get("bucket"));
+        Assert.assertEquals("c0", table.options().get("bucket-key"));
+    }
+
+    /**
+     * Parses {@code sql} as a CREATE TABLE statement, marks it external, analyzes the engine
+     * and hands the resulting info to {@link PaimonMetadataOps#createTable}.
+     */
+    public void createTable(String sql) throws UserException {
+        LogicalPlan plan = new NereidsParser().parseSingle(sql);
+        Assertions.assertTrue(plan instanceof CreateTableCommand);
+        CreateTableInfo createTableInfo = ((CreateTableCommand) plan).getCreateTableInfo();
+        createTableInfo.setIsExternal(true);
+        createTableInfo.analyzeEngine();
+        ops.createTable(createTableInfo);
+    }
+
+    /** Returns a fresh table name: {@code test_tb_} followed by a random UUID without dashes. */
+    public String getTableName() {
+        String s = "test_tb_" + UUID.randomUUID();
+        return s.replaceAll("-", "");
+    }
+
+    /**
+     * Loads the fields of a table's schema through the concrete catalog implementation.
+     *
+     * <p>Only {@link HiveCatalog} and {@link FileSystemCatalog} are handled. For any other
+     * implementation an empty list is returned and callers skip their schema assertions.
+     * NOTE(review): if the catalog under test is ever a wrapper type, those assertions are
+     * silently skipped - confirm this is intended.
+     */
+    private static List<DataField> loadFields(Catalog catalog, Identifier identifier) throws Exception {
+        List<DataField> fields = new ArrayList<>();
+        if (catalog instanceof HiveCatalog) {
+            fields.addAll(((HiveCatalog) catalog).loadTableSchema(identifier).fields());
+        } else if (catalog instanceof FileSystemCatalog) {
+            fields.addAll(((FileSystemCatalog) catalog).loadTableSchema(identifier).fields());
+        }
+        return fields;
+    }
+
+    /**
+     * Creating and dropping a database succeeds; dropping it a second time without
+     * "if exists" must fail with a {@link DdlException}.
+     */
+    @Test
+    public void testDropDB() throws Exception {
+        // Let unexpected exceptions propagate so the real cause shows up in the test report.
+        ops.createDb("t_paimon", false, Maps.newHashMap());
+        ops.dropDb("t_paimon", false, false);
+
+        // Capture the exception first and assert afterwards, so that an assertion failure
+        // is never caught by the very handler that inspects the expected exception.
+        Exception thrown = null;
+        try {
+            ops.dropDb("t_paimon", false, false);
+        } catch (Exception e) {
+            thrown = e;
+        }
+        Assert.assertNotNull("dropping a missing database without 'if exists' should fail", thrown);
+        Assert.assertTrue("unexpected exception: " + thrown, thrown instanceof DdlException);
+        Assert.assertTrue(thrown.getMessage().contains("database doesn't exist"));
+    }
+}
diff --git 
a/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy 
b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy
new file mode 100644
index 00000000000..032176bd103
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// DDL smoke test for a Paimon catalog backed by a Hive metastore: creates a catalog,
+// a database and five tables, then drops all of them again. The statements are only
+// executed; no query results are checked.
+suite("test_create_paimon_table", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String catalog_name = "paimon_hms_catalog_test01"
+
+    // The whole suite is a no-op unless enablePaimonTest is set to "true" in the config.
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        for (String hivePrefix : ["hive2"]) {
+            // Connection settings are read from the config, keyed by the hive prefix.
+            String externalEnvIp = 
context.config.otherConfigs.get("externalEnvIp")
+            String hmsPort = context.config.otherConfigs.get(hivePrefix + 
"HmsPort")
+            String hdfs_port = context.config.otherConfigs.get(hivePrefix + 
"HdfsPort")
+            String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}"
+            String warehouse = "${default_fs}/warehouse"
+
+            // 1. test create catalog
+            sql """drop catalog if exists ${catalog_name};"""
+            sql """
+            create catalog ${catalog_name} properties (
+                'type'='paimon',
+                'paimon.catalog.type'='hms',
+                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}',
+                'warehouse' = '${warehouse}'
+            );
+            """
+
+            // 2. test create database
+            sql """switch ${catalog_name}"""
+            String db_name = "test_db"
+            sql """create database if not exists ${db_name}"""
+
+            // 3. test create table
+            sql """use ${db_name}"""
+            // test01: single column, no properties
+            sql """drop table if exists ${db_name}.test01"""
+            sql """
+                CREATE TABLE ${db_name}.test01 (
+                    id int
+                ) engine=paimon;
+            """
+
+            // test02: primary key given through the "primary-key" property
+            sql """drop table if exists ${db_name}.test02"""
+            sql """
+                CREATE TABLE ${db_name}.test02 (
+                    id int
+                ) engine=paimon
+                properties("primary-key"=id);
+            """
+
+            // test03: several column types plus a primary key
+            sql """drop table if exists ${db_name}.test03"""
+            sql """
+                CREATE TABLE ${db_name}.test03 (
+                    c0 int,
+                    c1 bigint,
+                    c2 float,
+                    c3 double,
+                    c4 string,
+                    c5 date,
+                    c6 decimal(10,5),
+                    c7 datetime
+                ) engine=paimon
+                properties("primary-key"=c0);
+            """
+
+            // test04: same columns, partitioned by c1
+            sql """drop table if exists ${db_name}.test04"""
+            sql """
+                CREATE TABLE ${db_name}.test04 (
+                    c0 int,
+                    c1 bigint,
+                    c2 float,
+                    c3 double,
+                    c4 string,
+                    c5 date,
+                    c6 decimal(10,5),
+                    c7 datetime
+                ) engine=paimon
+                partition by (c1) ()
+                properties("primary-key"=c0);
+            """
+            
+            // test05: composite primary key with 'bucket' and 'bucket-key' properties
+            sql """drop table if exists ${db_name}.test05"""
+            sql """
+                CREATE TABLE ${db_name}.test05 (
+                    c0 int,
+                    c1 bigint,
+                    c2 float,
+                    c3 double,
+                    c4 string,
+                    c5 date,
+                    c6 decimal(10,5),
+                    c7 datetime
+                ) engine=paimon
+                properties(
+                 'primary-key' = 'c0,c1',
+                 'bucket' = '4',
+                 'bucket-key' = 'c0,c1');
+            """
+
+            // Cleanup: drop the tables, then the database, then the catalog.
+            sql """ drop table if exists ${db_name}.test01"""
+            sql """ drop table if exists ${db_name}.test02"""
+            sql """ drop table if exists ${db_name}.test03"""
+            sql """ drop table if exists ${db_name}.test04"""
+            sql """ drop table if exists ${db_name}.test05"""
+            sql """ drop database if exists ${db_name}"""
+            sql """DROP CATALOG IF EXISTS ${catalog_name}"""
+        }
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to