yuqi1129 commented on code in PR #9878:
URL: https://github.com/apache/gravitino/pull/9878#discussion_r2850656571


##########
catalogs-contrib/catalog-jdbc-clickhouse/src/test/java/org/apache/gravitino/catalog/clickhouse/integration/test/CatalogClickHouseIT.java:
##########
@@ -0,0 +1,1625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.clickhouse.integration.test;
+
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.ENGINE.MERGETREE;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseTablePropertiesMetadata.GRAVITINO_ENGINE_KEY;
+import static 
org.apache.gravitino.catalog.clickhouse.ClickHouseUtils.getSortOrders;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.CatalogChange;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.SupportsSchemas;
+import org.apache.gravitino.auth.AuthConstants;
+import 
org.apache.gravitino.catalog.clickhouse.integration.test.service.ClickHouseService;
+import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NotFoundException;
+import org.apache.gravitino.integration.test.container.ClickHouseContainer;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.integration.test.util.ITUtils;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.FunctionExpression;
+import org.apache.gravitino.rel.expressions.UnparsedExpression;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.types.Decimal;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.utils.RandomNameUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.condition.EnabledIf;
+
+@Tag("gravitino-docker-test")
+@TestInstance(Lifecycle.PER_CLASS)
+public class CatalogClickHouseIT extends BaseIT {
+  // Container suite used in startup() to start and look up the ClickHouse container.
+  private static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+  // Catalog provider name passed to createCatalog.
+  private static final String provider = "jdbc-clickhouse";
+
+  // Randomised names for the metalake, catalog, schema and default table used by this suite.
+  public String metalakeName = 
GravitinoITUtils.genRandomName("clickhouse_it_metalake");
+  public String catalogName = 
GravitinoITUtils.genRandomName("clickhouse_it_catalog");
+  public String schemaName = 
GravitinoITUtils.genRandomName("clickhouse_it_schema");
+  public String tableName = 
GravitinoITUtils.genRandomName("clickhouse_it_table");
+  // Rename target used by testAlterAndDropClickhouseTable.
+  // NOTE(review): "alert" is presumably a typo for "alter" - confirm before renaming.
+  public String alertTableName = "alert_table_name";
+  public String table_comment = "table_comment";
+
+  public String schema_comment = "test schema";
+  // Column names shared by the column factory methods and the tests.
+  public String CLICKHOUSE_COL_NAME1 = "clickhouse_col_name1";
+  public String CLICKHOUSE_COL_NAME2 = "clickhouse_col_name2";
+  public String CLICKHOUSE_COL_NAME3 = "clickhouse_col_name3";
+  public String CLICKHOUSE_COL_NAME4 = "clickhouse_col_name4";
+  public String CLICKHOUSE_COL_NAME5 = "clickhouse_col_name5";
+
+  // Assigned by createMetalake().
+  private GravitinoMetalake metalake;
+
+  // Assigned by createCatalog().
+  protected Catalog catalog;
+
+  // Helper built on top of the ClickHouse container; assigned in startup(), closed in stop().
+  private ClickHouseService clickhouseService;
+
+  // Assigned in startup() from the container suite.
+  private ClickHouseContainer CLICKHOUSE_CONTAINER;
+
+  // Assigned in startup(); identifies the database used for this suite.
+  private TestDatabaseName TEST_DB_NAME;
+
+  /**
+   * Condition method named by the {@code @EnabledIf} annotations on the column default value
+   * tests in this class; it always returns {@code true}.
+   */
+  boolean supportColumnDefaultValueExpression() {
+    return true;
+  }
+
+  /**
+   * One-time setup: starts the ClickHouse container for this suite's test database, builds the
+   * {@code ClickHouseService} helper on top of it, and then creates the metalake, catalog and
+   * schema used by the tests.
+   */
+  @BeforeAll
+  public void startup() throws IOException, SQLException {
+    TEST_DB_NAME = TestDatabaseName.CLICKHOUSE_CATALOG_CLICKHOUSE_IT;
+
+    containerSuite.startClickHouseContainer(TEST_DB_NAME);
+    CLICKHOUSE_CONTAINER = containerSuite.getClickHouseContainer();
+
+    clickhouseService = new ClickHouseService(CLICKHOUSE_CONTAINER, 
TEST_DB_NAME);
+    // Order matters: the catalog is created in the metalake, and the schema in the catalog.
+    createMetalake();
+    createCatalog();
+    createSchema();
+  }
+
+  /**
+   * One-time teardown: drops the remaining tables and the schema, then disables and drops the
+   * catalog and the metalake, and finally closes the ClickHouse service helper.
+   *
+   * <p>The helper is closed in a {@code finally} block so that a failure in any of the cleanup
+   * calls does not leave it open; the original failure still propagates.
+   */
+  @AfterAll
+  public void stop() {
+    try {
+      clearTableAndSchema();
+      metalake.disableCatalog(catalogName);
+      metalake.dropCatalog(catalogName, true);
+      client.disableMetalake(metalakeName);
+      client.dropMetalake(metalakeName, true);
+    } finally {
+      // The service is null when startup() failed before creating it.
+      if (clickhouseService != null) {
+        clickhouseService.close();
+      }
+    }
+  }
+
+  /**
+   * Runs after every test: drops all tables and the schema, then creates the schema again so the
+   * next test starts from an empty schema.
+   */
+  @AfterEach
+  public void resetSchema() {
+    clearTableAndSchema();
+    createSchema();
+  }
+
+  /**
+   * Drops every table listed under {@code schemaName} and then drops the schema itself, passing
+   * {@code false} as the second argument of {@code dropSchema}.
+   */
+  private void clearTableAndSchema() {
+    TableCatalog tables = catalog.asTableCatalog();
+    Arrays.stream(tables.listTables(Namespace.of(schemaName))).forEach(tables::dropTable);
+    catalog.asSchemas().dropSchema(schemaName, false);
+  }
+
+  /**
+   * Creates the metalake for this suite and stores the loaded instance in the {@code metalake}
+   * field. Asserts beforehand that the client lists no metalakes at all.
+   */
+  private void createMetalake() {
+    Assertions.assertEquals(0, client.listMetalakes().length);
+
+    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
+
+    GravitinoMetalake loaded = client.loadMetalake(metalakeName);
+    Assertions.assertEquals(metalakeName, loaded.name());
+    metalake = loaded;
+  }
+
+  /**
+   * Registers the ClickHouse container as a relational catalog in the metalake and stores the
+   * loaded catalog in the {@code catalog} field.
+   *
+   * @throws SQLException propagated from the container accessor calls (presumably the driver
+   *     class lookup - confirm against {@code ClickHouseContainer})
+   */
+  private void createCatalog() throws SQLException {
+    String jdbcUrl = CLICKHOUSE_CONTAINER.getJdbcUrl(TEST_DB_NAME);
+    // Keep only the part before the last '/' (presumably this drops the database segment).
+    String serverUrl = StringUtils.substring(jdbcUrl, 0, jdbcUrl.lastIndexOf("/"));
+
+    Map<String, String> jdbcProps = Maps.newHashMap();
+    jdbcProps.put(JdbcConfig.JDBC_URL.getKey(), serverUrl);
+    jdbcProps.put(
+        JdbcConfig.JDBC_DRIVER.getKey(), CLICKHOUSE_CONTAINER.getDriverClassName(TEST_DB_NAME));
+    jdbcProps.put(JdbcConfig.USERNAME.getKey(), CLICKHOUSE_CONTAINER.getUsername());
+    jdbcProps.put(JdbcConfig.PASSWORD.getKey(), CLICKHOUSE_CONTAINER.getPassword());
+
+    Catalog created =
+        metalake.createCatalog(catalogName, Catalog.Type.RELATIONAL, provider, "comment", jdbcProps);
+    Catalog loaded = metalake.loadCatalog(catalogName);
+    Assertions.assertEquals(created, loaded);
+
+    catalog = loaded;
+  }
+
+  /**
+   * Creates the schema {@code schemaName} with an empty property map and checks that loading it
+   * back yields the same name and every requested property.
+   *
+   * @throws RuntimeException wrapping any exception raised while creating the schema
+   */
+  private void createSchema() {
+    Map<String, String> prop = Maps.newHashMap();
+    Schema createdSchema;
+    try {
+      createdSchema = catalog.asSchemas().createSchema(schemaName, schema_comment, prop);
+    } catch (Exception ex) {
+      throw new RuntimeException("Create schema failed: " + ex.getMessage(), ex);
+    }
+
+    Schema loadSchema = catalog.asSchemas().loadSchema(schemaName);
+    Assertions.assertEquals(createdSchema.name(), loadSchema.name());
+    // Expected value (what was requested) goes first so failure messages read correctly.
+    prop.forEach(
+        (key, value) -> Assertions.assertEquals(value, loadSchema.properties().get(key)));
+  }
+
+  /**
+   * Builds the default three-column layout (integer, date, string) used by several tests. The
+   * third column is built with the six-argument factory, passing {@code false}, {@code false} and
+   * {@code DEFAULT_VALUE_NOT_SET}.
+   */
+  private Column[] createColumns() {
+    return new Column[] {
+      Column.of(CLICKHOUSE_COL_NAME1, Types.IntegerType.get(), "col_1_comment"),
+      Column.of(CLICKHOUSE_COL_NAME2, Types.DateType.get(), "col_2_comment"),
+      Column.of(
+          CLICKHOUSE_COL_NAME3,
+          Types.StringType.get(),
+          "col_3_comment",
+          false,
+          false,
+          DEFAULT_VALUE_NOT_SET)
+    };
+  }
+
+  /**
+   * Builds five columns that each carry an explicit default: a float literal, the {@code now}
+   * function, {@code NULL}, an integer literal and a decimal literal.
+   */
+  private Column[] createColumnsWithDefaultValue() {
+    Column floatCol =
+        Column.of(
+            CLICKHOUSE_COL_NAME1,
+            Types.FloatType.get(),
+            "col_1_comment",
+            false,
+            false,
+            Literals.of("1.23", Types.FloatType.get()));
+    Column timestampCol =
+        Column.of(
+            CLICKHOUSE_COL_NAME2,
+            Types.TimestampType.withoutTimeZone(),
+            "col_2_comment",
+            false,
+            false,
+            FunctionExpression.of("now"));
+    Column varcharCol =
+        Column.of(
+            CLICKHOUSE_COL_NAME3,
+            Types.VarCharType.of(255),
+            "col_3_comment",
+            true,
+            false,
+            Literals.NULL);
+    Column integerCol =
+        Column.of(
+            CLICKHOUSE_COL_NAME4,
+            Types.IntegerType.get(),
+            "col_4_comment",
+            false,
+            false,
+            Literals.of("1000", Types.IntegerType.get()));
+    Column decimalCol =
+        Column.of(
+            CLICKHOUSE_COL_NAME5,
+            Types.DecimalType.of(3, 2),
+            "col_5_comment",
+            true,
+            false,
+            Literals.of("1.23", Types.DecimalType.of(3, 2)));
+
+    return new Column[] {floatCol, timestampCol, varcharCol, integerCol, decimalCol};
+  }
+
+  /** Returns a fresh mutable property map that selects the MergeTree engine. */
+  private Map<String, String> createProperties() {
+    Map<String, String> engineProps = new HashMap<>();
+    engineProps.put(GRAVITINO_ENGINE_KEY, MERGETREE.getValue());
+    return engineProps;
+  }
+
+  /**
+   * Exercises schema listing, creation, duplicate creation and dropping through the Gravitino
+   * catalog, cross-checking against what {@code clickhouseService} reports, and finishes with
+   * operations that target the already-dropped schema.
+   */
+  @Test
+  void testOperationClickhouseSchema() {
+    SupportsSchemas schemaOps = catalog.asSchemas();
+    Namespace catalogNamespace = Namespace.of(metalakeName, catalogName);
+
+    // The schema created during setup is listed by Gravitino ...
+    Set<String> listedNames = Sets.newHashSet(schemaOps.listSchemas());
+    Assertions.assertTrue(listedNames.contains(schemaName));
+
+    // ... and by the ClickHouse service.
+    listedNames =
+        Arrays.stream(clickhouseService.listSchemas(catalogNamespace))
+            .map(NameIdentifier::name)
+            .collect(Collectors.toSet());
+    Assertions.assertTrue(listedNames.contains(schemaName));
+
+    // Create a second schema and verify it shows up on both sides.
+    String extraSchema = GravitinoITUtils.genRandomName("test_schema_1");
+    NameIdentifier extraSchemaIdent = NameIdentifier.of(metalakeName, catalogName, extraSchema);
+    schemaOps.createSchema(extraSchema, schema_comment, Collections.emptyMap());
+    listedNames = Sets.newHashSet(schemaOps.listSchemas());
+    Assertions.assertTrue(listedNames.contains(extraSchema));
+
+    listedNames =
+        Arrays.stream(clickhouseService.listSchemas(catalogNamespace))
+            .map(NameIdentifier::name)
+            .collect(Collectors.toSet());
+    Assertions.assertTrue(listedNames.contains(extraSchema));
+
+    // Creating the same schema a second time must fail.
+    Map<String, String> noProperties = Collections.emptyMap();
+    Assertions.assertThrows(
+        RuntimeException.class,
+        () -> schemaOps.createSchema(extraSchema, schema_comment, noProperties));
+
+    // Drop the extra schema and verify it can no longer be loaded or listed.
+    schemaOps.dropSchema(extraSchema, false);
+    Assertions.assertThrows(NoSuchSchemaException.class, () -> schemaOps.loadSchema(extraSchema));
+    Assertions.assertThrows(
+        NoSuchSchemaException.class, () -> clickhouseService.loadSchema(extraSchemaIdent));
+
+    listedNames = Sets.newHashSet(schemaOps.listSchemas());
+    Assertions.assertFalse(listedNames.contains(extraSchema));
+    Assertions.assertFalse(schemaOps.dropSchema("no-exits", false));
+
+    // Creating a table inside the dropped schema must fail.
+    TableCatalog tableOps = catalog.asTableCatalog();
+    NameIdentifier orphanTable = NameIdentifier.of(extraSchema, "test_table");
+    Assertions.assertThrows(
+        NoSuchSchemaException.class,
+        () ->
+            tableOps.createTable(
+                orphanTable,
+                createColumns(),
+                table_comment,
+                createProperties(),
+                null,
+                Distributions.NONE,
+                getSortOrders(CLICKHOUSE_COL_NAME3)));
+
+    // Dropping the already-dropped schema, or a table in it, is expected to return false.
+    Assertions.assertFalse(schemaOps.dropSchema(extraSchemaIdent.name(), true));
+    Assertions.assertFalse(schemaOps.dropSchema(extraSchemaIdent.name(), false));
+    Assertions.assertFalse(() -> tableOps.dropTable(orphanTable));
+
+    // The schema created during setup is still listed by the ClickHouse service.
+    listedNames =
+        Arrays.stream(clickhouseService.listSchemas(Namespace.empty()))
+            .map(NameIdentifier::name)
+            .collect(Collectors.toSet());
+    Assertions.assertTrue(listedNames.contains(schemaName));
+  }
+
+  /**
+   * Creates a table through the Gravitino API with the default columns and properties, loads it
+   * back and checks name, comment, every requested property and every column.
+   */
+  @Test
+  void testCreateAndLoadClickhouseTable() {
+    Column[] columns = createColumns();
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Distribution distribution = Distributions.NONE;
+    Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
+    Map<String, String> properties = createProperties();
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        partitioning,
+        distribution,
+        getSortOrders(CLICKHOUSE_COL_NAME3));
+
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertEquals(tableName, loadTable.name());
+    Assertions.assertEquals(table_comment, loadTable.comment());
+
+    // Every property that was requested must come back with the same value.
+    Map<String, String> resultProp = loadTable.properties();
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
+      Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey()));
+    }
+
+    // Expected value (the requested column count) goes first so failure messages read correctly.
+    Assertions.assertEquals(columns.length, loadTable.columns().length);
+    for (int i = 0; i < columns.length; i++) {
+      ITUtils.assertColumn(columns[i], loadTable.columns()[i]);
+    }
+  }
+
+  /**
+   * Creates a table whose column names are type keywords ({@code integer}, {@code long}, {@code
+   * float}, ...) and checks that creation succeeds and returns the requested table name.
+   */
+  @Test
+  void testColumnNameWithKeyWords() {
+    // Create table from Gravitino API
+    Column[] columns = {
+      Column.of("integer", Types.IntegerType.get(), "integer", false, false, DEFAULT_VALUE_NOT_SET),
+      Column.of("long", Types.LongType.get(), "long"),
+      Column.of("float", Types.FloatType.get(), "float"),
+      Column.of("double", Types.DoubleType.get(), "double"),
+      Column.of("decimal", Types.DecimalType.of(10, 3), "decimal"),
+      Column.of("date", Types.DateType.get(), "date"),
+      Column.of("time", Types.TimeType.get(), "time")
+    };
+
+    String name = GravitinoITUtils.genRandomName("table") + "_keyword";
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, name);
+    Distribution distribution = Distributions.NONE;
+    Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
+    Map<String, String> properties = createProperties();
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            getSortOrders("integer"));
+    // Expected value (the requested name) goes first so failure messages read correctly.
+    Assertions.assertEquals(name, createdTable.name());
+  }
+
+  /**
+   * Creates a table whose columns carry function, {@code NULL}, absent and string-literal
+   * defaults, then checks how each default is reported when the table is loaded back.
+   */
+  @Test
+  // Runs only when supportColumnDefaultValueExpression() returns true.
+  @EnabledIf("supportColumnDefaultValueExpression")
+  void testColumnDefaultValue() {
+    Column[] newColumns = {
+      Column.of(
+          CLICKHOUSE_COL_NAME1,
+          Types.IntegerType.get(),
+          "col_1_comment",
+          false,
+          false,
+          FunctionExpression.of("rand")),
+      Column.of(
+          CLICKHOUSE_COL_NAME2,
+          Types.TimestampType.withoutTimeZone(),
+          "col_2_comment",
+          false,
+          false,
+          FunctionExpression.of("now")),
+      Column.of(
+          CLICKHOUSE_COL_NAME3,
+          Types.VarCharType.of(255),
+          "col_3_comment",
+          true,
+          false,
+          Literals.NULL),
+      // A null default is passed here; it is expected to load back as DEFAULT_VALUE_NOT_SET.
+      Column.of(CLICKHOUSE_COL_NAME4, Types.StringType.get(), "col_4_comment", false, false, null),
+      // A string literal that merely looks like a function call must stay a string literal.
+      Column.of(
+          CLICKHOUSE_COL_NAME5,
+          Types.VarCharType.of(255),
+          "col_5_comment",
+          true,
+          false,
+          Literals.stringLiteral("now()"))
+    };
+
+    NameIdentifier tableIdent =
+        NameIdentifier.of(schemaName, GravitinoITUtils.genRandomName("clickhouse_it_table"));
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdent,
+        newColumns,
+        null,
+        ImmutableMap.of(),
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders(CLICKHOUSE_COL_NAME1));
+
+    Column[] loadedColumns = tableCatalog.loadTable(tableIdent).columns();
+    Assertions.assertEquals(UnparsedExpression.of("rand()"), loadedColumns[0].defaultValue());
+    Assertions.assertEquals(UnparsedExpression.of("now()"), loadedColumns[1].defaultValue());
+    Assertions.assertEquals(Literals.NULL, loadedColumns[2].defaultValue());
+    Assertions.assertEquals(DEFAULT_VALUE_NOT_SET, loadedColumns[3].defaultValue());
+    Assertions.assertEquals(Literals.stringLiteral("now()"), loadedColumns[4].defaultValue());
+  }
+
+  /**
+   * Creates a table directly through {@code clickhouseService.executeQuery} with a variety of
+   * column defaults, loads it through the Gravitino catalog and checks, column by column, the
+   * default value that Gravitino reports. Any column not listed in the switch fails the test.
+   */
+  @Test
+  // Runs only when supportColumnDefaultValueExpression() returns true.
+  @EnabledIf("supportColumnDefaultValueExpression")
+  void testColumnDefaultValueConverter() {
+    // test convert from ClickHouse to Gravitino
+    // Note: this local deliberately shadows the tableName field with a fresh random name.
+    String tableName = GravitinoITUtils.genRandomName("test_default_value");
+    String fullTableName = schemaName + "." + tableName;
+    String sql =
+        "CREATE TABLE "
+            + fullTableName
+            + " (\n"
+            + "  int_col_1 int default 0x01AF,\n"
+            + "  int_col_2 int default (rand()),\n"
+            + "  int_col_3 int default 3,\n"
+            + "  unsigned_int_col_1 INT UNSIGNED default 1,\n"
+            + "  unsigned_bigint_col_1 BIGINT(20) UNSIGNED default 0,\n"
+            + "  double_col_1 double default 123.45,\n"
+            + "  varchar20_col_1 varchar(20) default '10',\n"
+            + "  varchar100_col_1 varchar(100) default 'now()',\n"
+            + "  varchar200_col_1 varchar(200) default 'curdate()',\n"
+            + "  varchar200_col_2 varchar(200) default (today()),\n"
+            + "  varchar200_col_3 varchar(200) default (now()),\n"
+            + "  datetime_col_1 datetime default now(),\n"
+            + "  datetime_col_2 datetime default now(),\n"
+            + "  datetime_col_3 datetime default null,\n"
+            + "  datetime_col_4 datetime default 19830905,\n"
+            + "  date_col_1 date default (today()),\n"
+            + "  date_col_2 date,\n"
+            + "  date_col_3 date DEFAULT (today() + INTERVAL 1 YEAR),\n"
+            + "  date_col_4 date DEFAULT (today()),\n"
+            + "  date_col_5 date DEFAULT '2024-04-01',\n"
+            + "  timestamp_col_1 timestamp default '2012-12-31 11:30:45',\n"
+            + "  timestamp_col_2 timestamp default 19830905,\n"
+            + "  timestamp_col_3 timestamp(6) default now(),\n"
+            + "  decimal_6_2_col_1 decimal(6, 2) default 1.2,\n"
+            + "  bit_col_1 bit default '1'\n"
+            + ") order by int_col_1;\n";
+
+    clickhouseService.executeQuery(sql);
+    Table loadedTable =
+        catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, 
tableName));
+
+    for (Column column : loadedTable.columns()) {
+      switch (column.name()) {
+        case "int_col_1":
+          // 0x01AF in the DDL is 431 in decimal.
+          Assertions.assertEquals(Literals.integerLiteral(431), 
column.defaultValue());
+          break;
+        case "int_col_2":
+          Assertions.assertEquals(UnparsedExpression.of("rand()"), 
column.defaultValue());
+          break;
+        case "int_col_3":
+          Assertions.assertEquals(Literals.integerLiteral(3), 
column.defaultValue());
+          break;
+        case "unsigned_int_col_1":
+          Assertions.assertEquals(Literals.unsignedIntegerLiteral(1L), 
column.defaultValue());
+          break;
+        case "unsigned_bigint_col_1":
+          Assertions.assertEquals(
+              Literals.unsignedLongLiteral(Decimal.of("0")), 
column.defaultValue());
+          break;
+        case "double_col_1":
+          Assertions.assertEquals(Literals.doubleLiteral(123.45), 
column.defaultValue());
+          break;
+        case "varchar20_col_1":
+          Assertions.assertEquals(Literals.stringLiteral("10"), 
column.defaultValue());
+          break;
+        case "varchar100_col_1":
+          Assertions.assertEquals(Literals.stringLiteral("now()"), 
column.defaultValue());
+          break;
+        case "varchar200_col_1":
+          Assertions.assertEquals(Literals.stringLiteral("curdate()"), 
column.defaultValue());
+          break;
+        case "varchar200_col_2":
+          // NOTE(review): the DDL default here is the expression (today()), yet a string literal
+          // is expected for this string-typed column - confirm this is the intended conversion.
+          Assertions.assertEquals(Literals.stringLiteral("today()"), 
column.defaultValue());
+          break;
+        case "varchar200_col_3":
+          Assertions.assertEquals(Literals.stringLiteral("now()"), 
column.defaultValue());
+          break;
+        case "datetime_col_1":
+        case "datetime_col_2":
+          Assertions.assertEquals(UnparsedExpression.of("now()"), 
column.defaultValue());
+          break;
+        case "datetime_col_3":
+          Assertions.assertEquals(Literals.NULL, column.defaultValue());
+          break;
+        case "datetime_col_4":
+          Assertions.assertEquals(UnparsedExpression.of("19830905"), 
column.defaultValue());
+          break;
+        case "date_col_1":
+          Assertions.assertEquals(UnparsedExpression.of("today()"), 
column.defaultValue());
+          break;
+        case "date_col_2":
+          // Declared without a default in the DDL.
+          Assertions.assertEquals(DEFAULT_VALUE_NOT_SET, 
column.defaultValue());
+          break;
+        case "date_col_3":
+          // The DDL writes INTERVAL 1 YEAR; the expected text uses toIntervalYear(1).
+          Assertions.assertEquals(
+              UnparsedExpression.of("today() + toIntervalYear(1)"), 
column.defaultValue());
+          break;
+        case "date_col_4":
+          Assertions.assertEquals(UnparsedExpression.of("today()"), 
column.defaultValue());
+          break;
+        case "date_col_5":
+          Assertions.assertEquals(
+              Literals.dateLiteral(LocalDate.of(2024, 4, 1)), 
column.defaultValue());
+          break;
+        case "timestamp_col_1":
+          Assertions.assertEquals(
+              Literals.timestampLiteral("2012-12-31T11:30:45"), 
column.defaultValue());
+          break;
+        case "timestamp_col_2":
+          Assertions.assertEquals(UnparsedExpression.of("19830905"), 
column.defaultValue());
+          break;
+        case "timestamp_col_3":
+          Assertions.assertEquals(UnparsedExpression.of("now()"), 
column.defaultValue());
+          break;
+        case "decimal_6_2_col_1":
+          Assertions.assertEquals(
+              Literals.decimalLiteral(Decimal.of("1.2", 6, 2)), 
column.defaultValue());
+          break;
+        case "bit_col_1":
+          Assertions.assertEquals(
+              Literals.unsignedLongLiteral(Decimal.of("1")), 
column.defaultValue());
+          break;
+        default:
+          // Every column declared in the DDL must have an expectation above.
+          Assertions.fail(
+              "Unexpected column name: "
+                  + column.name()
+                  + ", default value: "
+                  + column.defaultValue());
+      }
+    }
+  }
+
+  /**
+   * Creates a table directly through {@code clickhouseService.executeQuery} using a range of SQL
+   * column types, loads it through the Gravitino catalog and checks the Gravitino type reported
+   * for each column. Any column not listed in the switch fails the test.
+   */
+  @Test
+  void testColumnTypeConverter() {
+    // test convert from ClickHouse to Gravitino
+    // Note: this local deliberately shadows the tableName field with a fresh random name.
+    String tableName = GravitinoITUtils.genRandomName("test_type_converter");
+    String fullTableName = schemaName + "." + tableName;
+    String sql =
+        "CREATE TABLE "
+            + fullTableName
+            + " (\n"
+            + "  tinyint_col tinyint,\n"
+            + "  smallint_col smallint,\n"
+            + "  int_col int,\n"
+            + "  bigint_col bigint,\n"
+            + "  float_col float,\n"
+            + "  double_col double,\n"
+            + "  date_col date,\n"
+            + "  time_col time,\n"
+            + "  timestamp_col timestamp,\n"
+            + "  datetime_col datetime,\n"
+            + "  decimal_6_2_col decimal(6, 2),\n"
+            + "  varchar20_col varchar(20),\n"
+            + "  text_col text,\n"
+            + "  blob_col blob\n"
+            + ") order by tinyint_col;\n";
+
+    clickhouseService.executeQuery(sql);
+    Table loadedTable =
+        catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, tableName));
+
+    // One case per column declared in the DDL above.
+    for (Column column : loadedTable.columns()) {
+      switch (column.name()) {
+        case "tinyint_col":
+          Assertions.assertEquals(Types.ByteType.get(), column.dataType());
+          break;
+        case "smallint_col":
+          Assertions.assertEquals(Types.ShortType.get(), column.dataType());
+          break;
+        case "int_col":
+          Assertions.assertEquals(Types.IntegerType.get(), column.dataType());
+          break;
+        case "bigint_col":
+          Assertions.assertEquals(Types.LongType.get(), column.dataType());
+          break;
+        case "float_col":
+          Assertions.assertEquals(Types.FloatType.get(), column.dataType());
+          break;
+        case "double_col":
+          Assertions.assertEquals(Types.DoubleType.get(), column.dataType());
+          break;
+        case "date_col":
+          Assertions.assertEquals(Types.DateType.get(), column.dataType());
+          break;
+        case "time_col":
+          // The time column is expected to surface as a long, not as a time type.
+          Assertions.assertEquals(Types.LongType.get(), column.dataType());
+          break;
+        case "timestamp_col":
+          Assertions.assertEquals(Types.TimestampType.withoutTimeZone(0), column.dataType());
+          break;
+        case "datetime_col":
+          Assertions.assertEquals(Types.TimestampType.withoutTimeZone(0), column.dataType());
+          break;
+        case "decimal_6_2_col":
+          Assertions.assertEquals(Types.DecimalType.of(6, 2), column.dataType());
+          break;
+        case "varchar20_col":
+          Assertions.assertEquals(Types.StringType.get(), column.dataType());
+          break;
+        case "text_col":
+          Assertions.assertEquals(Types.StringType.get(), column.dataType());
+          break;
+        case "blob_col":
+          // The blob column is expected to surface as a string, not as a binary type.
+          Assertions.assertEquals(Types.StringType.get(), column.dataType());
+          break;
+        default:
+          Assertions.fail("Unexpected column name: " + column.name());
+      }
+    }
+  }
+
+  @Test
+  void testAlterAndDropClickhouseTable() {
+    Column[] columns = createColumns();
+    catalog
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(schemaName, tableName),
+            columns,
+            table_comment,
+            createProperties(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            getSortOrders(CLICKHOUSE_COL_NAME3));
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          catalog
+              .asTableCatalog()
+              .alterTable(
+                  NameIdentifier.of(schemaName, tableName),
+                  TableChange.rename(alertTableName),
+                  TableChange.updateComment(table_comment + "_new"));
+        });
+
+    catalog
+        .asTableCatalog()
+        .alterTable(NameIdentifier.of(schemaName, tableName), 
TableChange.rename(alertTableName));
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, alertTableName),
+            TableChange.updateComment(table_comment + "_new"));
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, alertTableName),
+            TableChange.addColumn(new String[] {"col_4"}, 
Types.StringType.get()));
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, alertTableName),
+            TableChange.renameColumn(new String[] {CLICKHOUSE_COL_NAME2}, 
"col_2_new"));
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            NameIdentifier.of(schemaName, alertTableName),
+            TableChange.updateColumnType(
+                new String[] {CLICKHOUSE_COL_NAME1}, Types.IntegerType.get()));
+
+    Table table = 
catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, 
alertTableName));
+    Assertions.assertEquals(alertTableName, table.name());
+
+    Assertions.assertEquals(CLICKHOUSE_COL_NAME1, table.columns()[0].name());
+    Assertions.assertEquals(Types.IntegerType.get(), 
table.columns()[0].dataType());
+
+    Assertions.assertEquals("col_2_new", table.columns()[1].name());
+    Assertions.assertEquals(Types.DateType.get(), 
table.columns()[1].dataType());
+    Assertions.assertEquals("col_2_comment", table.columns()[1].comment());
+
+    Assertions.assertEquals(CLICKHOUSE_COL_NAME3, table.columns()[2].name());
+    Assertions.assertEquals(Types.StringType.get(), 
table.columns()[2].dataType());
+    Assertions.assertEquals("col_3_comment", table.columns()[2].comment());
+
+    Assertions.assertEquals("col_4", table.columns()[3].name());
+    Assertions.assertEquals(Types.StringType.get(), 
table.columns()[3].dataType());
+    Assertions.assertNull(table.columns()[3].comment());
+    Assertions.assertNotNull(table.auditInfo());
+    Assertions.assertNotNull(table.auditInfo().createTime());
+    Assertions.assertNotNull(table.auditInfo().creator());
+    Assertions.assertNotNull(table.auditInfo().lastModifiedTime());
+    Assertions.assertNotNull(table.auditInfo().lastModifier());
+
+    Column col1 =
+        Column.of("name", Types.StringType.get(), "comment", false, false, 
DEFAULT_VALUE_NOT_SET);
+    Column col2 = Column.of("address", Types.StringType.get(), "comment");
+    Column col3 = Column.of("date_of_birth", Types.DateType.get(), "comment");
+
+    Column[] newColumns = new Column[] {col1, col2, col3};
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(schemaName, 
GravitinoITUtils.genRandomName("CatalogJdbcIT_table"));
+    catalog
+        .asTableCatalog()
+        .createTable(
+            tableIdentifier,
+            newColumns,
+            table_comment,
+            ImmutableMap.of(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            getSortOrders("name"));
+
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    TableChange change =
+        TableChange.updateColumnPosition(
+            new String[] {"no_column"}, TableChange.ColumnPosition.first());
+    NotFoundException notFoundException =
+        assertThrows(
+            NotFoundException.class, () -> 
tableCatalog.alterTable(tableIdentifier, change));
+    
Assertions.assertTrue(notFoundException.getMessage().contains("no_column"));
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            tableIdentifier,
+            TableChange.updateColumnPosition(
+                new String[] {col1.name()}, 
TableChange.ColumnPosition.after(col2.name())));
+
+    Table updateColumnPositionTable = 
catalog.asTableCatalog().loadTable(tableIdentifier);
+
+    Column[] updateCols = updateColumnPositionTable.columns();
+    Assertions.assertEquals(3, updateCols.length);
+    Assertions.assertEquals(col2.name(), updateCols[0].name());
+    Assertions.assertEquals(col1.name(), updateCols[1].name());
+    Assertions.assertEquals(col3.name(), updateCols[2].name());
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            catalog
+                .asTableCatalog()
+                .alterTable(
+                    tableIdentifier,
+                    TableChange.deleteColumn(new String[] {col3.name()}, true),
+                    TableChange.deleteColumn(new String[] {col2.name()}, 
true)));
+    Table delColTable = catalog.asTableCatalog().loadTable(tableIdentifier);
+    Assertions.assertEquals(1, delColTable.columns().length);
+    Assertions.assertEquals(col1.name(), delColTable.columns()[0].name());
+
+    Assertions.assertDoesNotThrow(
+        () -> {
+          catalog.asTableCatalog().dropTable(tableIdentifier);
+        });
+  }
+
+  /**
+   * Verifies that column default values can be changed through {@code alterTable}.
+   *
+   * <p>Each of the five default-value changes is first applied in a call of its own. The same
+   * five changes are then applied together in a single call, three times in a row. Finally the
+   * table is reloaded and the default value of every column is compared with the expected one.
+   */
+  @Test
+  void testUpdateColumnDefaultValue() {
+    Column[] columns = createColumnsWithDefaultValue();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    NameIdentifier ident = NameIdentifier.of(schemaName, tableName);
+
+    Table created =
+        tableCatalog.createTable(
+            ident,
+            columns,
+            null,
+            ImmutableMap.of(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            getSortOrders(CLICKHOUSE_COL_NAME1));
+    Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, created.auditInfo().creator());
+    Assertions.assertNull(created.auditInfo().lastModifier());
+
+    // One default-value change per column, in column order.
+    TableChange[] defaultValueChanges = {
+      TableChange.updateColumnDefaultValue(
+          new String[] {columns[0].name()}, Literals.of("1.2345", Types.FloatType.get())),
+      TableChange.updateColumnDefaultValue(
+          new String[] {columns[1].name()}, FunctionExpression.of("now")),
+      TableChange.updateColumnDefaultValue(
+          new String[] {columns[2].name()}, Literals.of("hello", Types.VarCharType.of(255))),
+      TableChange.updateColumnDefaultValue(
+          new String[] {columns[3].name()}, Literals.of("2000", Types.IntegerType.get())),
+      TableChange.updateColumnDefaultValue(
+          new String[] {columns[4].name()}, Literals.of("2.34", Types.DecimalType.of(3, 2)))
+    };
+
+    // First pass: every change is sent in a separate alterTable call.
+    for (TableChange singleChange : defaultValueChanges) {
+      tableCatalog.alterTable(ident, singleChange);
+    }
+
+    // Second pass: all five changes are sent together, three times in a row.
+    for (int round = 0; round < 3; round++) {
+      tableCatalog.alterTable(ident, defaultValueChanges);
+    }
+
+    Column[] reloaded = tableCatalog.loadTable(ident).columns();
+    Assertions.assertEquals(
+        Literals.of("1.2345", Types.FloatType.get()), reloaded[0].defaultValue());
+    // The function default is expected back as the unparsed expression "now()".
+    Assertions.assertEquals(UnparsedExpression.of("now()"), reloaded[1].defaultValue());
+    // The literal was set with a VARCHAR(255) type but is expected back typed as a string.
+    Assertions.assertEquals(
+        Literals.of("hello", Types.StringType.get()), reloaded[2].defaultValue());
+    Assertions.assertEquals(
+        Literals.of("2000", Types.IntegerType.get()), reloaded[3].defaultValue());
+    Assertions.assertEquals(
+        Literals.of("2.34", Types.DecimalType.of(3, 2)), reloaded[4].defaultValue());
+  }
+
+  /**
+   * Verifies the {@code cascade} flag of {@code dropSchema} on a schema that contains a table.
+   *
+   * <p>A non-cascading drop is expected to fail with a message that mentions the cascade flag and
+   * to leave the schema loadable; a cascading drop is expected to remove the schema.
+   */
+  @Test
+  void testDropClickHouseDatabase() {
+    String dbName = GravitinoITUtils.genRandomName("clickhouse_schema").toLowerCase();
+    String tblName = GravitinoITUtils.genRandomName("clickhouse_table").toLowerCase();
+
+    catalog.asSchemas().createSchema(dbName, null, ImmutableMap.<String, String>of());
+
+    catalog
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(dbName, tblName),
+            createColumns(),
+            "Created by Gravitino client",
+            ImmutableMap.<String, String>of(),
+            getSortOrders(CLICKHOUSE_COL_NAME3));
+
+    // Dropping without cascade must be refused.
+    RuntimeException dropFailure =
+        Assertions.assertThrows(
+            RuntimeException.class, () -> catalog.asSchemas().dropSchema(dbName, false));
+    Assertions.assertTrue(
+        dropFailure.getMessage().contains("the value of cascade should be true."));
+
+    // The refused drop must not have removed the schema: loading it still succeeds.
+    catalog.asSchemas().loadSchema(dbName);
+
+    // Dropping with cascade is allowed.
+    catalog.asSchemas().dropSchema(dbName, true);
+
+    // After the cascading drop the schema can no longer be loaded.
+    SupportsSchemas schemas = catalog.asSchemas();
+    Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(dbName));
+  }
+
+  /**
+   * Verifies schema creation with a non-empty comment and with an empty comment.
+   *
+   * <p>For the empty comment, both the schema returned by {@code createSchema} and the schema
+   * returned by a subsequent {@code loadSchema} must report an empty (or null) comment. Both
+   * schemas created here are dropped at the end so they do not leak into the shared catalog.
+   */
+  @Test
+  public void testSchemaComment() {
+    final String testSchemaName = GravitinoITUtils.genRandomName("test");
+    Assertions.assertDoesNotThrow(
+        () -> {
+          catalog.asSchemas().createSchema(testSchemaName, "comment", null);
+        });
+
+    // Test an empty comment: it must come back as empty both on create and on load.
+    String testSchemaName2 = GravitinoITUtils.genRandomName("test");
+    Schema schema = catalog.asSchemas().createSchema(testSchemaName2, "", null);
+    Assertions.assertTrue(StringUtils.isEmpty(schema.comment()));
+    schema = catalog.asSchemas().loadSchema(testSchemaName2);
+    Assertions.assertTrue(StringUtils.isEmpty(schema.comment()));
+
+    // Clean up every schema this test created, including the first one.
+    catalog.asSchemas().dropSchema(testSchemaName2, true);
+    catalog.asSchemas().dropSchema(testSchemaName, true);
+  }
+
+  /**
+   * Verifies that a table can be created, loaded, altered and dropped when the table and its
+   * columns carry names such as {@code table}, {@code create}, {@code delete}, {@code show},
+   * {@code int} and {@code varchar}.
+   *
+   * <p>The steps are: create the table, load it, add a column named {@code int}, check that the
+   * new column is visible, rename it to {@code varchar}, delete it, and drop the table.
+   */
+  @Test
+  public void testBackQuoteTable() {
+    Column col1 = Column.of("create", Types.LongType.get(), "id", false, false, null);
+    Column col2 = Column.of("delete", Types.ByteType.get(), "yes", false, false, null);
+    Column col3 = Column.of("show", Types.DateType.get(), "comment", false, false, null);
+    Column col4 = Column.of("status", Types.VarCharType.of(255), "code", false, false, null);
+    Column[] newColumns = new Column[] {col1, col2, col3, col4};
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, "table");
+    Assertions.assertDoesNotThrow(
+        () ->
+            tableCatalog.createTable(
+                tableIdentifier,
+                newColumns,
+                table_comment,
+                Collections.emptyMap(),
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                getSortOrders("create"),
+                Indexes.EMPTY_INDEXES));
+
+    Assertions.assertDoesNotThrow(() -> tableCatalog.loadTable(tableIdentifier));
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier,
+                TableChange.addColumn(
+                    new String[] {"int"},
+                    Types.StringType.get(),
+                    TableChange.ColumnPosition.after("status"))));
+
+    // Assert that the added column is really there instead of printing the column names.
+    Table table = tableCatalog.loadTable(tableIdentifier);
+    Assertions.assertTrue(
+        Arrays.stream(table.columns()).anyMatch(column -> "int".equals(column.name())),
+        "Column 'int' should be present after it has been added");
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier, TableChange.renameColumn(new String[] {"int"}, "varchar")));
+
+    Assertions.assertDoesNotThrow(
+        () ->
+            tableCatalog.alterTable(
+                tableIdentifier, TableChange.deleteColumn(new String[] {"varchar"}, true)));
+
+    Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier));
+  }
+
+  /**
+   * Verifies that tables named {@code t112}, {@code t212}, {@code t_12} and {@code _1__} can be
+   * created and loaded back.
+   *
+   * <p>Each table has a single column with the same name as the table; that name is also used as
+   * the sort key and as the only field of a primary index called {@code PRIMARY}. After all four
+   * tables are created, each one is loaded, checked for the expected column name and compared
+   * with the expected table metadata.
+   */
+  @Test
+  void testClickHouseSpecialTableName() {
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+
+    String t1_name = "t112";
+    Column t1_col = Column.of(t1_name, Types.LongType.get(), "id", false, false, null);
+    Column[] columns = {t1_col};
+
+    Index[] t1_indexes = {Indexes.primary("PRIMARY", new String[][] {{t1_name}})};
+
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, t1_name);
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders("t112"),
+        t1_indexes);
+
+    String t2_name = "t212";
+    Column t2_col = Column.of(t2_name, Types.LongType.get(), "id", false, false, null);
+    Index[] t2_indexes = {Indexes.primary("PRIMARY", new String[][] {{t2_name}})};
+    columns = new Column[] {t2_col};
+    tableIdentifier = NameIdentifier.of(schemaName, t2_name);
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders(t2_name),
+        t2_indexes);
+
+    String t3_name = "t_12";
+    Column t3_col = Column.of(t3_name, Types.LongType.get(), "id", false, false, null);
+    Index[] t3_indexes = {Indexes.primary("PRIMARY", new String[][] {{t3_name}})};
+    columns = new Column[] {t3_col};
+    tableIdentifier = NameIdentifier.of(schemaName, t3_name);
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders(t3_name),
+        t3_indexes);
+
+    String t4_name = "_1__";
+    Column t4_col = Column.of(t4_name, Types.LongType.get(), "id", false, false, null);
+    Index[] t4_indexes = {Indexes.primary("PRIMARY", new String[][] {{t4_name}})};
+    columns = new Column[] {t4_col};
+    tableIdentifier = NameIdentifier.of(schemaName, t4_name);
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders(t4_name),
+        t4_indexes);
+
+    // The column-name checks below are wrapped in assertTrue so that a missing column fails the
+    // test; a bare anyMatch(...) call would compute a boolean and throw it away.
+    Table t1 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t1_name));
+    Assertions.assertTrue(
+        Arrays.stream(t1.columns()).anyMatch(c -> Objects.equals(c.name(), "t112")));
+    // NOTE(review): t1 is compared against null indexes while t2..t4 are compared against their
+    // primary index; kept as is, please confirm whether this difference is intentional.
+    ITUtils.assertionsTableInfo(
+        t1_name,
+        table_comment,
+        Arrays.asList(t1_col),
+        properties,
+        null,
+        Transforms.EMPTY_TRANSFORM,
+        t1);
+
+    Table t2 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t2_name));
+    Assertions.assertTrue(
+        Arrays.stream(t2.columns()).anyMatch(c -> Objects.equals(c.name(), "t212")));
+    ITUtils.assertionsTableInfo(
+        t2_name,
+        table_comment,
+        Arrays.asList(t2_col),
+        properties,
+        t2_indexes,
+        Transforms.EMPTY_TRANSFORM,
+        t2);
+
+    Table t3 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t3_name));
+    Assertions.assertTrue(
+        Arrays.stream(t3.columns()).anyMatch(c -> Objects.equals(c.name(), "t_12")));
+    ITUtils.assertionsTableInfo(
+        t3_name,
+        table_comment,
+        Arrays.asList(t3_col),
+        properties,
+        t3_indexes,
+        Transforms.EMPTY_TRANSFORM,
+        t3);
+
+    Table t4 = tableCatalog.loadTable(NameIdentifier.of(schemaName, t4_name));
+    Assertions.assertTrue(
+        Arrays.stream(t4.columns()).anyMatch(c -> Objects.equals(c.name(), "_1__")));
+    ITUtils.assertionsTableInfo(
+        t4_name,
+        table_comment,
+        Arrays.asList(t4_col),
+        properties,
+        t4_indexes,
+        Transforms.EMPTY_TRANSFORM,
+        t4);
+  }
+
+  /**
+   * Verifies that creating or dropping a table with an illegal name is rejected with an {@link
+   * IllegalArgumentException}.
+   *
+   * <p>The names tried are three strings that embed a back quote followed by extra SQL, and one
+   * name that is 65 characters long. Each name is also used for the table's single column and for
+   * the field of its unique index.
+   */
+  @Test
+  void testClickHouseIllegalTableName() {
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    String baseName = "t123";
+
+    String[] illegalNames = {
+      baseName + "`; DROP TABLE important_table; -- ",
+      baseName + "`; SLEEP(10); -- ",
+      baseName + "`; UPDATE Users SET password = 'newpassword' WHERE username = 'admin'; -- ",
+      StringUtils.repeat("a", 65)
+    };
+
+    for (int i = 0; i < illegalNames.length; i++) {
+      String illegalName = illegalNames[i];
+      Column[] columns = {Column.of(illegalName, Types.LongType.get(), "id", false, false, null)};
+      // Index names follow the pattern u1_key, u2_key, ...
+      Index[] indexes = {
+        Indexes.unique("u" + (i + 1) + "_key", new String[][] {{illegalName}})
+      };
+      // NOTE(review): this identifier has four levels (metalake, catalog, schema, table) while the
+      // other tests in this class use (schema, table); confirm the exception asserted below is
+      // raised because of the table name and not because of the identifier shape.
+      NameIdentifier ident =
+          NameIdentifier.of(metalakeName, catalogName, schemaName, illegalName);
+
+      Assertions.assertThrows(
+          IllegalArgumentException.class,
+          () ->
+              tableCatalog.createTable(
+                  ident,
+                  columns,
+                  table_comment,
+                  properties,
+                  Transforms.EMPTY_TRANSFORM,
+                  Distributions.NONE,
+                  new SortOrder[0],
+                  indexes));
+      Assertions.assertThrows(
+          IllegalArgumentException.class, () -> catalog.asTableCatalog().dropTable(ident));
+    }
+  }
+
+  /**
+   * Verifies that table names differing only in letter case refer to distinct tables.
+   *
+   * <p>A table named {@code tableName} is created and checked, then a second table named {@code
+   * TABLENAME} with the same definition is created in the same schema; the second creation must
+   * succeed and the loaded table must keep the upper-case name.
+   */
+  @Test
+  void testClickHouseTableNameCaseSensitive() {
+    Column[] expectedColumns = {
+      Column.of("col_1", Types.LongType.get(), "id", false, false, null),
+      Column.of("col_2", Types.ByteType.get(), "yes", false, false, null),
+      Column.of("col_3", Types.DateType.get(), "comment", false, false, null),
+      Column.of("col_4", Types.StringType.get(), "code", false, false, null),
+      Column.of("col_5", Types.StringType.get(), "config", false, false, null)
+    };
+    Index[] noIndexes = new Index[0];
+
+    NameIdentifier mixedCaseIdent = NameIdentifier.of(schemaName, "tableName");
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+
+    // Create the mixed-case table and check both the returned and the reloaded metadata.
+    Table created =
+        tableCatalog.createTable(
+            mixedCaseIdent,
+            expectedColumns,
+            table_comment,
+            properties,
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            getSortOrders("col_1"),
+            noIndexes);
+    ITUtils.assertionsTableInfo(
+        "tableName",
+        table_comment,
+        Arrays.asList(expectedColumns),
+        properties,
+        noIndexes,
+        Transforms.EMPTY_TRANSFORM,
+        created);
+    Table reloaded = tableCatalog.loadTable(mixedCaseIdent);
+    ITUtils.assertionsTableInfo(
+        "tableName",
+        table_comment,
+        Arrays.asList(expectedColumns),
+        properties,
+        noIndexes,
+        Transforms.EMPTY_TRANSFORM,
+        reloaded);
+
+    // The same name in upper case must be accepted as a different table.
+    NameIdentifier upperCaseIdent = NameIdentifier.of(schemaName, "TABLENAME");
+    Table upperCaseTable =
+        Assertions.assertDoesNotThrow(
+            () ->
+                tableCatalog.createTable(
+                    upperCaseIdent,
+                    expectedColumns,
+                    table_comment,
+                    properties,
+                    Transforms.EMPTY_TRANSFORM,
+                    Distributions.NONE,
+                    getSortOrders("col_1"),
+                    noIndexes));
+    Assertions.assertEquals("TABLENAME", upperCaseTable.name());
+
+    Table reloadedUpperCase = tableCatalog.loadTable(upperCaseIdent);
+    ITUtils.assertionsTableInfo(
+        "TABLENAME",
+        table_comment,
+        Arrays.asList(expectedColumns),
+        properties,
+        noIndexes,
+        Transforms.EMPTY_TRANSFORM,
+        reloadedUpperCase);
+  }
+
+  /**
+   * Exercises name handling for schemas and tables.
+   *
+   * <p>First, a schema and a table named {@code //} are created directly in ClickHouse and must
+   * still be loadable, listable and droppable through the catalog. Second, schema names that embed
+   * a back quote followed by extra SQL, and a 65-character schema name, must be rejected with an
+   * {@link IllegalArgumentException} on both create and drop.
+   */
+  @Test
+  void testNameSpec() {
+    SupportsSchemas schemas = catalog.asSchemas();
+    TableCatalog tables = catalog.asTableCatalog();
+
+    // Operate on a schema with an unusual name that was created directly in ClickHouse.
+    String oddSchemaName = "//";
+    String createDatabaseSql = String.format("CREATE DATABASE `%s`", oddSchemaName);
+    clickhouseService.executeQuery(createDatabaseSql);
+
+    Schema loadedSchema = schemas.loadSchema(oddSchemaName);
+    Assertions.assertEquals(oddSchemaName, loadedSchema.name());
+
+    String[] listedSchemas = schemas.listSchemas();
+    Assertions.assertTrue(Arrays.stream(listedSchemas).anyMatch(s -> s.equals(oddSchemaName)));
+
+    Assertions.assertTrue(schemas.dropSchema(oddSchemaName, false));
+    Assertions.assertFalse(schemas.schemaExists(oddSchemaName));
+
+    // Recreate the schema, then operate on a table with an unusual name created in ClickHouse.
+    clickhouseService.executeQuery(createDatabaseSql);
+    String oddTableName = "//";
+    String createTableSql =
+        String.format(
+            "CREATE TABLE `%s`.`%s` (id int) order by id", oddSchemaName, oddTableName);
+    clickhouseService.executeQuery(createTableSql);
+    NameIdentifier oddTableIdent = NameIdentifier.of(oddSchemaName, oddTableName);
+
+    Table loadedTable = tables.loadTable(oddTableIdent);
+    Assertions.assertEquals(oddTableName, loadedTable.name());
+
+    NameIdentifier[] listedTables = tables.listTables(Namespace.of(oddSchemaName));
+    Assertions.assertTrue(
+        Arrays.stream(listedTables).anyMatch(t -> t.name().equals(oddTableName)));
+
+    Assertions.assertTrue(tables.dropTable(oddTableIdent));
+    Assertions.assertFalse(tables.tableExists(oddTableIdent));
+    Assertions.assertThrows(
+        UnsupportedOperationException.class, () -> tables.purgeTable(oddTableIdent));
+    schemas.dropSchema(oddSchemaName, true);
+
+    // Schema names that must be rejected: three SQL-injection attempts and one over-long name.
+    String randomSchemaName = RandomNameUtils.genRandomName("ct_db");
+    Map<String, String> properties = new HashMap<>();
+    String comment = null;
+    String[] rejectedNames = {
+      randomSchemaName + "`; DROP TABLE important_table; -- ",
+      randomSchemaName + "`; SLEEP(10); -- ",
+      randomSchemaName
+          + "`; UPDATE Users SET password = 'newpassword' WHERE username = 'admin'; -- ",
+      // more than 64 characters
+      StringUtils.repeat("a", 65)
+    };
+
+    for (String rejectedName : rejectedNames) {
+      Assertions.assertThrows(
+          IllegalArgumentException.class,
+          () -> schemas.createSchema(rejectedName, comment, properties));
+      Assertions.assertThrows(
+          IllegalArgumentException.class, () -> schemas.dropSchema(rejectedName, false));
+    }
+  }
+
+  /**
+   * Creates the schemas {@code db_}, {@code db_1}, {@code db_2} and {@code db12}, puts two tables
+   * into each of them, and verifies that every schema lists exactly its own two tables.
+   *
+   * <p>All four schemas are dropped with cascade at the end of the test.
+   */
+  @Test
+  void testClickHouseSchemaNameCaseSensitive() {
+    Column[] tableColumns = {
+      Column.of("col_1", Types.LongType.get(), "id", false, false, null),
+      Column.of("col_2", Types.VarCharType.of(255), "code", false, false, null),
+      Column.of("col_3", Types.VarCharType.of(255), "config", false, false, null)
+    };
+
+    String[] schemaNamesToCreate = {"db_", "db_1", "db_2", "db12"};
+    SupportsSchemas schemaSupport = catalog.asSchemas();
+
+    for (String name : schemaNamesToCreate) {
+      schemaSupport.createSchema(name, null, Collections.emptyMap());
+      Assertions.assertNotNull(schemaSupport.loadSchema(name));
+    }
+
+    // Every created schema must show up in the schema listing.
+    Set<String> listedSchemas = Sets.newHashSet(schemaSupport.listSchemas());
+    Assertions.assertTrue(listedSchemas.containsAll(Arrays.asList(schemaNamesToCreate)));
+
+    String sharedTableName = "test1";
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+
+    // Two tables per schema: one with a fixed name (null indexes) and one with a random name.
+    for (String name : schemaNamesToCreate) {
+      tableCatalog.createTable(
+          NameIdentifier.of(name, sharedTableName),
+          tableColumns,
+          table_comment,
+          properties,
+          Transforms.EMPTY_TRANSFORM,
+          Distributions.NONE,
+          getSortOrders("col_1"),
+          null);
+      tableCatalog.createTable(
+          NameIdentifier.of(name, GravitinoITUtils.genRandomName("test2")),
+          tableColumns,
+          table_comment,
+          properties,
+          Transforms.EMPTY_TRANSFORM,
+          Distributions.NONE,
+          getSortOrders("col_1"),
+          Indexes.EMPTY_INDEXES);
+    }
+
+    for (String name : schemaNamesToCreate) {
+      NameIdentifier[] tablesInSchema = tableCatalog.listTables(Namespace.of(name));
+      Assertions.assertEquals(2, tablesInSchema.length);
+      boolean containsSharedTable =
+          Arrays.stream(tablesInSchema).map(NameIdentifier::name).anyMatch(sharedTableName::equals);
+      Assertions.assertTrue(containsSharedTable);
+    }
+
+    for (String name : schemaNamesToCreate) {
+      schemaSupport.dropSchema(name, true);
+    }
+  }
+
+  /**
+   * Verifies that a column whose ClickHouse type is {@code IPv4} is reported by the catalog as the
+   * external type {@code IPv4}. The table is created directly in ClickHouse.
+   */
+  @Test
+  void testUnparsedTypeConverter() {
+    String externalTypeTable = GravitinoITUtils.genRandomName("test_unparsed_type");
+    String createTableSql =
+        String.format(
+            "CREATE TABLE %s.%s (bit_col IPv4) order by bit_col ;",
+            schemaName, externalTypeTable);
+    clickhouseService.executeQuery(createTableSql);
+
+    NameIdentifier ident = NameIdentifier.of(schemaName, externalTypeTable);
+    Table loadedTable = catalog.asTableCatalog().loadTable(ident);
+    Column onlyColumn = loadedTable.columns()[0];
+    Assertions.assertEquals(Types.ExternalType.of("IPv4"), onlyColumn.dataType());
+  }
+
+  /**
+   * Verifies that a nullable column added without an explicit default value at the default
+   * position shows up in the reloaded table after the three original columns.
+   */
+  @Test
+  void testAddColumnDefaultValue() {
+    Column uidColumn = Column.of("col_1", Types.LongType.get(), "uid", false, false, null);
+    Column flagColumn = Column.of("col_2", Types.ByteType.get(), "yes", true, false, null);
+    Column textColumn = Column.of("col_3", Types.StringType.get(), "comment", true, false, null);
+    String defaultValueTableName = "default_value_table";
+    Column[] initialColumns = {uidColumn, flagColumn, textColumn};
+
+    NameIdentifier ident = NameIdentifier.of(schemaName, defaultValueTableName);
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        ident,
+        initialColumns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders("col_1"),
+        Indexes.EMPTY_INDEXES);
+
+    // Add a fourth column; only name, type, comment and position are passed to the change.
+    Column addedColumn =
+        Column.of("col_4", Types.LongType.get(), "col4", true, false, DEFAULT_VALUE_NOT_SET);
+    TableChange addColumnChange =
+        TableChange.addColumn(
+            new String[] {addedColumn.name()},
+            addedColumn.dataType(),
+            addedColumn.comment(),
+            TableChange.ColumnPosition.defaultPos());
+    tableCatalog.alterTable(ident, addColumnChange);
+
+    Table reloaded = tableCatalog.loadTable(ident);
+    Column[] expectedColumns = {uidColumn, flagColumn, textColumn, addedColumn};
+
+    ITUtils.assertionsTableInfo(
+        defaultValueTableName,
+        table_comment,
+        Arrays.asList(expectedColumns),
+        properties,
+        Indexes.EMPTY_INDEXES,
+        Transforms.EMPTY_TRANSFORM,
+        reloaded);
+  }
+
+  /**
+   * Creates a table with signed and unsigned byte/short/integer/long columns, loads it back and
+   * checks that every column reports the expected Gravitino type string.
+   */
+  @Test
+  public void testClickHouseIntegerTypes() {
+    // col_1 is the only non-nullable column and is also the sort-order column below
+    // (presumably because the sorting key must not be nullable -- TODO confirm).
+    Column col1 = Column.of("col_1", Types.ByteType.get(), "byte type", false, false, null);
+    Column col2 =
+        Column.of("col_2", Types.ByteType.unsigned(), "byte unsigned type", true, false, null);
+    Column col3 = Column.of("col_3", Types.ShortType.get(), "short type", true, false, null);
+    Column col4 =
+        Column.of("col_4", Types.ShortType.unsigned(), "short unsigned type ", true, false, null);
+    Column col5 = Column.of("col_5", Types.IntegerType.get(), "integer type", true, false, null);
+    Column col6 =
+        Column.of(
+            "col_6", Types.IntegerType.unsigned(), "integer unsigned type", true, false, null);
+    Column col7 = Column.of("col_7", Types.LongType.get(), "long type", true, false, null);
+    Column col8 =
+        Column.of("col_8", Types.LongType.unsigned(), "long unsigned type", true, false, null);
+    String tableName = "default_integer_types_table";
+    Column[] newColumns = new Column[] {col1, col2, col3, col4, col5, col6, col7, col8};
+
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Map<String, String> properties = createProperties();
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    tableCatalog.createTable(
+        tableIdentifier,
+        newColumns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        getSortOrders("col_1"),
+        Indexes.EMPTY_INDEXES);
+
+    Table table = tableCatalog.loadTable(tableIdentifier);
+    Column[] columns = table.columns();
+
+    // Expected simpleString() of each loaded column, in declaration order (col_1 .. col_8).
+    String[] expectedTypes = {
+      "byte",
+      "byte unsigned",
+      "short",
+      "short unsigned",
+      "integer",
+      "integer unsigned",
+      "long",
+      "long unsigned"
+    };
+
+    Assertions.assertEquals(expectedTypes.length, columns.length);
+    for (int i = 0; i < expectedTypes.length; i++) {
+      // JUnit order is (expected, actual); the message names the column that failed.
+      Assertions.assertEquals(
+          expectedTypes[i],
+          columns[i].dataType().simpleString(),
+          "Unexpected data type for column " + columns[i].name());
+    }
+  }
+
+  /**
+   * Checks that changing a catalog's JDBC password through {@code alterCatalog} takes effect:
+   * creating a schema fails while the catalog is configured with a wrong password and succeeds
+   * after the correct password has been set.
+   *
+   * <p>The temporary catalog is always dropped, even when an assertion fails, so a failing run
+   * does not leave it behind in the metalake.
+   */
+  @Test
+  void testAlterCatalogProperties() throws SQLException {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    String testCatalogName = GravitinoITUtils.genRandomName("clickhouse_it_catalog");
+
+    // Keep only the part of the container URL before its last '/' (presumably this strips the
+    // database name so the catalog connects at server level -- TODO confirm).
+    String jdbcUrl = CLICKHOUSE_CONTAINER.getJdbcUrl(TEST_DB_NAME);
+    catalogProperties.put(
+        JdbcConfig.JDBC_URL.getKey(),
+        StringUtils.substring(jdbcUrl, 0, jdbcUrl.lastIndexOf("/")));
+    catalogProperties.put(
+        JdbcConfig.JDBC_DRIVER.getKey(), CLICKHOUSE_CONTAINER.getDriverClassName(TEST_DB_NAME));
+    catalogProperties.put(JdbcConfig.USERNAME.getKey(), CLICKHOUSE_CONTAINER.getUsername());
+
+    // Start with a deliberately wrong password so the first schema creation must fail.
+    String password = CLICKHOUSE_CONTAINER.getPassword();
+    String wrongPassword = password + "wrong";
+    catalogProperties.put(JdbcConfig.PASSWORD.getKey(), wrongPassword);
+
+    metalake.createCatalog(
+        testCatalogName, Catalog.Type.RELATIONAL, provider, "comment", catalogProperties);
+    try {
+      Catalog loadCatalog = metalake.loadCatalog(testCatalogName);
+
+      Assertions.assertThrows(
+          Exception.class, () -> loadCatalog.asSchemas().createSchema("test", "", null));
+
+      // Replace the wrong password with the real one; schema creation must now succeed.
+      Catalog newLoadedCatalog =
+          metalake.alterCatalog(
+              testCatalogName, CatalogChange.setProperty(JdbcConfig.PASSWORD.getKey(), password));
+
+      Assertions.assertDoesNotThrow(
+          () -> newLoadedCatalog.asSchemas().createSchema("test", "", null));
+
+      loadCatalog.asSchemas().dropSchema("test", true);
+    } finally {
+      // Runs on success and on failure so the randomly named catalog never leaks.
+      metalake.dropCatalog(testCatalogName, true);
+    }
+  }
+}

Review Comment:
   added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to