This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-1.1
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-1.1 by this push:
     new 75b7ce4d52 [Cherry-pick to branch-1.1] [#10281] refactor: Revert 
unnecessary change (#10288) (#10439)
75b7ce4d52 is described below

commit 75b7ce4d5258e9df9b8e91680b0716be40af8e36
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 16 14:17:24 2026 +0800

    [Cherry-pick to branch-1.1] [#10281] refactor: Revert unnecessary change 
(#10288) (#10439)
    
    **Cherry-pick Information:**
    - Original commit: 891ab2d655692a8169b507cdee16598f3fdb6aa3
    - Target branch: `branch-1.1`
    - Status: ⚠️ **Has conflicts - manual resolution required**
    
    Please review and resolve the conflicts before merging.
    
    Co-authored-by: roryqi <[email protected]>
---
 .../integration/test/CatalogClickHouseIT.java      | 1634 +++++++-------------
 .../mysql/integration/test/CatalogMysqlIT.java     |    2 +-
 .../integration/test/CatalogPostgreSqlIT.java      |    3 +-
 .../integration/test/CatalogIcebergBaseIT.java     |    3 +-
 .../gravitino/server/web/rest/TableOperations.java |    9 -
 .../gravitino/server/web/rest/TopicOperations.java |    9 -
 .../server/web/rest/TestTableOperations.java       |    1 -
 .../server/web/rest/TestTopicOperations.java       |    1 -
 8 files changed, 599 insertions(+), 1063 deletions(-)

diff --git 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseIT.java
similarity index 53%
copy from 
catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
copy to 
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseIT.java
index 868c0b9425..aeb2ed6c07 100644
--- 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
+++ 
b/catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseIT.java
@@ -16,10 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.catalog.mysql.integration.test;
+package org.apache.gravitino.catalog.clickhouse.integration.test;
 
-import static 
org.apache.gravitino.catalog.mysql.MysqlTablePropertiesMetadata.GRAVITINO_ENGINE_KEY;
-import static 
org.apache.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.ENGINE.MERGETREE;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.GRAVITINO_ENGINE_KEY;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseUtils.getSortOrders;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import com.google.common.collect.ImmutableMap;
@@ -27,6 +29,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.sql.SQLException;
+import java.time.LocalDate;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -42,31 +45,31 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.SupportsSchemas;
 import org.apache.gravitino.auth.AuthConstants;
+import 
org.apache.gravitino.catalog.clickhouse.integration.test.service.ClickHouseService;
 import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
-import 
org.apache.gravitino.catalog.mysql.integration.test.service.MysqlService;
 import org.apache.gravitino.client.GravitinoMetalake;
-import org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
-import org.apache.gravitino.exceptions.NonEmptyCatalogException;
 import org.apache.gravitino.exceptions.NotFoundException;
-import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.integration.test.container.ClickHouseContainer;
 import org.apache.gravitino.integration.test.container.ContainerSuite;
-import org.apache.gravitino.integration.test.container.MySQLContainer;
 import org.apache.gravitino.integration.test.util.BaseIT;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
 import org.apache.gravitino.integration.test.util.ITUtils;
 import org.apache.gravitino.integration.test.util.TestDatabaseName;
 import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.Column.ColumnImpl;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.FunctionExpression;
+import 
org.apache.gravitino.rel.expressions.FunctionExpression.FuncExpressionImpl;
+import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.UnparsedExpression;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortDirection;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.gravitino.rel.indexes.Index;
@@ -86,75 +89,65 @@ import org.junit.jupiter.api.condition.EnabledIf;
 
 @Tag("gravitino-docker-test")
 @TestInstance(Lifecycle.PER_CLASS)
-public class CatalogMysqlIT extends BaseIT {
+public class CatalogClickHouseIT extends BaseIT {
   private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
-  private static final String provider = "jdbc-mysql";
+  private static final String provider = "jdbc-clickhouse";
 
-  public String metalakeName = 
GravitinoITUtils.genRandomName("mysql_it_metalake");
-  public String catalogName = 
GravitinoITUtils.genRandomName("mysql_it_catalog");
-  public String schemaName = GravitinoITUtils.genRandomName("mysql_it_schema");
-  public String tableName = GravitinoITUtils.genRandomName("mysql_it_table");
+  public String metalakeName = 
GravitinoITUtils.genRandomName("clickhouse_it_metalake");
+  public String catalogName = 
GravitinoITUtils.genRandomName("clickhouse_it_catalog");
+  public String schemaName = 
GravitinoITUtils.genRandomName("clickhouse_it_schema");
+  public String tableName = 
GravitinoITUtils.genRandomName("clickhouse_it_table");
   public String alertTableName = "alert_table_name";
   public String table_comment = "table_comment";
 
-  // MySQL doesn't support schema comment
-  public String schema_comment = null;
-  public String MYSQL_COL_NAME1 = "mysql_col_name1";
-  public String MYSQL_COL_NAME2 = "mysql_col_name2";
-  public String MYSQL_COL_NAME3 = "mysql_col_name3";
-  public String MYSQL_COL_NAME4 = "mysql_col_name4";
-  public String MYSQL_COL_NAME5 = "mysql_col_name5";
+  public String schema_comment = "test schema";
+  public String CLICKHOUSE_COL_NAME1 = "clickhouse_col_name1";
+  public String CLICKHOUSE_COL_NAME2 = "clickhouse_col_name2";
+  public String CLICKHOUSE_COL_NAME3 = "clickhouse_col_name3";
+  public String CLICKHOUSE_COL_NAME4 = "clickhouse_col_name4";
+  public String CLICKHOUSE_COL_NAME5 = "clickhouse_col_name5";
 
   private GravitinoMetalake metalake;
 
   protected Catalog catalog;
 
-  private MysqlService mysqlService;
+  private ClickHouseService clickhouseService;
 
-  private MySQLContainer MYSQL_CONTAINER;
+  private ClickHouseContainer CLICKHOUSE_CONTAINER;
 
   private TestDatabaseName TEST_DB_NAME;
 
-  public static final String defaultMysqlImageName = "mysql:8.0";
-
-  protected String mysqlImageName = defaultMysqlImageName;
-
-  boolean SupportColumnDefaultValueExpression() {
+  boolean supportColumnDefaultValueExpression() {
     return true;
   }
 
   @BeforeAll
   public void startup() throws IOException, SQLException {
-    TEST_DB_NAME = TestDatabaseName.MYSQL_CATALOG_MYSQL_IT;
-
-    if (mysqlImageName.equals("mysql:5.7")) {
-      
containerSuite.startMySQLVersion5Container(TestDatabaseName.MYSQL_CATALOG_MYSQL_IT);
-      MYSQL_CONTAINER = containerSuite.getMySQLVersion5Container();
-    } else {
-      containerSuite.startMySQLContainer(TEST_DB_NAME);
-      MYSQL_CONTAINER = containerSuite.getMySQLContainer();
-    }
+    TEST_DB_NAME = TestDatabaseName.CLICKHOUSE_CATALOG_CLICKHOUSE_IT;
+
+    containerSuite.startClickHouseContainer(TEST_DB_NAME);
+    CLICKHOUSE_CONTAINER = containerSuite.getClickHouseContainer();
 
-    mysqlService = new MysqlService(MYSQL_CONTAINER, TEST_DB_NAME);
+    clickhouseService = new ClickHouseService(CLICKHOUSE_CONTAINER, 
TEST_DB_NAME);
     createMetalake();
-    catalog = createCatalog(catalogName);
-    createSchema(catalog, schemaName);
+    createCatalog();
+    createSchema();
   }
 
   @AfterAll
   public void stop() {
     clearTableAndSchema();
     metalake.disableCatalog(catalogName);
-    metalake.dropCatalog(catalogName);
+    metalake.dropCatalog(catalogName, true);
     client.disableMetalake(metalakeName);
-    client.dropMetalake(metalakeName);
-    mysqlService.close();
+    client.dropMetalake(metalakeName, true);
+    clickhouseService.close();
   }
 
   @AfterEach
   public void resetSchema() {
     clearTableAndSchema();
-    createSchema(catalog, schemaName);
+    createSchema();
   }
 
   private void clearTableAndSchema() {
@@ -177,20 +170,19 @@ public class CatalogMysqlIT extends BaseIT {
     metalake = loadMetalake;
   }
 
-  private Catalog createCatalog(String catalogName) throws SQLException {
+  private void createCatalog() throws SQLException {
     Map<String, String> catalogProperties = Maps.newHashMap();
 
     catalogProperties.put(
         JdbcConfig.JDBC_URL.getKey(),
         StringUtils.substring(
-                MYSQL_CONTAINER.getJdbcUrl(TEST_DB_NAME),
-                0,
-                MYSQL_CONTAINER.getJdbcUrl(TEST_DB_NAME).lastIndexOf("/"))
-            + "?useSSL=false&allowPublicKeyRetrieval=true");
+            CLICKHOUSE_CONTAINER.getJdbcUrl(TEST_DB_NAME),
+            0,
+            CLICKHOUSE_CONTAINER.getJdbcUrl(TEST_DB_NAME).lastIndexOf("/")));
     catalogProperties.put(
-        JdbcConfig.JDBC_DRIVER.getKey(), 
MYSQL_CONTAINER.getDriverClassName(TEST_DB_NAME));
-    catalogProperties.put(JdbcConfig.USERNAME.getKey(), 
MYSQL_CONTAINER.getUsername());
-    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), 
MYSQL_CONTAINER.getPassword());
+        JdbcConfig.JDBC_DRIVER.getKey(), 
CLICKHOUSE_CONTAINER.getDriverClassName(TEST_DB_NAME));
+    catalogProperties.put(JdbcConfig.USERNAME.getKey(), 
CLICKHOUSE_CONTAINER.getUsername());
+    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), 
CLICKHOUSE_CONTAINER.getPassword());
 
     Catalog createdCatalog =
         metalake.createCatalog(
@@ -198,22 +190,34 @@ public class CatalogMysqlIT extends BaseIT {
     Catalog loadCatalog = metalake.loadCatalog(catalogName);
     Assertions.assertEquals(createdCatalog, loadCatalog);
 
-    return loadCatalog;
+    catalog = loadCatalog;
   }
 
-  private void createSchema(Catalog catalog, String schemaName) {
+  private void createSchema() {
     Map<String, String> prop = Maps.newHashMap();
+    Schema createdSchema = null;
+    try {
+      createdSchema = catalog.asSchemas().createSchema(schemaName, 
schema_comment, prop);
+    } catch (Exception ex) {
+      throw new RuntimeException("Create schema failed: " + ex.getMessage(), 
ex);
+    }
 
-    Schema createdSchema = catalog.asSchemas().createSchema(schemaName, 
schema_comment, prop);
     Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
     Assertions.assertEquals(createdSchema.name(), loadSchema.name());
     prop.forEach((key, value) -> 
Assertions.assertEquals(loadSchema.properties().get(key), value));
   }
 
   private Column[] createColumns() {
-    Column col1 = Column.of(MYSQL_COL_NAME1, Types.IntegerType.get(), 
"col_1_comment");
-    Column col2 = Column.of(MYSQL_COL_NAME2, Types.DateType.get(), 
"col_2_comment");
-    Column col3 = Column.of(MYSQL_COL_NAME3, Types.StringType.get(), 
"col_3_comment");
+    Column col1 = Column.of(CLICKHOUSE_COL_NAME1, Types.IntegerType.get(), 
"col_1_comment");
+    Column col2 = Column.of(CLICKHOUSE_COL_NAME2, Types.DateType.get(), 
"col_2_comment");
+    Column col3 =
+        Column.of(
+            CLICKHOUSE_COL_NAME3,
+            Types.StringType.get(),
+            "col_3_comment",
+            false,
+            false,
+            DEFAULT_VALUE_NOT_SET);
 
     return new Column[] {col1, col2, col3};
   }
@@ -221,30 +225,35 @@ public class CatalogMysqlIT extends BaseIT {
   private Column[] createColumnsWithDefaultValue() {
     return new Column[] {
       Column.of(
-          MYSQL_COL_NAME1,
+          CLICKHOUSE_COL_NAME1,
           Types.FloatType.get(),
           "col_1_comment",
           false,
           false,
           Literals.of("1.23", Types.FloatType.get())),
       Column.of(
-          MYSQL_COL_NAME2,
+          CLICKHOUSE_COL_NAME2,
           Types.TimestampType.withoutTimeZone(),
           "col_2_comment",
           false,
           false,
-          FunctionExpression.of("current_timestamp")),
+          FunctionExpression.of("now")),
       Column.of(
-          MYSQL_COL_NAME3, Types.VarCharType.of(255), "col_3_comment", true, 
false, Literals.NULL),
+          CLICKHOUSE_COL_NAME3,
+          Types.VarCharType.of(255),
+          "col_3_comment",
+          true,
+          false,
+          Literals.NULL),
       Column.of(
-          MYSQL_COL_NAME4,
+          CLICKHOUSE_COL_NAME4,
           Types.IntegerType.get(),
           "col_4_comment",
           false,
           false,
           Literals.of("1000", Types.IntegerType.get())),
       Column.of(
-          MYSQL_COL_NAME5,
+          CLICKHOUSE_COL_NAME5,
           Types.DecimalType.of(3, 2),
           "col_5_comment",
           true,
@@ -255,60 +264,12 @@ public class CatalogMysqlIT extends BaseIT {
 
   private Map<String, String> createProperties() {
     Map<String, String> properties = Maps.newHashMap();
-    properties.put(GRAVITINO_ENGINE_KEY, "InnoDB");
+    properties.put(GRAVITINO_ENGINE_KEY, MERGETREE.getValue());
     return properties;
   }
 
   @Test
-  void testDropCatalog() throws SQLException {
-    // test drop catalog with legacy entity
-    String catalogName = GravitinoITUtils.genRandomName("drop_catalog_it");
-    Catalog catalog = createCatalog(catalogName);
-    String schemaName = GravitinoITUtils.genRandomName("drop_catalog_it");
-    createSchema(catalog, schemaName);
-
-    metalake.disableCatalog(catalogName);
-    Assertions.assertThrows(
-        NonEmptyCatalogException.class, () -> 
metalake.dropCatalog(catalogName));
-
-    // drop database externally
-    String sql = String.format("DROP DATABASE %s", schemaName);
-    mysqlService.executeQuery(sql);
-
-    Assertions.assertTrue(metalake.dropCatalog(catalogName));
-  }
-
-  @Test
-  void testTestConnection() throws SQLException {
-    Map<String, String> catalogProperties = Maps.newHashMap();
-
-    catalogProperties.put(
-        JdbcConfig.JDBC_URL.getKey(),
-        StringUtils.substring(
-                MYSQL_CONTAINER.getJdbcUrl(TEST_DB_NAME),
-                0,
-                MYSQL_CONTAINER.getJdbcUrl(TEST_DB_NAME).lastIndexOf("/"))
-            + "?useSSL=false&allowPublicKeyRetrieval=true");
-    catalogProperties.put(
-        JdbcConfig.JDBC_DRIVER.getKey(), 
MYSQL_CONTAINER.getDriverClassName(TEST_DB_NAME));
-    catalogProperties.put(JdbcConfig.USERNAME.getKey(), 
MYSQL_CONTAINER.getUsername());
-    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), "wrong_password");
-
-    Exception exception =
-        assertThrows(
-            ConnectionFailedException.class,
-            () ->
-                metalake.testConnection(
-                    GravitinoITUtils.genRandomName("mysql_it_catalog"),
-                    Catalog.Type.RELATIONAL,
-                    provider,
-                    "comment",
-                    catalogProperties));
-    Assertions.assertTrue(exception.getMessage().contains("Access denied for 
user"));
-  }
-
-  @Test
-  void testOperationMysqlSchema() {
+  void testOperationClickhouseSchema() {
     SupportsSchemas schemas = catalog.asSchemas();
     Namespace namespace = Namespace.of(metalakeName, catalogName);
     // list schema check.
@@ -316,9 +277,9 @@ public class CatalogMysqlIT extends BaseIT {
     Set<String> schemaNames = Sets.newHashSet(nameIdentifiers);
     Assertions.assertTrue(schemaNames.contains(schemaName));
 
-    NameIdentifier[] mysqlNamespaces = mysqlService.listSchemas(namespace);
+    NameIdentifier[] clickhouseNamespaces = 
clickhouseService.listSchemas(namespace);
     schemaNames =
-        
Arrays.stream(mysqlNamespaces).map(NameIdentifier::name).collect(Collectors.toSet());
+        
Arrays.stream(clickhouseNamespaces).map(NameIdentifier::name).collect(Collectors.toSet());
     Assertions.assertTrue(schemaNames.contains(schemaName));
 
     // create schema check.
@@ -329,14 +290,14 @@ public class CatalogMysqlIT extends BaseIT {
     schemaNames = Sets.newHashSet(nameIdentifiers);
     Assertions.assertTrue(schemaNames.contains(testSchemaName));
 
-    mysqlNamespaces = mysqlService.listSchemas(namespace);
+    clickhouseNamespaces = clickhouseService.listSchemas(namespace);
     schemaNames =
-        
Arrays.stream(mysqlNamespaces).map(NameIdentifier::name).collect(Collectors.toSet());
+        
Arrays.stream(clickhouseNamespaces).map(NameIdentifier::name).collect(Collectors.toSet());
     Assertions.assertTrue(schemaNames.contains(testSchemaName));
 
     Map<String, String> emptyMap = Collections.emptyMap();
     Assertions.assertThrows(
-        SchemaAlreadyExistsException.class,
+        RuntimeException.class,
         () -> {
           schemas.createSchema(testSchemaName, schema_comment, emptyMap);
         });
@@ -345,7 +306,7 @@ public class CatalogMysqlIT extends BaseIT {
     schemas.dropSchema(testSchemaName, false);
     Assertions.assertThrows(NoSuchSchemaException.class, () -> 
schemas.loadSchema(testSchemaName));
     Assertions.assertThrows(
-        NoSuchSchemaException.class, () -> 
mysqlService.loadSchema(schemaIdent));
+        NoSuchSchemaException.class, () -> 
clickhouseService.loadSchema(schemaIdent));
 
     nameIdentifiers = schemas.listSchemas();
     schemaNames = Sets.newHashSet(nameIdentifiers);
@@ -356,7 +317,7 @@ public class CatalogMysqlIT extends BaseIT {
     // create failed check.
     NameIdentifier table = NameIdentifier.of(testSchemaName, "test_table");
     Assertions.assertThrows(
-        NotFoundException.class,
+        NoSuchSchemaException.class,
         () ->
             tableCatalog.createTable(
                 table,
@@ -365,27 +326,25 @@ public class CatalogMysqlIT extends BaseIT {
                 createProperties(),
                 null,
                 Distributions.NONE,
-                null));
+                getSortOrders(CLICKHOUSE_COL_NAME3)));
     // drop schema failed check.
     Assertions.assertFalse(schemas.dropSchema(schemaIdent.name(), true));
     Assertions.assertFalse(schemas.dropSchema(schemaIdent.name(), false));
-    Assertions.assertFalse(tableCatalog.dropTable(table));
-    mysqlNamespaces = mysqlService.listSchemas(Namespace.empty());
+    Assertions.assertFalse(() -> tableCatalog.dropTable(table));
+    clickhouseNamespaces = clickhouseService.listSchemas(Namespace.empty());
     schemaNames =
-        
Arrays.stream(mysqlNamespaces).map(NameIdentifier::name).collect(Collectors.toSet());
+        
Arrays.stream(clickhouseNamespaces).map(NameIdentifier::name).collect(Collectors.toSet());
     Assertions.assertTrue(schemaNames.contains(schemaName));
   }
 
   @Test
-  void testCreateAndLoadMysqlTable() {
+  void testCreateAndLoadClickhouseTable() {
     // Create table from Gravitino API
     Column[] columns = createColumns();
 
     NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
     Distribution distribution = Distributions.NONE;
 
-    final SortOrder[] sortOrders = new SortOrder[0];
-
     Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
 
     Map<String, String> properties = createProperties();
@@ -397,7 +356,7 @@ public class CatalogMysqlIT extends BaseIT {
         properties,
         partitioning,
         distribution,
-        sortOrders);
+        getSortOrders(CLICKHOUSE_COL_NAME3));
 
     Table loadTable = tableCatalog.loadTable(tableIdentifier);
     Assertions.assertEquals(tableName, loadTable.name());
@@ -413,11 +372,194 @@ public class CatalogMysqlIT extends BaseIT {
     }
   }
 
+  @Test
+  void testCreateTableWithComplexOrderBy() {
+    Column[] columns =
+        new Column[] {
+          Column.of(
+              CLICKHOUSE_COL_NAME1,
+              Types.IntegerType.get(),
+              "col_1_comment",
+              false,
+              false,
+              DEFAULT_VALUE_NOT_SET),
+          Column.of(
+              CLICKHOUSE_COL_NAME2,
+              Types.DateType.get(),
+              "col_2_comment",
+              false,
+              false,
+              DEFAULT_VALUE_NOT_SET),
+          Column.of(
+              CLICKHOUSE_COL_NAME3,
+              Types.StringType.get(),
+              "col_3_comment",
+              false,
+              false,
+              DEFAULT_VALUE_NOT_SET)
+        };
+    String name = GravitinoITUtils.genRandomName("order_by_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, name);
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    SortOrder[] sortOrders =
+        new SortOrder[] {
+          SortOrders.of(NamedReference.field("clickhouse_col_name1"), 
SortDirection.ASCENDING),
+          SortOrders.of(
+              FunctionExpression.of("toDate", 
NamedReference.field("clickhouse_col_name2")),
+              SortDirection.ASCENDING)
+        };
+
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        sortOrders);
+    Table loaded = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(2, loaded.sortOrder().length);
+    Assertions.assertTrue(loaded.sortOrder()[0].expression() instanceof 
NamedReference);
+    Assertions.assertTrue(loaded.sortOrder()[1].expression() instanceof 
FuncExpressionImpl);
+  }
+
+  @Test
+  void testLoadTableFromShowCreateParsing() {
+    String name = GravitinoITUtils.genRandomName("show_create_table");
+    clickhouseService.executeQuery(
+        String.format(
+            "CREATE TABLE `%s`.`%s` (\n"
+                + "  `id` UInt64,\n"
+                + "  `event_time` DateTime,\n"
+                + "  `user_id` UInt64,\n"
+                + "  `region` String,\n"
+                + "  `amount` Float64,\n"
+                + "  PRIMARY KEY user_id,\n"
+                + "  INDEX idx_amount amount TYPE minmax GRANULARITY 1\n"
+                + ")\n"
+                + "ENGINE = MergeTree\n"
+                + "PARTITION BY toYYYYMM(event_time)\n"
+                + "ORDER BY (user_id, toYYYYMM(event_time), region)",
+            schemaName, name));
+
+    Table loaded = 
catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, name));
+    SortOrder[] sortOrders = loaded.sortOrder();
+    Assertions.assertEquals(3, sortOrders.length);
+    Assertions.assertTrue(sortOrders[0].expression() instanceof 
NamedReference);
+    Assertions.assertArrayEquals(
+        new String[] {"user_id"}, ((NamedReference) 
sortOrders[0].expression()).fieldName());
+    Assertions.assertTrue(sortOrders[1].expression() instanceof 
FuncExpressionImpl);
+
+    Assertions.assertEquals(
+        FunctionExpression.of("toYYYYMM", NamedReference.field("event_time")),
+        sortOrders[1].expression());
+    Assertions.assertTrue(sortOrders[2].expression() instanceof 
NamedReference);
+    Assertions.assertArrayEquals(
+        new String[] {"region"}, ((NamedReference) 
sortOrders[2].expression()).fieldName());
+
+    Transform[] partitioning = loaded.partitioning();
+    Assertions.assertEquals(1, partitioning.length);
+    Assertions.assertEquals(Transforms.NAME_OF_MONTH, partitioning[0].name());
+    Assertions.assertArrayEquals(
+        new String[] {"event_time"}, ((NamedReference) 
partitioning[0].arguments()[0]).fieldName());
+
+    Index[] indexes = loaded.index();
+    Assertions.assertTrue(
+        Arrays.stream(indexes)
+            .anyMatch(
+                idx ->
+                    idx.type() == Index.IndexType.PRIMARY_KEY
+                        && Arrays.deepEquals(idx.fieldNames(), new String[][] 
{{"user_id"}})));
+    Assertions.assertTrue(
+        Arrays.stream(indexes)
+            .anyMatch(
+                idx ->
+                    idx.type() == Index.IndexType.DATA_SKIPPING_MINMAX
+                        && Arrays.deepEquals(idx.fieldNames(), new String[][] 
{{"amount"}})));
+  }
+
+  @Test
+  void testCreateAndLoadWithPartitionSortAndIndexes() {
+    String table = GravitinoITUtils.genRandomName("meta_roundtrip");
+    NameIdentifier ident = NameIdentifier.of(schemaName, table);
+    Column[] cols =
+        new Column[] {
+          Column.of("id", Types.LongType.get(), "id"),
+          Column.of(
+              "event_time",
+              Types.TimestampType.withoutTimeZone(),
+              "ts",
+              false,
+              false,
+              DEFAULT_VALUE_NOT_SET),
+          Column.of("user_id", Types.LongType.get(), "user", false, false, 
DEFAULT_VALUE_NOT_SET),
+          Column.of("amount", Types.FloatType.get(), "amt")
+        };
+
+    Transform[] partitioning = new Transform[] 
{Transforms.identity("event_time")};
+    SortOrder[] sortOrders =
+        new SortOrder[] {
+          SortOrders.of(NamedReference.field("user_id"), 
SortDirection.ASCENDING),
+          SortOrders.of(
+              FunctionExpression.of("toDate", 
NamedReference.field("event_time")),
+              SortDirection.ASCENDING)
+        };
+    Index[] indexes =
+        new Index[] {
+          Indexes.primary(Indexes.DEFAULT_PRIMARY_KEY_NAME, new String[][] 
{{"user_id"}}),
+          Indexes.of(
+              Index.IndexType.DATA_SKIPPING_MINMAX, "idx_amount", new 
String[][] {{"amount"}})
+        };
+
+    catalog
+        .asTableCatalog()
+        .createTable(
+            ident,
+            cols,
+            "roundtrip meta",
+            createProperties(),
+            partitioning,
+            Distributions.NONE,
+            sortOrders,
+            indexes);
+
+    Table loaded = catalog.asTableCatalog().loadTable(ident);
+    Assertions.assertEquals(1, loaded.partitioning().length);
+    Assertions.assertEquals(
+        "event_time", ((NamedReference) 
loaded.partitioning()[0].arguments()[0]).fieldName()[0]);
+
+    Assertions.assertEquals(2, loaded.sortOrder().length);
+    Assertions.assertTrue(loaded.sortOrder()[0].expression() instanceof 
NamedReference);
+    Assertions.assertEquals(
+        "user_id", ((NamedReference) 
loaded.sortOrder()[0].expression()).fieldName()[0]);
+
+    Assertions.assertTrue(loaded.sortOrder()[1].expression() instanceof 
FuncExpressionImpl);
+    Assertions.assertEquals(
+        "toDate", ((FuncExpressionImpl) 
loaded.sortOrder()[1].expression()).functionName());
+    Assertions.assertEquals(
+        1, ((FuncExpressionImpl) 
loaded.sortOrder()[1].expression()).arguments().length);
+
+    Index[] loadedIndexes = loaded.index();
+    Assertions.assertTrue(
+        Arrays.stream(loadedIndexes)
+            .anyMatch(
+                idx ->
+                    idx.type() == Index.IndexType.PRIMARY_KEY
+                        && Arrays.deepEquals(idx.fieldNames(), new String[][] 
{{"user_id"}})));
+    Assertions.assertTrue(
+        Arrays.stream(loadedIndexes)
+            .anyMatch(
+                idx ->
+                    idx.type() == Index.IndexType.DATA_SKIPPING_MINMAX
+                        && Arrays.deepEquals(idx.fieldNames(), new String[][] 
{{"amount"}})));
+  }
+
   @Test
   void testColumnNameWithKeyWords() {
     // Create table from Gravitino API
     Column[] columns = {
-      Column.of("integer", Types.IntegerType.get(), "integer"),
+      Column.of("integer", Types.IntegerType.get(), "integer", false, false, 
DEFAULT_VALUE_NOT_SET),
       Column.of("long", Types.LongType.get(), "long"),
       Column.of("float", Types.FloatType.get(), "float"),
       Column.of("double", Types.DoubleType.get(), "double"),
@@ -430,8 +572,6 @@ public class CatalogMysqlIT extends BaseIT {
     NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, name);
     Distribution distribution = Distributions.NONE;
 
-    final SortOrder[] sortOrders = new SortOrder[0];
-
     Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
 
     Map<String, String> properties = createProperties();
@@ -444,18 +584,18 @@ public class CatalogMysqlIT extends BaseIT {
             properties,
             partitioning,
             distribution,
-            sortOrders);
+            getSortOrders("integer"));
     Assertions.assertEquals(createdTable.name(), name);
   }
 
   @Test
-  // MySQL support column default value expression after 8.0.13
-  // see https://dev.mysql.com/doc/refman/8.0/en/data-type-defaults.html
-  @EnabledIf("SupportColumnDefaultValueExpression")
+  // ClickHouse support column default value expression after 8.0.13
+  // see https://dev.clickhouse.com/doc/refman/8.0/en/data-type-defaults.html
+  @EnabledIf("supportColumnDefaultValueExpression")
   void testColumnDefaultValue() {
     Column col1 =
         Column.of(
-            MYSQL_COL_NAME1,
+            CLICKHOUSE_COL_NAME1,
             Types.IntegerType.get(),
             "col_1_comment",
             false,
@@ -463,54 +603,62 @@ public class CatalogMysqlIT extends BaseIT {
             FunctionExpression.of("rand"));
     Column col2 =
         Column.of(
-            MYSQL_COL_NAME2,
+            CLICKHOUSE_COL_NAME2,
             Types.TimestampType.withoutTimeZone(),
             "col_2_comment",
             false,
             false,
-            FunctionExpression.of("current_timestamp"));
+            FunctionExpression.of("now"));
     Column col3 =
         Column.of(
-            MYSQL_COL_NAME3,
+            CLICKHOUSE_COL_NAME3,
             Types.VarCharType.of(255),
             "col_3_comment",
             true,
             false,
             Literals.NULL);
     Column col4 =
-        Column.of(MYSQL_COL_NAME4, Types.StringType.get(), "col_4_comment", 
false, false, null);
+        Column.of(
+            CLICKHOUSE_COL_NAME4, Types.StringType.get(), "col_4_comment", 
false, false, null);
     Column col5 =
         Column.of(
-            MYSQL_COL_NAME5,
+            CLICKHOUSE_COL_NAME5,
             Types.VarCharType.of(255),
             "col_5_comment",
             true,
             false,
-            Literals.stringLiteral("current_timestamp"));
+            Literals.stringLiteral("now()"));
 
     Column[] newColumns = new Column[] {col1, col2, col3, col4, col5};
 
     NameIdentifier tableIdent =
-        NameIdentifier.of(schemaName, 
GravitinoITUtils.genRandomName("mysql_it_table"));
-    catalog.asTableCatalog().createTable(tableIdent, newColumns, null, 
ImmutableMap.of());
+        NameIdentifier.of(schemaName, 
GravitinoITUtils.genRandomName("clickhouse_it_table"));
+    catalog
+        .asTableCatalog()
+        .createTable(
+            tableIdent,
+            newColumns,
+            null,
+            ImmutableMap.of(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            getSortOrders(CLICKHOUSE_COL_NAME1));
     Table createdTable = catalog.asTableCatalog().loadTable(tableIdent);
     Assertions.assertEquals(
         UnparsedExpression.of("rand()"), 
createdTable.columns()[0].defaultValue());
     Assertions.assertEquals(
-        DEFAULT_VALUE_OF_CURRENT_TIMESTAMP, 
createdTable.columns()[1].defaultValue());
+        UnparsedExpression.of("now()"), 
createdTable.columns()[1].defaultValue());
     Assertions.assertEquals(Literals.NULL, 
createdTable.columns()[2].defaultValue());
-    Assertions.assertEquals(Column.DEFAULT_VALUE_NOT_SET, 
createdTable.columns()[3].defaultValue());
+    Assertions.assertEquals(DEFAULT_VALUE_NOT_SET, 
createdTable.columns()[3].defaultValue());
     Assertions.assertEquals(
-        Literals.varcharLiteral(255, "current_timestamp"),
-        createdTable.columns()[4].defaultValue());
+        Literals.stringLiteral("now()"), 
createdTable.columns()[4].defaultValue());
   }
 
   @Test
-  // MySQL support column default value expression after 8.0.13
-  // see https://dev.mysql.com/doc/refman/8.0/en/data-type-defaults.html
-  @EnabledIf("SupportColumnDefaultValueExpression")
+  // see https://dev.clickhouse.com/doc/refman/8.0/en/data-type-defaults.html
+  @EnabledIf("supportColumnDefaultValueExpression")
   void testColumnDefaultValueConverter() {
-    // test convert from MySQL to Gravitino
+    // test convert from ClickHouse to Gravitino
     String tableName = GravitinoITUtils.genRandomName("test_default_value");
     String fullTableName = schemaName + "." + tableName;
     String sql =
@@ -519,38 +667,37 @@ public class CatalogMysqlIT extends BaseIT {
             + " (\n"
             + "  int_col_1 int default 0x01AF,\n"
             + "  int_col_2 int default (rand()),\n"
-            + "  int_col_3 int default '3.321',\n"
+            + "  int_col_3 int default 3,\n"
             + "  unsigned_int_col_1 INT UNSIGNED default 1,\n"
-            + "  unsigned_bigint_col_1 BIGINT(20) UNSIGNED UNSIGNED default 
0,\n"
+            + "  unsigned_bigint_col_1 BIGINT(20) UNSIGNED default 0,\n"
             + "  double_col_1 double default 123.45,\n"
-            + "  varchar20_col_1 varchar(20) default (10),\n"
-            + "  varchar100_col_1 varchar(100) default 'CURRENT_TIMESTAMP',\n"
+            + "  varchar20_col_1 varchar(20) default '10',\n"
+            + "  varchar100_col_1 varchar(100) default 'now()',\n"
             + "  varchar200_col_1 varchar(200) default 'curdate()',\n"
-            + "  varchar200_col_2 varchar(200) default (curdate()),\n"
-            + "  varchar200_col_3 varchar(200) default (CURRENT_TIMESTAMP),\n"
-            + "  time_col_1 time default '00:00:00',\n"
-            + "  time_col_2 time default (now()),\n"
-            + "  datetime_col_1 datetime default CURRENT_TIMESTAMP,\n"
-            + "  datetime_col_2 datetime default current_timestamp,\n"
+            + "  varchar200_col_2 varchar(200) default (today()),\n"
+            + "  varchar200_col_3 varchar(200) default (now()),\n"
+            + "  datetime_col_1 datetime default now(),\n"
+            + "  datetime_col_2 datetime default now(),\n"
             + "  datetime_col_3 datetime default null,\n"
             + "  datetime_col_4 datetime default 19830905,\n"
-            + "  date_col_1 date default (CURRENT_DATE),\n"
+            + "  date_col_1 date default (today()),\n"
             + "  date_col_2 date,\n"
-            + "  date_col_3 date DEFAULT (CURRENT_DATE + INTERVAL 1 YEAR),\n"
-            + "  date_col_4 date DEFAULT (CURRENT_DATE),\n"
+            + "  date_col_3 date DEFAULT (today() + INTERVAL 1 YEAR),\n"
+            + "  date_col_4 date DEFAULT (today()),\n"
             + "  date_col_5 date DEFAULT '2024-04-01',\n"
             + "  timestamp_col_1 timestamp default '2012-12-31 11:30:45',\n"
             + "  timestamp_col_2 timestamp default 19830905,\n"
-            + "  timestamp_col_3 timestamp(6) default CURRENT_TIMESTAMP(6),\n"
+            + "  timestamp_col_3 timestamp(6) default now(),\n"
             + "  decimal_6_2_col_1 decimal(6, 2) default 1.2,\n"
-            + "  bit_col_1 bit default b'1'\n"
-            + ");\n";
+            + "  bit_col_1 bit default '1'\n"
+            + ") order by int_col_1;\n";
 
-    mysqlService.executeQuery(sql);
+    clickhouseService.executeQuery(sql);
     Table loadedTable =
         catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, 
tableName));
 
     for (Column column : loadedTable.columns()) {
+      //      try {
       switch (column.name()) {
         case "int_col_1":
           Assertions.assertEquals(Literals.integerLiteral(431), 
column.defaultValue());
@@ -572,72 +719,64 @@ public class CatalogMysqlIT extends BaseIT {
           Assertions.assertEquals(Literals.doubleLiteral(123.45), 
column.defaultValue());
           break;
         case "varchar20_col_1":
-          Assertions.assertEquals(UnparsedExpression.of("10"), 
column.defaultValue());
+          Assertions.assertEquals(Literals.stringLiteral("10"), 
column.defaultValue());
           break;
         case "varchar100_col_1":
-          Assertions.assertEquals(
-              Literals.varcharLiteral(100, "CURRENT_TIMESTAMP"), 
column.defaultValue());
+          Assertions.assertEquals(Literals.stringLiteral("now()"), 
column.defaultValue());
           break;
         case "varchar200_col_1":
-          Assertions.assertEquals(Literals.varcharLiteral(200, "curdate()"), 
column.defaultValue());
+          Assertions.assertEquals(Literals.stringLiteral("curdate()"), 
column.defaultValue());
           break;
         case "varchar200_col_2":
-          Assertions.assertEquals(UnparsedExpression.of("curdate()"), 
column.defaultValue());
+          Assertions.assertEquals(Literals.stringLiteral("today()"), 
column.defaultValue());
           break;
         case "varchar200_col_3":
-          Assertions.assertEquals(UnparsedExpression.of("now()"), 
column.defaultValue());
-          break;
-        case "time_col_1":
-          Assertions.assertEquals(Literals.timeLiteral("00:00:00"), 
column.defaultValue());
-          break;
-        case "time_col_2":
-          Assertions.assertEquals(UnparsedExpression.of("now()"), 
column.defaultValue());
+          Assertions.assertEquals(Literals.stringLiteral("now()"), 
column.defaultValue());
           break;
         case "datetime_col_1":
         case "datetime_col_2":
-          Assertions.assertEquals(DEFAULT_VALUE_OF_CURRENT_TIMESTAMP, 
column.defaultValue());
+          Assertions.assertEquals(UnparsedExpression.of("now()"), 
column.defaultValue());
           break;
         case "datetime_col_3":
           Assertions.assertEquals(Literals.NULL, column.defaultValue());
           break;
         case "datetime_col_4":
-          Assertions.assertEquals(
-              Literals.timestampLiteral("1983-09-05T00:00"), 
column.defaultValue());
+          Assertions.assertEquals(UnparsedExpression.of("19830905"), 
column.defaultValue());
           break;
         case "date_col_1":
-          Assertions.assertEquals(UnparsedExpression.of("curdate()"), 
column.defaultValue());
+          Assertions.assertEquals(UnparsedExpression.of("today()"), 
column.defaultValue());
           break;
         case "date_col_2":
-          Assertions.assertEquals(Literals.NULL, column.defaultValue());
+          Assertions.assertEquals(DEFAULT_VALUE_NOT_SET, 
column.defaultValue());
           break;
         case "date_col_3":
           Assertions.assertEquals(
-              UnparsedExpression.of("(curdate() + interval 1 year)"), 
column.defaultValue());
+              UnparsedExpression.of("today() + toIntervalYear(1)"), 
column.defaultValue());
           break;
         case "date_col_4":
-          Assertions.assertEquals(UnparsedExpression.of("curdate()"), 
column.defaultValue());
+          Assertions.assertEquals(UnparsedExpression.of("today()"), 
column.defaultValue());
           break;
         case "date_col_5":
           Assertions.assertEquals(
-              Literals.of("2024-04-01", Types.DateType.get()), 
column.defaultValue());
+              Literals.dateLiteral(LocalDate.of(2024, 4, 1)), 
column.defaultValue());
           break;
         case "timestamp_col_1":
           Assertions.assertEquals(
               Literals.timestampLiteral("2012-12-31T11:30:45"), 
column.defaultValue());
           break;
         case "timestamp_col_2":
-          Assertions.assertEquals(
-              Literals.timestampLiteral("1983-09-05T00:00:00"), 
column.defaultValue());
+          Assertions.assertEquals(UnparsedExpression.of("19830905"), 
column.defaultValue());
           break;
         case "timestamp_col_3":
-          Assertions.assertEquals(DEFAULT_VALUE_OF_CURRENT_TIMESTAMP, 
column.defaultValue());
+          Assertions.assertEquals(UnparsedExpression.of("now()"), 
column.defaultValue());
           break;
         case "decimal_6_2_col_1":
           Assertions.assertEquals(
               Literals.decimalLiteral(Decimal.of("1.2", 6, 2)), 
column.defaultValue());
           break;
         case "bit_col_1":
-          Assertions.assertEquals(UnparsedExpression.of("b'1'"), 
column.defaultValue());
+          Assertions.assertEquals(
+              Literals.unsignedLongLiteral(Decimal.of("1")), 
column.defaultValue());
           break;
         default:
           Assertions.fail(
@@ -651,7 +790,7 @@ public class CatalogMysqlIT extends BaseIT {
 
   @Test
   void testColumnTypeConverter() {
-    // test convert from MySQL to Gravitino
+    // test convert from ClickHouse to Gravitino
     String tableName = GravitinoITUtils.genRandomName("test_type_converter");
     String fullTableName = schemaName + "." + tableName;
     String sql =
@@ -666,30 +805,15 @@ public class CatalogMysqlIT extends BaseIT {
             + "  double_col double,\n"
             + "  date_col date,\n"
             + "  time_col time,\n"
-            + "  time_col_0 time(0),\n"
-            + "  time_col_1 time(1),\n"
-            + "  time_col_3 time(3),\n"
-            + "  time_col_6 time(6),\n"
             + "  timestamp_col timestamp,\n"
-            + "  timestamp_col_0 timestamp(0) default current_timestamp,\n"
-            + "  timestamp_col_1 timestamp(1) default current_timestamp(1),\n"
-            + "  timestamp_col_3 timestamp(3) default '2012-12-31 
11:30:45.123',\n"
-            + "  timestamp_col_6 timestamp(6) default '2012-12-31 
11:30:45.123456',\n"
             + "  datetime_col datetime,\n"
-            + "  datetime_col_0 datetime(0),\n"
-            + "  datetime_col_1 datetime(1),\n"
-            + "  datetime_col_3 datetime(3),\n"
-            + "  datetime_col_6 datetime(6),\n"
             + "  decimal_6_2_col decimal(6, 2),\n"
             + "  varchar20_col varchar(20),\n"
             + "  text_col text,\n"
-            + "  binary_col binary,\n"
-            + "  blob_col blob,\n"
-            + "  bit_col_8 bit(8),\n"
-            + "  bit_col bit\n"
-            + ");\n";
+            + "  blob_col blob\n"
+            + ") order by tinyint_col;\n";
 
-    mysqlService.executeQuery(sql);
+    clickhouseService.executeQuery(sql);
     Table loadedTable =
         catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, 
tableName));
 
@@ -717,49 +841,19 @@ public class CatalogMysqlIT extends BaseIT {
           Assertions.assertEquals(Types.DateType.get(), column.dataType());
           break;
         case "time_col":
-        case "time_col_0":
-          Assertions.assertEquals(Types.TimeType.of(0), column.dataType());
-          break;
-        case "time_col_1":
-          Assertions.assertEquals(Types.TimeType.of(1), column.dataType());
-          break;
-        case "time_col_3":
-          Assertions.assertEquals(Types.TimeType.of(3), column.dataType());
-          break;
-        case "time_col_6":
-          Assertions.assertEquals(Types.TimeType.of(6), column.dataType());
+          Assertions.assertEquals(Types.LongType.get(), column.dataType());
           break;
         case "timestamp_col":
-        case "timestamp_col_0":
-          Assertions.assertEquals(Types.TimestampType.withTimeZone(0), 
column.dataType());
-          break;
-        case "timestamp_col_1":
-          Assertions.assertEquals(Types.TimestampType.withTimeZone(1), 
column.dataType());
-          break;
-        case "timestamp_col_3":
-          Assertions.assertEquals(Types.TimestampType.withTimeZone(3), 
column.dataType());
-          break;
-        case "timestamp_col_6":
-          Assertions.assertEquals(Types.TimestampType.withTimeZone(6), 
column.dataType());
+          Assertions.assertEquals(Types.TimestampType.withoutTimeZone(0), 
column.dataType());
           break;
         case "datetime_col":
-        case "datetime_col_0":
           Assertions.assertEquals(Types.TimestampType.withoutTimeZone(0), 
column.dataType());
           break;
-        case "datetime_col_1":
-          Assertions.assertEquals(Types.TimestampType.withoutTimeZone(1), 
column.dataType());
-          break;
-        case "datetime_col_3":
-          Assertions.assertEquals(Types.TimestampType.withoutTimeZone(3), 
column.dataType());
-          break;
-        case "datetime_col_6":
-          Assertions.assertEquals(Types.TimestampType.withoutTimeZone(6), 
column.dataType());
-          break;
         case "decimal_6_2_col":
           Assertions.assertEquals(Types.DecimalType.of(6, 2), 
column.dataType());
           break;
         case "varchar20_col":
-          Assertions.assertEquals(Types.VarCharType.of(20), column.dataType());
+          Assertions.assertEquals(Types.StringType.get(), column.dataType());
           break;
         case "text_col":
           Assertions.assertEquals(Types.StringType.get(), column.dataType());
@@ -767,14 +861,8 @@ public class CatalogMysqlIT extends BaseIT {
         case "binary_col":
           Assertions.assertEquals(Types.BinaryType.get(), column.dataType());
           break;
-        case "bit_col_8":
-          Assertions.assertEquals(Types.BinaryType.get(), column.dataType());
-          break;
-        case "bit_col":
-          Assertions.assertEquals(Types.BooleanType.get(), column.dataType());
-          break;
         case "blob_col":
-          Assertions.assertEquals(Types.ExternalType.of("BLOB"), 
column.dataType());
+          Assertions.assertEquals(Types.StringType.get(), column.dataType());
           break;
         default:
           Assertions.fail("Unexpected column name: " + column.name());
@@ -783,12 +871,18 @@ public class CatalogMysqlIT extends BaseIT {
   }
 
   @Test
-  void testAlterAndDropMysqlTable() {
+  void testAlterAndDropClickhouseTable() {
     Column[] columns = createColumns();
     catalog
         .asTableCatalog()
         .createTable(
-            NameIdentifier.of(schemaName, tableName), columns, table_comment, 
createProperties());
+            NameIdentifier.of(schemaName, tableName),
+            columns,
+            table_comment,
+            createProperties(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            getSortOrders(CLICKHOUSE_COL_NAME3));
     Assertions.assertThrows(
         IllegalArgumentException.class,
         () -> {
@@ -808,22 +902,38 @@ public class CatalogMysqlIT extends BaseIT {
         .asTableCatalog()
         .alterTable(
             NameIdentifier.of(schemaName, alertTableName),
-            TableChange.updateComment(table_comment + "_new"),
-            TableChange.addColumn(new String[] {"col_4"}, 
Types.StringType.get()),
-            TableChange.renameColumn(new String[] {MYSQL_COL_NAME2}, 
"col_2_new"),
-            TableChange.updateColumnType(new String[] {MYSQL_COL_NAME1}, 
Types.IntegerType.get()));
+            TableChange.updateComment(table_comment + "_new"));
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, alertTableName),
+            TableChange.addColumn(new String[] {"col_4"}, 
Types.StringType.get()));
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, alertTableName),
+            TableChange.renameColumn(new String[] {CLICKHOUSE_COL_NAME2}, 
"col_2_new"));
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, alertTableName),
+            TableChange.updateColumnType(
+                new String[] {CLICKHOUSE_COL_NAME1}, Types.IntegerType.get()));
 
     Table table = 
catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, 
alertTableName));
     Assertions.assertEquals(alertTableName, table.name());
 
-    Assertions.assertEquals(MYSQL_COL_NAME1, table.columns()[0].name());
+    Assertions.assertEquals(CLICKHOUSE_COL_NAME1, table.columns()[0].name());
     Assertions.assertEquals(Types.IntegerType.get(), 
table.columns()[0].dataType());
 
     Assertions.assertEquals("col_2_new", table.columns()[1].name());
     Assertions.assertEquals(Types.DateType.get(), 
table.columns()[1].dataType());
     Assertions.assertEquals("col_2_comment", table.columns()[1].comment());
 
-    Assertions.assertEquals(MYSQL_COL_NAME3, table.columns()[2].name());
+    Assertions.assertEquals(CLICKHOUSE_COL_NAME3, table.columns()[2].name());
     Assertions.assertEquals(Types.StringType.get(), 
table.columns()[2].dataType());
     Assertions.assertEquals("col_3_comment", table.columns()[2].comment());
 
@@ -836,7 +946,8 @@ public class CatalogMysqlIT extends BaseIT {
     Assertions.assertNotNull(table.auditInfo().lastModifiedTime());
     Assertions.assertNotNull(table.auditInfo().lastModifier());
 
-    Column col1 = Column.of("name", Types.StringType.get(), "comment");
+    Column col1 =
+        Column.of("name", Types.StringType.get(), "comment", false, false, 
DEFAULT_VALUE_NOT_SET);
     Column col2 = Column.of("address", Types.StringType.get(), "comment");
     Column col3 = Column.of("date_of_birth", Types.DateType.get(), "comment");
 
@@ -852,7 +963,7 @@ public class CatalogMysqlIT extends BaseIT {
             ImmutableMap.of(),
             Transforms.EMPTY_TRANSFORM,
             Distributions.NONE,
-            new SortOrder[0]);
+            getSortOrders("name"));
 
     TableCatalog tableCatalog = catalog.asTableCatalog();
     TableChange change =
@@ -896,6 +1007,63 @@ public class CatalogMysqlIT extends BaseIT {
         });
   }
 
+  @Test
+  void testRecreateSchemaAndTable() {
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Column[] columns = createColumns();
+    String recreateTableName = 
GravitinoITUtils.genRandomName("recreate_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, 
recreateTableName);
+
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        createProperties(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders(CLICKHOUSE_COL_NAME3));
+
+    String updatedComment = table_comment + "_updated";
+    tableCatalog.alterTable(tableIdentifier, 
TableChange.updateComment(updatedComment));
+
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(recreateTableName, loadedTable.name());
+    Assertions.assertEquals(updatedComment, loadedTable.comment());
+
+    Assertions.assertTrue(tableCatalog.dropTable(tableIdentifier));
+    Assertions.assertFalse(tableCatalog.tableExists(tableIdentifier));
+
+    String recreatedComment = table_comment + "_recreated";
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        recreatedComment,
+        createProperties(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders(CLICKHOUSE_COL_NAME3));
+
+    Table recreatedTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(recreateTableName, recreatedTable.name());
+    Assertions.assertEquals(recreatedComment, recreatedTable.comment());
+    Assertions.assertEquals(columns.length, recreatedTable.columns().length);
+
+    SupportsSchemas schemaSupport = catalog.asSchemas();
+    String recreateSchemaName = 
GravitinoITUtils.genRandomName("recreate_schema");
+    Map<String, String> schemaProperties = ImmutableMap.of();
+
+    schemaSupport.createSchema(recreateSchemaName, schema_comment, 
schemaProperties);
+
+    Assertions.assertTrue(schemaSupport.dropSchema(recreateSchemaName, true));
+    Assertions.assertFalse(schemaSupport.schemaExists(recreateSchemaName));
+
+    schemaSupport.createSchema(recreateSchemaName, schema_comment, 
schemaProperties);
+    schemaSupport.loadSchema(recreateSchemaName);
+
+    schemaSupport.dropSchema(recreateSchemaName, true);
+    tableCatalog.dropTable(tableIdentifier);
+  }
+
   @Test
   void testUpdateColumnDefaultValue() {
     Column[] columns = createColumnsWithDefaultValue();
@@ -903,10 +1071,90 @@ public class CatalogMysqlIT extends BaseIT {
         catalog
             .asTableCatalog()
             .createTable(
-                NameIdentifier.of(schemaName, tableName), columns, null, 
ImmutableMap.of());
+                NameIdentifier.of(schemaName, tableName),
+                columns,
+                null,
+                ImmutableMap.of(),
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                getSortOrders(CLICKHOUSE_COL_NAME1));
 
     Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, 
table.auditInfo().creator());
     Assertions.assertNull(table.auditInfo().lastModifier());
+
+    // Change default value of float
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, tableName),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[0].name()}, Literals.of("1.2345", 
Types.FloatType.get())));
+
+    // Change default value of timestamp to function expression
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, tableName),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[1].name()}, 
FunctionExpression.of("now")));
+    // Change default value of varchar
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, tableName),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[2].name()}, Literals.of("hello", 
Types.VarCharType.of(255))));
+
+    // Change default value of int
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, tableName),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[3].name()}, Literals.of("2000", 
Types.IntegerType.get())));
+    // Change default value of decimal
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, tableName),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[4].name()}, Literals.of("2.34", 
Types.DecimalType.of(3, 2))));
+
+    // Change default value of all columns at once
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, tableName),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[0].name()}, Literals.of("1.2345", 
Types.FloatType.get())),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[1].name()}, 
FunctionExpression.of("now")),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[2].name()}, Literals.of("hello", 
Types.VarCharType.of(255))),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[3].name()}, Literals.of("2000", 
Types.IntegerType.get())),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[4].name()}, Literals.of("2.34", 
Types.DecimalType.of(3, 2))));
+
+    // Change default value of all columns at once again to make sure multiple 
alter table with
+    // updateColumnDefaultValue works well together
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, tableName),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[0].name()}, Literals.of("1.2345", 
Types.FloatType.get())),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[1].name()}, 
FunctionExpression.of("now")),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[2].name()}, Literals.of("hello", 
Types.VarCharType.of(255))),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[3].name()}, Literals.of("2000", 
Types.IntegerType.get())),
+            TableChange.updateColumnDefaultValue(
+                new String[] {columns[4].name()}, Literals.of("2.34", 
Types.DecimalType.of(3, 2))));
+
+    // Change default value of all columns at once again with different values 
to make sure multiple
+    // alter table with updateColumnDefaultValue works well together
     catalog
         .asTableCatalog()
         .alterTable(
@@ -914,7 +1162,7 @@ public class CatalogMysqlIT extends BaseIT {
             TableChange.updateColumnDefaultValue(
                 new String[] {columns[0].name()}, Literals.of("1.2345", 
Types.FloatType.get())),
             TableChange.updateColumnDefaultValue(
-                new String[] {columns[1].name()}, 
DEFAULT_VALUE_OF_CURRENT_TIMESTAMP),
+                new String[] {columns[1].name()}, 
FunctionExpression.of("now")),
             TableChange.updateColumnDefaultValue(
                 new String[] {columns[2].name()}, Literals.of("hello", 
Types.VarCharType.of(255))),
             TableChange.updateColumnDefaultValue(
@@ -926,10 +1174,9 @@ public class CatalogMysqlIT extends BaseIT {
 
     Assertions.assertEquals(
         Literals.of("1.2345", Types.FloatType.get()), 
table.columns()[0].defaultValue());
+    Assertions.assertEquals(UnparsedExpression.of("now()"), 
table.columns()[1].defaultValue());
     Assertions.assertEquals(
-        FunctionExpression.of("current_timestamp"), 
table.columns()[1].defaultValue());
-    Assertions.assertEquals(
-        Literals.of("hello", Types.VarCharType.of(255)), 
table.columns()[2].defaultValue());
+        Literals.of("hello", Types.StringType.get()), 
table.columns()[2].defaultValue());
     Assertions.assertEquals(
         Literals.of("2000", Types.IntegerType.get()), 
table.columns()[3].defaultValue());
     Assertions.assertEquals(
@@ -937,9 +1184,9 @@ public class CatalogMysqlIT extends BaseIT {
   }
 
   @Test
-  void testDropMySQLDatabase() {
-    String schemaName = 
GravitinoITUtils.genRandomName("mysql_schema").toLowerCase();
-    String tableName = 
GravitinoITUtils.genRandomName("mysql_table").toLowerCase();
+  void testDropClickHouseDatabase() {
+    String schemaName = 
GravitinoITUtils.genRandomName("clickhouse_schema").toLowerCase();
+    String tableName = 
GravitinoITUtils.genRandomName("clickhouse_table").toLowerCase();
 
     catalog
         .asSchemas()
@@ -951,7 +1198,8 @@ public class CatalogMysqlIT extends BaseIT {
             NameIdentifier.of(schemaName, tableName),
             createColumns(),
             "Created by Gravitino client",
-            ImmutableMap.<String, String>builder().build());
+            ImmutableMap.<String, String>builder().build(),
+            getSortOrders(CLICKHOUSE_COL_NAME3));
 
     // Try to drop a database, and cascade equals to false, it should not be
     // allowed.
@@ -972,423 +1220,18 @@ public class CatalogMysqlIT extends BaseIT {
         () -> {
           schemas.loadSchema(schemaName);
         });
-
-    // test drop inconsistent database
-    catalog
-        .asSchemas()
-        .createSchema(schemaName, null, ImmutableMap.<String, 
String>builder().build());
-    catalog
-        .asTableCatalog()
-        .createTable(
-            NameIdentifier.of(schemaName, tableName),
-            createColumns(),
-            "Created by Gravitino client",
-            ImmutableMap.<String, String>builder().build());
-    // drop the table externally
-    mysqlService.executeQuery("DROP TABLE " + schemaName + "." + tableName);
-
-    // drop the schema without cascade
-    Assertions.assertTrue(catalog.asSchemas().dropSchema(schemaName, false));
-  }
-
-  @Test
-  void testCreateTableIndex() {
-    Column col1 = Column.of("col_1", Types.LongType.get(), "id", false, false, 
null);
-    Column col2 = Column.of("col_2", Types.ByteType.get(), "yes", false, 
false, null);
-    Column col3 = Column.of("col_3", Types.DateType.get(), "comment", false, 
false, null);
-    Column col4 = Column.of("col_4", Types.VarCharType.of(255), "code", false, 
false, null);
-    Column col5 = Column.of("col_5", Types.VarCharType.of(255), "config", 
false, false, null);
-    Column[] newColumns = new Column[] {col1, col2, col3, col4, col5};
-
-    Index[] indexes =
-        new Index[] {
-          Indexes.createMysqlPrimaryKey(new String[][] {{"col_1"}, {"col_2"}}),
-          Indexes.unique("u1_key", new String[][] {{"col_2"}, {"col_3"}}),
-          Indexes.unique("u2_key", new String[][] {{"col_3"}, {"col_4"}}),
-          Indexes.unique("u3_key", new String[][] {{"col_5"}, {"col_4"}}),
-          Indexes.unique("u4_key", new String[][] {{"col_2"}, {"col_3"}, 
{"col_4"}}),
-          Indexes.unique("u5_key", new String[][] {{"col_3"}, {"col_2"}, 
{"col_4"}}),
-          Indexes.unique("u6_key", new String[][] {{"col_3"}, {"col_4"}, 
{"col_1"}, {"col_2"}}),
-        };
-
-    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
-
-    Map<String, String> properties = createProperties();
-    TableCatalog tableCatalog = catalog.asTableCatalog();
-    Table createdTable =
-        tableCatalog.createTable(
-            tableIdentifier,
-            newColumns,
-            table_comment,
-            properties,
-            Transforms.EMPTY_TRANSFORM,
-            Distributions.NONE,
-            new SortOrder[0],
-            indexes);
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(newColumns),
-        properties,
-        indexes,
-        Transforms.EMPTY_TRANSFORM,
-        createdTable);
-    Table table = tableCatalog.loadTable(tableIdentifier);
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(newColumns),
-        properties,
-        indexes,
-        Transforms.EMPTY_TRANSFORM,
-        table);
-
-    NameIdentifier id = NameIdentifier.of(schemaName, "test_failed");
-    Index[] indexes2 =
-        new Index[] {Indexes.createMysqlPrimaryKey(new String[][] {{"col_1", 
"col_2"}})};
-    SortOrder[] sortOrder = new SortOrder[0];
-    IllegalArgumentException illegalArgumentException =
-        assertThrows(
-            IllegalArgumentException.class,
-            () -> {
-              tableCatalog.createTable(
-                  id,
-                  newColumns,
-                  table_comment,
-                  properties,
-                  Transforms.EMPTY_TRANSFORM,
-                  Distributions.NONE,
-                  sortOrder,
-                  indexes2);
-            });
-    Assertions.assertTrue(
-        StringUtils.contains(
-            illegalArgumentException.getMessage(),
-            "Index does not support complex fields in this Catalog"));
-
-    Index[] indexes3 = new Index[] {Indexes.unique("u1_key", new String[][] 
{{"col_2", "col_3"}})};
-    illegalArgumentException =
-        assertThrows(
-            IllegalArgumentException.class,
-            () -> {
-              tableCatalog.createTable(
-                  id,
-                  newColumns,
-                  table_comment,
-                  properties,
-                  Transforms.EMPTY_TRANSFORM,
-                  Distributions.NONE,
-                  sortOrder,
-                  indexes3);
-            });
-    Assertions.assertTrue(
-        StringUtils.contains(
-            illegalArgumentException.getMessage(),
-            "Index does not support complex fields in this Catalog"));
-
-    NameIdentifier tableIdent = NameIdentifier.of(schemaName, "test_null_key");
-    tableCatalog.createTable(
-        tableIdent,
-        newColumns,
-        table_comment,
-        properties,
-        Transforms.EMPTY_TRANSFORM,
-        Distributions.NONE,
-        new SortOrder[0],
-        new Index[] {
-          Indexes.of(
-              Index.IndexType.UNIQUE_KEY, null, new String[][] {{"col_1"}, 
{"col_3"}, {"col_4"}}),
-          Indexes.of(Index.IndexType.UNIQUE_KEY, null, new String[][] 
{{"col_4"}}),
-        });
-    table = tableCatalog.loadTable(tableIdent);
-
-    Assertions.assertEquals(2, table.index().length);
-    Assertions.assertNotNull(table.index()[0].name());
-    Assertions.assertNotNull(table.index()[1].name());
-
-    NameIdentifier nullableTableIdent = NameIdentifier.of(schemaName, 
"test_nullable");
-    Column nullWithAutoIncrementCol =
-        Column.of("col_6", Types.LongType.get(), "id", true, true, null);
-
-    Exception uniqueKeyNotExistException =
-        assertThrows(
-            IllegalArgumentException.class,
-            () ->
-                tableCatalog.createTable(
-                    nullableTableIdent,
-                    new Column[] {nullWithAutoIncrementCol},
-                    table_comment,
-                    properties,
-                    Transforms.EMPTY_TRANSFORM,
-                    Distributions.NONE,
-                    new SortOrder[0],
-                    new Index[] {
-                      Indexes.of(
-                          Index.IndexType.UNIQUE_KEY,
-                          "u_key",
-                          new String[][] {{"col_7"}, {"col_6"}}),
-                    }));
-    Assertions.assertTrue(
-        uniqueKeyNotExistException
-            .getMessage()
-            .contains("Column col_7 in the unique index u_key does not exist 
in the table"));
-
-    Exception uniqueKeyNotNullException =
-        assertThrows(
-            IllegalArgumentException.class,
-            () ->
-                tableCatalog.createTable(
-                    nullableTableIdent,
-                    new Column[] {nullWithAutoIncrementCol},
-                    table_comment,
-                    properties,
-                    Transforms.EMPTY_TRANSFORM,
-                    Distributions.NONE,
-                    new SortOrder[0],
-                    new Index[] {
-                      Indexes.of(Index.IndexType.UNIQUE_KEY, "u_key", new 
String[][] {{"col_6"}}),
-                    }));
-    Assertions.assertTrue(
-        uniqueKeyNotNullException
-            .getMessage()
-            .contains(
-                "Auto increment column col_6 in the unique index u_key must be 
a not null column"));
-
-    Column nullWithoutAutoIncrementCol =
-        Column.of("col_7", Types.LongType.get(), "id", true, false, null);
-    tableCatalog.createTable(
-        nullableTableIdent,
-        new Column[] {nullWithoutAutoIncrementCol},
-        table_comment,
-        properties,
-        Transforms.EMPTY_TRANSFORM,
-        Distributions.NONE,
-        new SortOrder[0],
-        new Index[] {
-          Indexes.of(Index.IndexType.UNIQUE_KEY, "u_key", new String[][] 
{{"col_7"}}),
-        });
-    table = tableCatalog.loadTable(nullableTableIdent);
-
-    Assertions.assertEquals(1, table.index().length);
-    Assertions.assertNotNull(table.index()[0].name());
-
-    Exception primaryKeyNotExistexception =
-        assertThrows(
-            IllegalArgumentException.class,
-            () ->
-                tableCatalog.createTable(
-                    nullableTableIdent,
-                    new Column[] {nullWithoutAutoIncrementCol},
-                    table_comment,
-                    properties,
-                    Transforms.EMPTY_TRANSFORM,
-                    Distributions.NONE,
-                    new SortOrder[0],
-                    new Index[] {
-                      Indexes.createMysqlPrimaryKey(new String[][] 
{{"col_8"}}),
-                    }));
-    Assertions.assertTrue(
-        primaryKeyNotExistexception
-            .getMessage()
-            .contains("Column col_8 in the primary key does not exist in the 
table"));
-
-    Exception primaryKeyNotNullException =
-        assertThrows(
-            IllegalArgumentException.class,
-            () ->
-                tableCatalog.createTable(
-                    nullableTableIdent,
-                    new Column[] {nullWithoutAutoIncrementCol},
-                    table_comment,
-                    properties,
-                    Transforms.EMPTY_TRANSFORM,
-                    Distributions.NONE,
-                    new SortOrder[0],
-                    new Index[] {
-                      Indexes.createMysqlPrimaryKey(new String[][] 
{{"col_7"}}),
-                    }));
-    Assertions.assertTrue(
-        primaryKeyNotNullException
-            .getMessage()
-            .contains("Column col_7 in the primary key must be a not null 
column"));
-  }
-
-  @Test
-  public void testAutoIncrement() {
-    Column col1 = Column.of("col_1", Types.LongType.get(), "id", false, true, 
null);
-    Column col2 = Column.of("col_2", Types.ByteType.get(), "yes", false, 
false, null);
-    Column col3 = Column.of("col_3", Types.DateType.get(), "comment", false, 
false, null);
-    Column col4 = Column.of("col_4", Types.VarCharType.of(255), "code", false, 
false, null);
-    Column col5 = Column.of("col_5", Types.VarCharType.of(255), "config", 
false, false, null);
-    Column[] newColumns = new Column[] {col1, col2, col3, col4, col5};
-
-    Index[] indexes =
-        new Index[] {
-          Indexes.createMysqlPrimaryKey(new String[][] {{"col_1"}, {"col_2"}}),
-          Indexes.unique("u1_key", new String[][] {{"col_2"}, {"col_3"}})
-        };
-
-    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
-
-    Map<String, String> properties = createProperties();
-    TableCatalog tableCatalog = catalog.asTableCatalog();
-    Table createdTable =
-        tableCatalog.createTable(
-            tableIdentifier,
-            newColumns,
-            table_comment,
-            properties,
-            Transforms.EMPTY_TRANSFORM,
-            Distributions.NONE,
-            new SortOrder[0],
-            indexes);
-    // Test create auto increment key success.
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(newColumns),
-        properties,
-        indexes,
-        Transforms.EMPTY_TRANSFORM,
-        createdTable);
-    Table table = tableCatalog.loadTable(tableIdentifier);
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(newColumns),
-        properties,
-        indexes,
-        Transforms.EMPTY_TRANSFORM,
-        table);
-
-    // Test alter table. auto increment exist.
-    // UpdateColumnType
-    tableCatalog.alterTable(
-        tableIdentifier,
-        TableChange.updateColumnType(new String[] {"col_1"}, 
Types.IntegerType.get()));
-    table = tableCatalog.loadTable(tableIdentifier);
-    Column[] alterColumns =
-        new Column[] {
-          Column.of("col_1", Types.IntegerType.get(), "id", false, true, null),
-          col2,
-          col3,
-          col4,
-          col5
-        };
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(alterColumns),
-        properties,
-        indexes,
-        Transforms.EMPTY_TRANSFORM,
-        table);
-
-    // UpdateColumnComment
-    tableCatalog.alterTable(
-        tableIdentifier, TableChange.updateColumnComment(new String[] 
{"col_1"}, "new_id_comment"));
-    table = tableCatalog.loadTable(tableIdentifier);
-    alterColumns =
-        new Column[] {
-          Column.of("col_1", Types.IntegerType.get(), "new_id_comment", false, 
true, null),
-          col2,
-          col3,
-          col4,
-          col5
-        };
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(alterColumns),
-        properties,
-        indexes,
-        Transforms.EMPTY_TRANSFORM,
-        table);
-
-    // RenameColumn
-    tableCatalog.alterTable(
-        tableIdentifier, TableChange.renameColumn(new String[] {"col_1"}, 
"col_1_1"));
-    table = tableCatalog.loadTable(tableIdentifier);
-    alterColumns =
-        new Column[] {
-          Column.of("col_1_1", Types.IntegerType.get(), "new_id_comment", 
false, true, null),
-          col2,
-          col3,
-          col4,
-          col5
-        };
-    indexes =
-        new Index[] {
-          Indexes.createMysqlPrimaryKey(new String[][] {{"col_1_1"}, 
{"col_2"}}),
-          Indexes.unique("u1_key", new String[][] {{"col_2"}, {"col_3"}})
-        };
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(alterColumns),
-        properties,
-        indexes,
-        Transforms.EMPTY_TRANSFORM,
-        table);
-
-    tableCatalog.dropTable(tableIdentifier);
-
-    // Test create auto increment fail(No index)
-    RuntimeException runtimeException =
-        assertThrows(
-            RuntimeException.class,
-            () ->
-                tableCatalog.createTable(
-                    tableIdentifier,
-                    newColumns,
-                    table_comment,
-                    properties,
-                    Transforms.EMPTY_TRANSFORM,
-                    Distributions.NONE,
-                    new SortOrder[0],
-                    Indexes.EMPTY_INDEXES));
-    Assertions.assertTrue(
-        StringUtils.contains(
-            runtimeException.getMessage(),
-            "Incorrect table definition; there can be only one auto column and 
it must be defined as a key"));
-
-    // Test create auto increment fail(Many index col)
-    ColumnImpl column = Column.of("col_6", Types.LongType.get(), "id2", false, 
true, null);
-    SortOrder[] sortOrder = new SortOrder[0];
-    Index[] index2 =
-        new Index[] {Indexes.createMysqlPrimaryKey(new String[][] {{"col_1"}, 
{"col_6"}})};
-
-    runtimeException =
-        assertThrows(
-            RuntimeException.class,
-            () ->
-                tableCatalog.createTable(
-                    tableIdentifier,
-                    new Column[] {col1, col2, col3, col4, col5, column},
-                    table_comment,
-                    properties,
-                    Transforms.EMPTY_TRANSFORM,
-                    Distributions.NONE,
-                    sortOrder,
-                    index2));
-    Assertions.assertTrue(
-        StringUtils.contains(
-            runtimeException.getMessage(),
-            "Only one column can be auto-incremented. There are multiple 
auto-increment columns in your table: [col_1,col_6]"));
   }
 
   @Test
   public void testSchemaComment() {
-    final String testSchemaName = "test";
-    RuntimeException exception =
-        Assertions.assertThrowsExactly(
-            UnsupportedOperationException.class,
-            () -> catalog.asSchemas().createSchema(testSchemaName, "comment", 
null));
-    Assertions.assertTrue(
-        exception.getMessage().contains("Doesn't support setting schema 
comment: comment"));
+    final String testSchemaName = GravitinoITUtils.genRandomName("test");
+    Assertions.assertDoesNotThrow(
+        () -> {
+          catalog.asSchemas().createSchema(testSchemaName, "comment", null);
+        });
 
     // test null comment
-    String testSchemaName2 = "test2";
+    String testSchemaName2 = GravitinoITUtils.genRandomName("test");
     Schema schema = catalog.asSchemas().createSchema(testSchemaName2, "", 
null);
     Assertions.assertTrue(StringUtils.isEmpty(schema.comment()));
     schema = catalog.asSchemas().loadSchema(testSchemaName2);
@@ -1414,7 +1257,7 @@ public class CatalogMysqlIT extends BaseIT {
                 Collections.emptyMap(),
                 Transforms.EMPTY_TRANSFORM,
                 Distributions.NONE,
-                new SortOrder[0],
+                getSortOrders("create"),
                 Indexes.EMPTY_INDEXES));
 
     Assertions.assertDoesNotThrow(() -> 
tableCatalog.loadTable(tableIdentifier));
@@ -1423,20 +1266,26 @@ public class CatalogMysqlIT extends BaseIT {
         () ->
             tableCatalog.alterTable(
                 tableIdentifier,
-                new TableChange[] {
-                  TableChange.addColumn(
-                      new String[] {"int"},
-                      Types.StringType.get(),
-                      TableChange.ColumnPosition.after("status")),
-                  TableChange.deleteColumn(new String[] {"create"}, true),
-                  TableChange.renameColumn(new String[] {"delete"}, "varchar")
-                }));
+                TableChange.addColumn(
+                    new String[] {"int"},
+                    Types.StringType.get(),
+                    TableChange.ColumnPosition.after("status"))));
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier, TableChange.renameColumn(new String[] 
{"int"}, "varchar")));
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier, TableChange.deleteColumn(new String[] 
{"varchar"}, true)));
 
     Assertions.assertDoesNotThrow(() -> 
tableCatalog.dropTable(tableIdentifier));
   }
 
   @Test
-  void testMySQLSpecialTableName() {
+  void testClickHouseSpecialTableName() {
     // Test create many indexes with name success.
     Map<String, String> properties = createProperties();
     TableCatalog tableCatalog = catalog.asTableCatalog();
@@ -1445,7 +1294,7 @@ public class CatalogMysqlIT extends BaseIT {
     Column t1_col = Column.of(t1_name, Types.LongType.get(), "id", false, 
false, null);
     Column[] columns = {t1_col};
 
-    Index[] t1_indexes = {Indexes.unique("u1_key", new String[][] 
{{t1_name}})};
+    Index[] t1_indexes = {Indexes.primary("PRIMARY", new String[][] 
{{t1_name}})};
 
     NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, t1_name);
     tableCatalog.createTable(
@@ -1455,12 +1304,12 @@ public class CatalogMysqlIT extends BaseIT {
         properties,
         Transforms.EMPTY_TRANSFORM,
         Distributions.NONE,
-        new SortOrder[0],
+        getSortOrders("t112"),
         t1_indexes);
 
     String t2_name = "t212";
     Column t2_col = Column.of(t2_name, Types.LongType.get(), "id", false, 
false, null);
-    Index[] t2_indexes = {Indexes.unique("u2_key", new String[][] 
{{t2_name}})};
+    Index[] t2_indexes = {Indexes.primary("PRIMARY", new String[][] 
{{t2_name}})};
     columns = new Column[] {t2_col};
     tableIdentifier = NameIdentifier.of(schemaName, t2_name);
     tableCatalog.createTable(
@@ -1470,12 +1319,12 @@ public class CatalogMysqlIT extends BaseIT {
         properties,
         Transforms.EMPTY_TRANSFORM,
         Distributions.NONE,
-        new SortOrder[0],
+        getSortOrders(t2_name),
         t2_indexes);
 
     String t3_name = "t_12";
     Column t3_col = Column.of(t3_name, Types.LongType.get(), "id", false, 
false, null);
-    Index[] t3_indexes = {Indexes.unique("u3_key", new String[][] 
{{t3_name}})};
+    Index[] t3_indexes = {Indexes.primary("PRIMARY", new String[][] 
{{t3_name}})};
     columns = new Column[] {t3_col};
     tableIdentifier = NameIdentifier.of(schemaName, t3_name);
     tableCatalog.createTable(
@@ -1485,12 +1334,12 @@ public class CatalogMysqlIT extends BaseIT {
         properties,
         Transforms.EMPTY_TRANSFORM,
         Distributions.NONE,
-        new SortOrder[0],
+        getSortOrders(t3_name),
         t3_indexes);
 
     String t4_name = "_1__";
     Column t4_col = Column.of(t4_name, Types.LongType.get(), "id", false, 
false, null);
-    Index[] t4_indexes = {Indexes.unique("u4_key", new String[][] 
{{t4_name}})};
+    Index[] t4_indexes = {Indexes.primary("PRIMARY", new String[][] 
{{t4_name}})};
     columns = new Column[] {t4_col};
     tableIdentifier = NameIdentifier.of(schemaName, t4_name);
     tableCatalog.createTable(
@@ -1500,7 +1349,7 @@ public class CatalogMysqlIT extends BaseIT {
         properties,
         Transforms.EMPTY_TRANSFORM,
         Distributions.NONE,
-        new SortOrder[0],
+        getSortOrders(t4_name),
         t4_indexes);
 
     Table t1 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t1_name));
@@ -1510,7 +1359,7 @@ public class CatalogMysqlIT extends BaseIT {
         table_comment,
         Arrays.asList(t1_col),
         properties,
-        t1_indexes,
+        null,
         Transforms.EMPTY_TRANSFORM,
         t1);
 
@@ -1549,7 +1398,7 @@ public class CatalogMysqlIT extends BaseIT {
   }
 
   @Test
-  void testMySqlIllegalTableName() {
+  void testClickHouseIllegalTableName() {
     Map<String, String> properties = createProperties();
     TableCatalog tableCatalog = catalog.asTableCatalog();
     String table_name = "t123";
@@ -1661,19 +1510,15 @@ public class CatalogMysqlIT extends BaseIT {
   }
 
   @Test
-  void testMySQLTableNameCaseSensitive() {
+  void testClickHouseTableNameCaseSensitive() {
     Column col1 = Column.of("col_1", Types.LongType.get(), "id", false, false, 
null);
     Column col2 = Column.of("col_2", Types.ByteType.get(), "yes", false, 
false, null);
     Column col3 = Column.of("col_3", Types.DateType.get(), "comment", false, 
false, null);
-    Column col4 = Column.of("col_4", Types.VarCharType.of(255), "code", false, 
false, null);
-    Column col5 = Column.of("col_5", Types.VarCharType.of(255), "config", 
false, false, null);
+    Column col4 = Column.of("col_4", Types.StringType.get(), "code", false, 
false, null);
+    Column col5 = Column.of("col_5", Types.StringType.get(), "config", false, 
false, null);
     Column[] newColumns = new Column[] {col1, col2, col3, col4, col5};
 
-    Index[] indexes =
-        new Index[] {
-          Indexes.createMysqlPrimaryKey(new String[][] {{"col_1"}, {"col_2"}}),
-          Indexes.unique("u1_key", new String[][] {{"col_2"}, {"col_3"}})
-        };
+    Index[] indexes = new Index[0];
 
     NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, 
"tableName");
     Map<String, String> properties = createProperties();
@@ -1686,7 +1531,7 @@ public class CatalogMysqlIT extends BaseIT {
             properties,
             Transforms.EMPTY_TRANSFORM,
             Distributions.NONE,
-            new SortOrder[0],
+            getSortOrders("col_1"),
             indexes);
     ITUtils.assertionsTableInfo(
         "tableName",
@@ -1719,7 +1564,7 @@ public class CatalogMysqlIT extends BaseIT {
                     properties,
                     Transforms.EMPTY_TRANSFORM,
                     Distributions.NONE,
-                    new SortOrder[0],
+                    getSortOrders("col_1"),
                     indexes));
     Assertions.assertEquals("TABLENAME", tableAgain.name());
 
@@ -1736,32 +1581,26 @@ public class CatalogMysqlIT extends BaseIT {
 
   @Test
   void testNameSpec() {
-    // test operate illegal schema name from MySQL
+    // test operate illegal schema name from ClickHouse
     String testSchemaName = "//";
     String sql = String.format("CREATE DATABASE `%s`", testSchemaName);
-    mysqlService.executeQuery(sql);
+    clickhouseService.executeQuery(sql);
 
     Schema schema = catalog.asSchemas().loadSchema(testSchemaName);
     Assertions.assertEquals(testSchemaName, schema.name());
 
     String[] schemaIdents = catalog.asSchemas().listSchemas();
-    
Assertions.assertTrue(Arrays.asList(schemaIdents).contains(testSchemaName));
-
-    Exception exception =
-        Assertions.assertThrows(
-            SchemaAlreadyExistsException.class,
-            () -> catalog.asSchemas().createSchema(testSchemaName, null, 
Collections.emptyMap()));
-    Assertions.assertTrue(
-        exception.getMessage().contains("Can't create database '//'; database 
exists"));
+    Assertions.assertTrue(Arrays.stream(schemaIdents).anyMatch(s -> 
s.equals(testSchemaName)));
 
     Assertions.assertTrue(catalog.asSchemas().dropSchema(testSchemaName, 
false));
     Assertions.assertFalse(catalog.asSchemas().schemaExists(testSchemaName));
 
-    // test operate illegal table name from MySQL
-    mysqlService.executeQuery(sql);
+    // test operate illegal table name from ClickHouse
+    clickhouseService.executeQuery(sql);
     String testTableName = "//";
-    sql = String.format("CREATE TABLE `%s`.`%s` (id int)", testSchemaName, 
testTableName);
-    mysqlService.executeQuery(sql);
+    sql =
+        String.format("CREATE TABLE `%s`.`%s` (id int) order by id", 
testSchemaName, testTableName);
+    clickhouseService.executeQuery(sql);
     NameIdentifier tableIdent = NameIdentifier.of(testSchemaName, 
testTableName);
 
     Table table = catalog.asTableCatalog().loadTable(tableIdent);
@@ -1838,14 +1677,12 @@ public class CatalogMysqlIT extends BaseIT {
   }
 
   @Test
-  void testMySQLSchemaNameCaseSensitive() {
+  void testClickHouseSchemaNameCaseSensitive() {
     Column col1 = Column.of("col_1", Types.LongType.get(), "id", false, false, 
null);
     Column col2 = Column.of("col_2", Types.VarCharType.of(255), "code", false, 
false, null);
     Column col3 = Column.of("col_3", Types.VarCharType.of(255), "config", 
false, false, null);
     Column[] newColumns = new Column[] {col1, col2, col3};
 
-    Index[] indexes = new Index[] {Indexes.unique("u1_key", new String[][] 
{{"col_2"}, {"col_3"}})};
-
     String[] schemas = {"db_", "db_1", "db_2", "db12"};
     SupportsSchemas schemaSupport = catalog.asSchemas();
 
@@ -1870,8 +1707,8 @@ public class CatalogMysqlIT extends BaseIT {
           properties,
           Transforms.EMPTY_TRANSFORM,
           Distributions.NONE,
-          new SortOrder[0],
-          indexes);
+          getSortOrders("col_1"),
+          null);
       tableCatalog.createTable(
           NameIdentifier.of(schema, GravitinoITUtils.genRandomName("test2")),
           newColumns,
@@ -1879,7 +1716,7 @@ public class CatalogMysqlIT extends BaseIT {
           properties,
           Transforms.EMPTY_TRANSFORM,
           Distributions.NONE,
-          new SortOrder[0],
+          getSortOrders("col_1"),
           Indexes.EMPTY_INDEXES);
     }
 
@@ -1900,220 +1737,21 @@ public class CatalogMysqlIT extends BaseIT {
   }
 
   @Test
-  void testParsedBitTypeConverter() {
+  void testUnparsedTypeConverter() {
     String tableName = GravitinoITUtils.genRandomName("test_unparsed_type");
-    mysqlService.executeQuery(
-        String.format("CREATE TABLE %s.%s (bit_col bit);", schemaName, 
tableName));
+    clickhouseService.executeQuery(
+        String.format(
+            "CREATE TABLE %s.%s (bit_col IPv4) order by bit_col ;", 
schemaName, tableName));
     Table loadedTable =
         catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, 
tableName));
-    Assertions.assertEquals(Types.BooleanType.get(), 
loadedTable.columns()[0].dataType());
-  }
-
-  @Test
-  void testOperationTableIndex() {
-    String tableName = GravitinoITUtils.genRandomName("test_add_index");
-    Column col1 = Column.of("col_1", Types.LongType.get(), "id", false, false, 
null);
-    Column col2 = Column.of("col_2", Types.VarCharType.of(255), "code", false, 
false, null);
-    Column col3 = Column.of("col_3", Types.VarCharType.of(255), "config", 
false, false, null);
-    Column[] newColumns = new Column[] {col1, col2, col3};
-    TableCatalog tableCatalog = catalog.asTableCatalog();
-    tableCatalog.createTable(
-        NameIdentifier.of(schemaName, tableName),
-        newColumns,
-        table_comment,
-        createProperties(),
-        Transforms.EMPTY_TRANSFORM,
-        Distributions.NONE,
-        new SortOrder[0],
-        Indexes.EMPTY_INDEXES);
-
-    // add index test.
-    tableCatalog.alterTable(
-        NameIdentifier.of(schemaName, tableName),
-        TableChange.addIndex(
-            Index.IndexType.UNIQUE_KEY, "u1_key", new String[][] {{"col_2"}, 
{"col_3"}}),
-        TableChange.addIndex(
-            Index.IndexType.PRIMARY_KEY,
-            Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME,
-            new String[][] {{"col_1"}}));
-
-    Table table = tableCatalog.loadTable(NameIdentifier.of(schemaName, 
tableName));
-    Index[] indexes =
-        new Index[] {
-          Indexes.unique("u1_key", new String[][] {{"col_2"}, {"col_3"}}),
-          Indexes.createMysqlPrimaryKey(new String[][] {{"col_1"}})
-        };
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(newColumns),
-        createProperties(),
-        indexes,
-        Transforms.EMPTY_TRANSFORM,
-        table);
-
-    // delete index and add new column and index.
-    tableCatalog.alterTable(
-        NameIdentifier.of(schemaName, tableName),
-        TableChange.deleteIndex("u1_key", false),
-        TableChange.addColumn(
-            new String[] {"col_4"},
-            Types.VarCharType.of(255),
-            TableChange.ColumnPosition.defaultPos()),
-        TableChange.addIndex(Index.IndexType.UNIQUE_KEY, "u2_key", new 
String[][] {{"col_4"}}));
-
-    indexes =
-        new Index[] {
-          Indexes.createMysqlPrimaryKey(new String[][] {{"col_1"}}),
-          Indexes.unique("u2_key", new String[][] {{"col_4"}})
-        };
-    table = tableCatalog.loadTable(NameIdentifier.of(schemaName, tableName));
-    Column col4 = Column.of("col_4", Types.VarCharType.of(255), null, true, 
false, null);
-    newColumns = new Column[] {col1, col2, col3, col4};
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(newColumns),
-        createProperties(),
-        indexes,
-        Transforms.EMPTY_TRANSFORM,
-        table);
-
-    // Add a previously existing index
-    tableCatalog.alterTable(
-        NameIdentifier.of(schemaName, tableName),
-        TableChange.addIndex(
-            Index.IndexType.UNIQUE_KEY, "u1_key", new String[][] {{"col_2"}, 
{"col_3"}}),
-        TableChange.addIndex(
-            Index.IndexType.UNIQUE_KEY, "u3_key", new String[][] {{"col_1"}, 
{"col_4"}}));
-
-    indexes =
-        new Index[] {
-          Indexes.createMysqlPrimaryKey(new String[][] {{"col_1"}}),
-          Indexes.unique("u2_key", new String[][] {{"col_4"}}),
-          Indexes.unique("u1_key", new String[][] {{"col_2"}, {"col_3"}}),
-          Indexes.unique("u3_key", new String[][] {{"col_1"}, {"col_4"}})
-        };
-    table = tableCatalog.loadTable(NameIdentifier.of(schemaName, tableName));
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(newColumns),
-        createProperties(),
-        indexes,
-        Transforms.EMPTY_TRANSFORM,
-        table);
-  }
-
-  @Test
-  void testAddColumnAutoIncrement() {
-    Column col1 = Column.of("col_1", Types.LongType.get(), "uid", false, 
false, null);
-    Column col2 = Column.of("col_2", Types.ByteType.get(), "yes", false, 
false, null);
-    Column col3 = Column.of("col_3", Types.DateType.get(), "comment", false, 
false, null);
-    Column col4 = Column.of("col_4", Types.VarCharType.of(255), "code", false, 
false, null);
-    Column col5 = Column.of("col_5", Types.VarCharType.of(255), "config", 
false, false, null);
-    String tableName = "auto_increment_table";
-    Column[] newColumns = new Column[] {col1, col2, col3, col4, col5};
-
-    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
-    Map<String, String> properties = createProperties();
-    TableCatalog tableCatalog = catalog.asTableCatalog();
-    tableCatalog.createTable(
-        tableIdentifier,
-        newColumns,
-        table_comment,
-        properties,
-        Transforms.EMPTY_TRANSFORM,
-        Distributions.NONE,
-        new SortOrder[0],
-        Indexes.EMPTY_INDEXES);
-
-    // Test add auto increment ,but not insert index. will fail.
-    RuntimeException runtimeException =
-        assertThrows(
-            RuntimeException.class,
-            () ->
-                tableCatalog.alterTable(
-                    tableIdentifier,
-                    TableChange.addColumn(
-                        new String[] {"col_6"},
-                        Types.LongType.get(),
-                        "id",
-                        TableChange.ColumnPosition.defaultPos(),
-                        false,
-                        true)));
-    Assertions.assertTrue(
-        StringUtils.contains(
-            runtimeException.getMessage(),
-            "Incorrect table definition; there can be only one auto column and 
it must be defined as a key"));
-
-    // Test add auto increment success.
-    tableCatalog.alterTable(
-        tableIdentifier,
-        TableChange.addColumn(
-            new String[] {"col_6"},
-            Types.LongType.get(),
-            "id",
-            TableChange.ColumnPosition.defaultPos(),
-            false,
-            true),
-        TableChange.addIndex(
-            Index.IndexType.PRIMARY_KEY,
-            Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME,
-            new String[][] {{"col_6"}}));
-
-    Table table = tableCatalog.loadTable(tableIdentifier);
-
-    Column col6 = Column.of("col_6", Types.LongType.get(), "id", false, true, 
null);
-    Index[] indices = new Index[] {Indexes.createMysqlPrimaryKey(new 
String[][] {{"col_6"}})};
-    newColumns = new Column[] {col1, col2, col3, col4, col5, col6};
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(newColumns),
-        properties,
-        indices,
-        Transforms.EMPTY_TRANSFORM,
-        table);
-
-    // Test the auto-increment property of modified fields
-    tableCatalog.alterTable(
-        tableIdentifier, TableChange.updateColumnAutoIncrement(new String[] 
{"col_6"}, false));
-    table = tableCatalog.loadTable(tableIdentifier);
-    col6 = Column.of("col_6", Types.LongType.get(), "id", false, false, null);
-    indices = new Index[] {Indexes.createMysqlPrimaryKey(new String[][] 
{{"col_6"}})};
-    newColumns = new Column[] {col1, col2, col3, col4, col5, col6};
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(newColumns),
-        properties,
-        indices,
-        Transforms.EMPTY_TRANSFORM,
-        table);
-
-    // Add the auto-increment attribute to the field
-    tableCatalog.alterTable(
-        tableIdentifier, TableChange.updateColumnAutoIncrement(new String[] 
{"col_6"}, true));
-    table = tableCatalog.loadTable(tableIdentifier);
-    col6 = Column.of("col_6", Types.LongType.get(), "id", false, true, null);
-    indices = new Index[] {Indexes.createMysqlPrimaryKey(new String[][] 
{{"col_6"}})};
-    newColumns = new Column[] {col1, col2, col3, col4, col5, col6};
-    ITUtils.assertionsTableInfo(
-        tableName,
-        table_comment,
-        Arrays.asList(newColumns),
-        properties,
-        indices,
-        Transforms.EMPTY_TRANSFORM,
-        table);
+    Assertions.assertEquals(Types.ExternalType.of("IPv4"), 
loadedTable.columns()[0].dataType());
   }
 
   @Test
   void testAddColumnDefaultValue() {
-    Column col1 = Column.of("col_1", Types.LongType.get(), "uid", true, false, 
null);
+    Column col1 = Column.of("col_1", Types.LongType.get(), "uid", false, 
false, null);
     Column col2 = Column.of("col_2", Types.ByteType.get(), "yes", true, false, 
null);
-    Column col3 = Column.of("col_3", Types.VarCharType.of(255), "comment", 
true, false, null);
+    Column col3 = Column.of("col_3", Types.StringType.get(), "comment", true, 
false, null);
     String tableName = "default_value_table";
     Column[] newColumns = new Column[] {col1, col2, col3};
 
@@ -2127,21 +1765,18 @@ public class CatalogMysqlIT extends BaseIT {
         properties,
         Transforms.EMPTY_TRANSFORM,
         Distributions.NONE,
-        new SortOrder[0],
+        getSortOrders("col_1"),
         Indexes.EMPTY_INDEXES);
 
     Column col4 =
-        Column.of("col_4", Types.LongType.get(), "col4", false, false, 
Literals.longLiteral(1000L));
+        Column.of("col_4", Types.LongType.get(), "col4", true, false, 
DEFAULT_VALUE_NOT_SET);
     tableCatalog.alterTable(
         tableIdentifier,
         TableChange.addColumn(
             new String[] {col4.name()},
             col4.dataType(),
             col4.comment(),
-            TableChange.ColumnPosition.defaultPos(),
-            col4.nullable(),
-            col4.autoIncrement(),
-            col4.defaultValue()));
+            TableChange.ColumnPosition.defaultPos()));
 
     Table table = tableCatalog.loadTable(tableIdentifier);
     newColumns = new Column[] {col1, col2, col3, col4};
@@ -2157,8 +1792,8 @@ public class CatalogMysqlIT extends BaseIT {
   }
 
   @Test
-  public void testMySqlIntegerTypes() {
-    Column col1 = Column.of("col_1", Types.ByteType.get(), "byte type", true, 
false, null);
+  public void testClickHouseIntegerTypes() {
+    Column col1 = Column.of("col_1", Types.ByteType.get(), "byte type", false, 
false, null);
     Column col2 =
         Column.of("col_2", Types.ByteType.unsigned(), "byte unsigned type", 
true, false, null);
     Column col3 = Column.of("col_3", Types.ShortType.get(), "short type", 
true, false, null);
@@ -2184,7 +1819,7 @@ public class CatalogMysqlIT extends BaseIT {
         properties,
         Transforms.EMPTY_TRANSFORM,
         Distributions.NONE,
-        new SortOrder[0],
+        getSortOrders("col_1"),
         Indexes.EMPTY_INDEXES);
 
     Table table = tableCatalog.loadTable(tableIdentifier);
@@ -2203,20 +1838,19 @@ public class CatalogMysqlIT extends BaseIT {
   @Test
   void testAlterCatalogProperties() throws SQLException {
     Map<String, String> catalogProperties = Maps.newHashMap();
-    String testCatalogName = 
GravitinoITUtils.genRandomName("mysql_it_catalog");
+    String testCatalogName = 
GravitinoITUtils.genRandomName("clickhouse_it_catalog");
 
     catalogProperties.put(
         JdbcConfig.JDBC_URL.getKey(),
         StringUtils.substring(
-                MYSQL_CONTAINER.getJdbcUrl(TEST_DB_NAME),
-                0,
-                MYSQL_CONTAINER.getJdbcUrl(TEST_DB_NAME).lastIndexOf("/"))
-            + "?useSSL=false&allowPublicKeyRetrieval=true");
+            CLICKHOUSE_CONTAINER.getJdbcUrl(TEST_DB_NAME),
+            0,
+            CLICKHOUSE_CONTAINER.getJdbcUrl(TEST_DB_NAME).lastIndexOf("/")));
     catalogProperties.put(
-        JdbcConfig.JDBC_DRIVER.getKey(), 
MYSQL_CONTAINER.getDriverClassName(TEST_DB_NAME));
-    catalogProperties.put(JdbcConfig.USERNAME.getKey(), 
MYSQL_CONTAINER.getUsername());
+        JdbcConfig.JDBC_DRIVER.getKey(), 
CLICKHOUSE_CONTAINER.getDriverClassName(TEST_DB_NAME));
+    catalogProperties.put(JdbcConfig.USERNAME.getKey(), 
CLICKHOUSE_CONTAINER.getUsername());
 
-    String password = MYSQL_CONTAINER.getPassword();
+    String password = CLICKHOUSE_CONTAINER.getPassword();
     String wrongPassword = password + "wrong";
     catalogProperties.put(JdbcConfig.PASSWORD.getKey(), wrongPassword);
 
@@ -2226,90 +1860,14 @@ public class CatalogMysqlIT extends BaseIT {
 
     Assertions.assertThrows(
         Exception.class, () -> loadCatalog.asSchemas().createSchema("test", 
"", null));
-    metalake.alterCatalog(
-        testCatalogName, 
CatalogChange.setProperty(JdbcConfig.PASSWORD.getKey(), password));
+    Catalog newLoadedCatalog =
+        metalake.alterCatalog(
+            testCatalogName, 
CatalogChange.setProperty(JdbcConfig.PASSWORD.getKey(), password));
 
-    Assertions.assertDoesNotThrow(() -> 
loadCatalog.asSchemas().createSchema("test", "", null));
+    Assertions.assertDoesNotThrow(
+        () -> newLoadedCatalog.asSchemas().createSchema("test", "", null));
 
     loadCatalog.asSchemas().dropSchema("test", true);
     metalake.dropCatalog(testCatalogName, true);
   }
-
-  @Test
-  void testTimeTypePrecision() {
-    // Test different time type precisions
-    String tableName = GravitinoITUtils.genRandomName("test_time_precision");
-    String fullTableName = schemaName + "." + tableName;
-    String sql =
-        "CREATE TABLE "
-            + fullTableName
-            + " (\n"
-            + "  time_no_precision time,\n"
-            + "  time_precision_0 time(0),\n"
-            + "  time_precision_1 time(1),\n"
-            + "  time_precision_3 time(3),\n"
-            + "  time_precision_6 time(6),\n"
-            + "  datetime_no_precision datetime,\n"
-            + "  datetime_precision_0 datetime(0),\n"
-            + "  datetime_precision_1 datetime(1),\n"
-            + "  datetime_precision_3 datetime(3),\n"
-            + "  datetime_precision_6 datetime(6),\n"
-            + "  timestamp_no_precision timestamp,\n"
-            + "  timestamp_precision_0 timestamp(0) default 
current_timestamp,\n"
-            + "  timestamp_precision_1 timestamp(1) default 
current_timestamp(1),\n"
-            + "  timestamp_precision_3 timestamp(3) default '2012-12-31 
11:30:45.123',\n"
-            + "  timestamp_precision_6 timestamp(6) default '2012-12-31 
11:30:45.123456'\n"
-            + ");\n";
-
-    mysqlService.executeQuery(sql);
-    Table loadedTable =
-        catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, 
tableName));
-
-    // Verify time type precisions
-    for (Column column : loadedTable.columns()) {
-      switch (column.name()) {
-        case "time_no_precision":
-        case "time_precision_0":
-          Assertions.assertEquals(Types.TimeType.of(0), column.dataType());
-          break;
-        case "time_precision_1":
-          Assertions.assertEquals(Types.TimeType.of(1), column.dataType());
-          break;
-        case "time_precision_3":
-          Assertions.assertEquals(Types.TimeType.of(3), column.dataType());
-          break;
-        case "time_precision_6":
-          Assertions.assertEquals(Types.TimeType.of(6), column.dataType());
-          break;
-        case "datetime_no_precision":
-        case "datetime_precision_0":
-          Assertions.assertEquals(Types.TimestampType.withoutTimeZone(0), 
column.dataType());
-          break;
-        case "datetime_precision_1":
-          Assertions.assertEquals(Types.TimestampType.withoutTimeZone(1), 
column.dataType());
-          break;
-        case "datetime_precision_3":
-          Assertions.assertEquals(Types.TimestampType.withoutTimeZone(3), 
column.dataType());
-          break;
-        case "datetime_precision_6":
-          Assertions.assertEquals(Types.TimestampType.withoutTimeZone(6), 
column.dataType());
-          break;
-        case "timestamp_no_precision":
-        case "timestamp_precision_0":
-          Assertions.assertEquals(Types.TimestampType.withTimeZone(0), 
column.dataType());
-          break;
-        case "timestamp_precision_1":
-          Assertions.assertEquals(Types.TimestampType.withTimeZone(1), 
column.dataType());
-          break;
-        case "timestamp_precision_3":
-          Assertions.assertEquals(Types.TimestampType.withTimeZone(3), 
column.dataType());
-          break;
-        case "timestamp_precision_6":
-          Assertions.assertEquals(Types.TimestampType.withTimeZone(6), 
column.dataType());
-          break;
-        default:
-          Assertions.fail("Unexpected column name: " + column.name());
-      }
-    }
-  }
 }
diff --git 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
index 868c0b9425..1e47f72496 100644
--- 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
+++ 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
@@ -356,7 +356,7 @@ public class CatalogMysqlIT extends BaseIT {
     // create failed check.
     NameIdentifier table = NameIdentifier.of(testSchemaName, "test_table");
     Assertions.assertThrows(
-        NotFoundException.class,
+        NoSuchSchemaException.class,
         () ->
             tableCatalog.createTable(
                 table,
diff --git 
a/catalogs/catalog-jdbc-postgresql/src/test/java/org/apache/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java
 
b/catalogs/catalog-jdbc-postgresql/src/test/java/org/apache/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java
index b637ca7a69..59f405102c 100644
--- 
a/catalogs/catalog-jdbc-postgresql/src/test/java/org/apache/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java
+++ 
b/catalogs/catalog-jdbc-postgresql/src/test/java/org/apache/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java
@@ -50,7 +50,6 @@ import 
org.apache.gravitino.catalog.postgresql.integration.test.service.PostgreS
 import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
-import org.apache.gravitino.exceptions.NotFoundException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.integration.test.container.ContainerSuite;
 import org.apache.gravitino.integration.test.container.PGImageName;
@@ -424,7 +423,7 @@ public class CatalogPostgreSqlIT extends BaseIT {
     // create failed check.
     NameIdentifier table = NameIdentifier.of(testSchemaName, "test_table");
     Assertions.assertThrows(
-        NotFoundException.class,
+        NoSuchSchemaException.class,
         () ->
             tableCatalog.createTable(
                 table,
diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
 
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
index 61f7f03bbb..82aedaff7d 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
@@ -54,7 +54,6 @@ import 
org.apache.gravitino.catalog.lakehouse.iceberg.IcebergTable;
 import 
org.apache.gravitino.catalog.lakehouse.iceberg.ops.IcebergCatalogWrapperHelper;
 import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
-import org.apache.gravitino.exceptions.NotFoundException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
@@ -345,7 +344,7 @@ public abstract class CatalogIcebergBaseIT extends BaseIT {
     // create failed check.
     NameIdentifier table = NameIdentifier.of(testSchemaName, "test_table");
     Assertions.assertThrows(
-        NotFoundException.class,
+        NoSuchSchemaException.class,
         () ->
             tableCatalog.createTable(
                 table,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
index e8a12a4603..803d097e19 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
@@ -23,7 +23,6 @@ import static 
org.apache.gravitino.dto.util.DTOConverters.fromDTOs;
 
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
-import com.google.common.collect.Lists;
 import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.DELETE;
@@ -39,7 +38,6 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
-import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.catalog.TableDispatcher;
@@ -58,7 +56,6 @@ import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpres
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
 import org.apache.gravitino.server.web.Utils;
-import org.apache.gravitino.utils.MetadataObjectUtil;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
 import org.slf4j.Logger;
@@ -140,12 +137,6 @@ public class TableOperations {
             NameIdentifier ident =
                 NameIdentifierUtil.ofTable(metalake, catalog, schema, 
request.getName());
 
-            // Make sure schema is imported, otherwise set owner for the table 
may fail.
-            MetadataObjectUtil.checkMetadataObject(
-                metalake,
-                MetadataObjects.of(
-                    Lists.newArrayList(catalog, schema), 
MetadataObject.Type.SCHEMA));
-
             Table table =
                 dispatcher.createTable(
                     ident,
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
index 4ec82d666b..b4483042f1 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
@@ -20,7 +20,6 @@ package org.apache.gravitino.server.web.rest;
 
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
-import com.google.common.collect.Lists;
 import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.DELETE;
@@ -34,7 +33,6 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.MetadataObject;
-import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.catalog.TopicDispatcher;
@@ -53,7 +51,6 @@ import 
org.apache.gravitino.server.authorization.annotations.AuthorizationExpres
 import 
org.apache.gravitino.server.authorization.annotations.AuthorizationMetadata;
 import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpressionConstants;
 import org.apache.gravitino.server.web.Utils;
-import org.apache.gravitino.utils.MetadataObjectUtil;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
 import org.slf4j.Logger;
@@ -142,12 +139,6 @@ public class TopicOperations {
             NameIdentifier ident =
                 NameIdentifierUtil.ofTopic(metalake, catalog, schema, 
request.getName());
 
-            // Make sure schema is imported, otherwise set owner for the topic 
may fail.
-            MetadataObjectUtil.checkMetadataObject(
-                metalake,
-                MetadataObjects.of(
-                    Lists.newArrayList(catalog, schema), 
MetadataObject.Type.SCHEMA));
-
             Topic topic =
                 dispatcher.createTopic(
                     ident,
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
index e381d96a91..f663d81215 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTableOperations.java
@@ -127,7 +127,6 @@ public class TestTableOperations extends BaseOperationsTest 
{
     Mockito.doReturn(false).when(config).get(ENABLE_AUTHORIZATION);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
-    FieldUtils.writeField(GravitinoEnv.getInstance(), "schemaDispatcher", 
schemaDispatcher, true);
     Mockito.doReturn(true).when(schemaDispatcher).schemaExists(any());
   }
 
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
index ab39638c56..f74e49feb4 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestTopicOperations.java
@@ -96,7 +96,6 @@ public class TestTopicOperations extends BaseOperationsTest {
     Mockito.doReturn(false).when(config).get(ENABLE_AUTHORIZATION);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "config", config, true);
     FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
-    FieldUtils.writeField(GravitinoEnv.getInstance(), "schemaDispatcher", 
schemaDispatcher, true);
     Mockito.doReturn(true).when(schemaDispatcher).schemaExists(any());
   }
 

Reply via email to